You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/06/03 20:21:43 UTC
kafka git commit: KAFKA-3784: TimeWindows#windowsFor calculation is
incorrect
Repository: kafka
Updated Branches:
refs/heads/trunk a27030693 -> 234fa5a69
KAFKA-3784: TimeWindows#windowsFor calculation is incorrect
- Fixed the logic calculating the windows that are affected by a new event in the case of hopping windows and a small overlap.
- Added a unit test that tests for the issue
Author: Tom Rybak <tr...@gmail.com>
Reviewers: Michael G. Noll, Matthias J. Sax, Guozhang Wang
Closes #1462 from trybak/bugfix/KAFKA-3784-TimeWindows#windowsFor-false-positives
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/234fa5a6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/234fa5a6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/234fa5a6
Branch: refs/heads/trunk
Commit: 234fa5a6949c9a5bfb4f543989c2ece84fcce033
Parents: a270306
Author: Tom Rybak <tr...@gmail.com>
Authored: Fri Jun 3 13:21:40 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Jun 3 13:21:40 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/kafka/streams/kstream/TimeWindows.java | 4 +---
.../org/apache/kafka/streams/kstream/TimeWindowsTest.java | 8 ++++++++
2 files changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/234fa5a6/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index e4ce883..001e92e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -99,9 +99,7 @@ public class TimeWindows extends Windows<TimeWindow> {
@Override
public Map<Long, TimeWindow> windowsFor(long timestamp) {
- long enclosed = (size - 1) / advance;
- long windowStart = Math.max(0, timestamp - timestamp % advance - enclosed * advance);
-
+ long windowStart = (Math.max(0, timestamp - this.size + this.advance) / this.advance) * this.advance;
Map<Long, TimeWindow> windows = new HashMap<>();
while (windowStart <= timestamp) {
TimeWindow window = new TimeWindow(windowStart, windowStart + this.size);
http://git-wip-us.apache.org/repos/asf/kafka/blob/234fa5a6/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
index e9ff235..62b12a9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
@@ -113,6 +113,14 @@ public class TimeWindowsTest {
}
@Test
+ public void windowsForBarelyOverlappingHoppingWindows() {
+ TimeWindows windows = TimeWindows.of(anyName, 6L).advanceBy(5L);
+ Map<Long, TimeWindow> matched = windows.windowsFor(7L);
+ assertEquals(1, matched.size());
+ assertEquals(new TimeWindow(5L, 11L), matched.get(5L));
+ }
+
+ @Test
public void windowsForTumblingWindows() {
TimeWindows windows = TimeWindows.of(anyName, 12L);
Map<Long, TimeWindow> matched = windows.windowsFor(21L);