You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/06/03 20:21:43 UTC

kafka git commit: KAFKA-3784: TimeWindows#windowsFor calculation is incorrect

Repository: kafka
Updated Branches:
  refs/heads/trunk a27030693 -> 234fa5a69


KAFKA-3784: TimeWindows#windowsFor calculation is incorrect

- Fixed the logic calculating the windows that are affected by a new \u2026event in the case of hopping windows and a small overlap.
- Added a unit test that tests for the issue

Author: Tom Rybak <tr...@gmail.com>

Reviewers: Michael G. Noll, Matthias J. Sax, Guozhang Wang

Closes #1462 from trybak/bugfix/KAFKA-3784-TimeWindows#windowsFor-false-positives


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/234fa5a6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/234fa5a6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/234fa5a6

Branch: refs/heads/trunk
Commit: 234fa5a6949c9a5bfb4f543989c2ece84fcce033
Parents: a270306
Author: Tom Rybak <tr...@gmail.com>
Authored: Fri Jun 3 13:21:40 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Jun 3 13:21:40 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/kafka/streams/kstream/TimeWindows.java   | 4 +---
 .../org/apache/kafka/streams/kstream/TimeWindowsTest.java    | 8 ++++++++
 2 files changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/234fa5a6/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index e4ce883..001e92e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -99,9 +99,7 @@ public class TimeWindows extends Windows<TimeWindow> {
 
     @Override
     public Map<Long, TimeWindow> windowsFor(long timestamp) {
-        long enclosed = (size - 1) / advance;
-        long windowStart = Math.max(0, timestamp - timestamp % advance - enclosed * advance);
-
+        long windowStart = (Math.max(0, timestamp - this.size + this.advance) / this.advance) * this.advance;
         Map<Long, TimeWindow> windows = new HashMap<>();
         while (windowStart <= timestamp) {
             TimeWindow window = new TimeWindow(windowStart, windowStart + this.size);

http://git-wip-us.apache.org/repos/asf/kafka/blob/234fa5a6/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
index e9ff235..62b12a9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
@@ -113,6 +113,14 @@ public class TimeWindowsTest {
     }
 
     @Test
+    public void windowsForBarelyOverlappingHoppingWindows() {
+        TimeWindows windows = TimeWindows.of(anyName, 6L).advanceBy(5L);
+        Map<Long, TimeWindow> matched = windows.windowsFor(7L);
+        assertEquals(1, matched.size());
+        assertEquals(new TimeWindow(5L, 11L), matched.get(5L));
+    }
+
+    @Test
     public void windowsForTumblingWindows() {
         TimeWindows windows = TimeWindows.of(anyName, 12L);
         Map<Long, TimeWindow> matched = windows.windowsFor(21L);