You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/03/19 03:44:16 UTC

[kafka] branch trunk updated: KAFKA-6486: Implemented LinkedHashMap in TimeWindows (#4628)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 02a8ef8  KAFKA-6486: Implemented LinkedHashMap in TimeWindows (#4628)
02a8ef8 is described below

commit 02a8ef859539349151e9d1dd0c69ac83a24ee7a2
Author: asutosh936 <as...@hotmail.com>
AuthorDate: Sun Mar 18 22:44:11 2018 -0500

    KAFKA-6486: Implemented LinkedHashMap in TimeWindows (#4628)
    
    Reviewers: Guozhang Wang <gu...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
 .../org/apache/kafka/streams/kstream/TimeWindows.java     |  4 ++--
 .../kafka/streams/kstream/internals/TimeWindowTest.java   | 15 +++++++++++++++
 2 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index f9090c5..c2b910d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.kstream;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 
-import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 /**
@@ -105,7 +105,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
     @Override
     public Map<Long, TimeWindow> windowsFor(final long timestamp) {
         long windowStart = (Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs) * advanceMs;
-        final Map<Long, TimeWindow> windows = new HashMap<>();
+        final Map<Long, TimeWindow> windows = new LinkedHashMap<>();
         while (windowStart <= timestamp) {
             final TimeWindow window = new TimeWindow(windowStart, windowStart + sizeMs);
             windows.put(windowStart, window);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
index 09ac173..f260bee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
@@ -16,8 +16,12 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.kstream.TimeWindows;
 import org.junit.Test;
 
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -117,4 +121,15 @@ public class TimeWindowTest {
     public void cannotCompareTimeWindowWithDifferentWindowType() {
         window.overlap(sessionWindow);
     }
+
+    @Test
+    public void shouldReturnMatchedWindowsOrderedByTimestamp() {
+        final TimeWindows windows = TimeWindows.of(12L).advanceBy(5L);
+        final Map<Long, TimeWindow> matched = windows.windowsFor(21L);
+
+        final Long[] expected = matched.keySet().toArray(new Long[matched.size()]);
+        assertEquals(expected[0].longValue(), 10L);
+        assertEquals(expected[1].longValue(), 15L);
+        assertEquals(expected[2].longValue(), 20L);
+    }
 }
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.