You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/11/13 23:32:48 UTC
[2/5] beam git commit: Truncate the very last fixed window if it goes
beyond representable time
Truncate the very last fixed window if it goes beyond representable time
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/296cba00
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/296cba00
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/296cba00
Branch: refs/heads/master
Commit: 296cba009a5c979223fb61bd411816169eaad515
Parents: 555ba40
Author: Kenneth Knowles <ke...@apache.org>
Authored: Fri Oct 27 10:51:45 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Mon Nov 13 15:03:21 2017 -0800
----------------------------------------------------------------------
.../beam/runners/core/LateDataUtilsTest.java | 2 +-
.../sdk/transforms/windowing/FixedWindows.java | 24 +++++++++++++++++---
.../transforms/windowing/FixedWindowsTest.java | 12 ++++++++++
3 files changed, 34 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/296cba00/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java
index f0f315d..cef865c 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java
@@ -64,7 +64,7 @@ public class LateDataUtilsTest {
IntervalWindow window = windowFn.assignWindow(new Instant(BoundedWindow.TIMESTAMP_MAX_VALUE));
assertThat(
window.maxTimestamp(),
- Matchers.<ReadableInstant>greaterThan(GlobalWindow.INSTANCE.maxTimestamp()));
+ equalTo(GlobalWindow.INSTANCE.maxTimestamp()));
assertThat(
LateDataUtils.garbageCollectionTime(window, strategy),
equalTo(GlobalWindow.INSTANCE.maxTimestamp()));
http://git-wip-us.apache.org/repos/asf/beam/blob/296cba00/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java
index 8b16916..6c9376c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java
@@ -76,9 +76,27 @@ public class FixedWindows extends PartitioningWindowFn<Object, IntervalWindow> {
@Override
public IntervalWindow assignWindow(Instant timestamp) {
- long start = timestamp.getMillis()
- - timestamp.plus(size).minus(offset).getMillis() % size.getMillis();
- return new IntervalWindow(new Instant(start), size);
+ Instant start =
+ new Instant(
+ timestamp.getMillis()
+ - timestamp.plus(size).minus(offset).getMillis() % size.getMillis());
+
+
+ // The global window is inclusive of max timestamp, while interval window excludes its
+ // upper bound
+ Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp().plus(1);
+
+ // The end of the window is either start + size if that is within the allowable range, otherwise
+ // the end of the global window. Truncating the window drives many other
+ // areas of this system in the appropriate way automatically.
+ //
+ // Though it is curious that the very last representable fixed window is shorter than the rest,
+ // when we are processing data in the year 294247, we'll probably have technology that can
+ // account for this.
+ Instant end =
+ start.isAfter(endOfGlobalWindow.minus(size)) ? endOfGlobalWindow : start.plus(size);
+
+ return new IntervalWindow(start, end);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/296cba00/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java
index 80a534c..8dc02f9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java
@@ -107,6 +107,18 @@ public class FixedWindowsTest {
assertThat(mapping.maximumLookback(), equalTo(Duration.ZERO));
}
+ /** Tests that the last hour of the universe in fact ends at the end of time. */
+ @Test
+ public void testEndOfTime() {
+ Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp();
+ FixedWindows windowFn = FixedWindows.of(Duration.standardHours(1));
+
+ IntervalWindow truncatedWindow =
+ windowFn.assignWindow(endOfGlobalWindow.minus(1));
+
+ assertThat(truncatedWindow.maxTimestamp(), equalTo(endOfGlobalWindow));
+ }
+
@Test
public void testDefaultWindowMappingFnGlobalWindow() {
PartitioningWindowFn<?, ?> windowFn = FixedWindows.of(Duration.standardMinutes(20L));