You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/11/13 23:32:48 UTC

[2/5] beam git commit: Truncate the very last fixed window if it goes beyond representable time

Truncate the very last fixed window if it goes beyond representable time


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/296cba00
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/296cba00
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/296cba00

Branch: refs/heads/master
Commit: 296cba009a5c979223fb61bd411816169eaad515
Parents: 555ba40
Author: Kenneth Knowles <ke...@apache.org>
Authored: Fri Oct 27 10:51:45 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Mon Nov 13 15:03:21 2017 -0800

----------------------------------------------------------------------
 .../beam/runners/core/LateDataUtilsTest.java    |  2 +-
 .../sdk/transforms/windowing/FixedWindows.java  | 24 +++++++++++++++++---
 .../transforms/windowing/FixedWindowsTest.java  | 12 ++++++++++
 3 files changed, 34 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/296cba00/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java
index f0f315d..cef865c 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java
@@ -64,7 +64,7 @@ public class LateDataUtilsTest {
     IntervalWindow window = windowFn.assignWindow(new Instant(BoundedWindow.TIMESTAMP_MAX_VALUE));
     assertThat(
         window.maxTimestamp(),
-        Matchers.<ReadableInstant>greaterThan(GlobalWindow.INSTANCE.maxTimestamp()));
+        equalTo(GlobalWindow.INSTANCE.maxTimestamp()));
     assertThat(
         LateDataUtils.garbageCollectionTime(window, strategy),
         equalTo(GlobalWindow.INSTANCE.maxTimestamp()));

http://git-wip-us.apache.org/repos/asf/beam/blob/296cba00/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java
index 8b16916..6c9376c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java
@@ -76,9 +76,27 @@ public class FixedWindows extends PartitioningWindowFn<Object, IntervalWindow> {
 
   @Override
   public IntervalWindow assignWindow(Instant timestamp) {
-    long start = timestamp.getMillis()
-        - timestamp.plus(size).minus(offset).getMillis() % size.getMillis();
-    return new IntervalWindow(new Instant(start), size);
+    Instant start =
+        new Instant(
+            timestamp.getMillis()
+                - timestamp.plus(size).minus(offset).getMillis() % size.getMillis());
+
+
+    // The global window is inclusive of max timestamp, while interval window excludes its
+    // upper bound
+    Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp().plus(1);
+
+    // The end of the window is either start + size if that is within the allowable range, otherwise
+    // the end of the global window. Truncating the window drives many other
+    // areas of this system in the appropriate way automatically.
+    //
+    // Though it is curious that the very last representable fixed window is shorter than the rest,
+    // when we are processing data in the year 294247, we'll probably have technology that can
+    // account for this.
+    Instant end =
+        start.isAfter(endOfGlobalWindow.minus(size)) ? endOfGlobalWindow : start.plus(size);
+
+    return new IntervalWindow(start, end);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/296cba00/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java
index 80a534c..8dc02f9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java
@@ -107,6 +107,18 @@ public class FixedWindowsTest {
     assertThat(mapping.maximumLookback(), equalTo(Duration.ZERO));
   }
 
+  /** Tests that the last hour of the universe in fact ends at the end of time. */
+  @Test
+  public void testEndOfTime() {
+    Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp();
+    FixedWindows windowFn = FixedWindows.of(Duration.standardHours(1));
+
+    IntervalWindow truncatedWindow =
+        windowFn.assignWindow(endOfGlobalWindow.minus(1));
+
+    assertThat(truncatedWindow.maxTimestamp(), equalTo(endOfGlobalWindow));
+  }
+
   @Test
   public void testDefaultWindowMappingFnGlobalWindow() {
     PartitioningWindowFn<?, ?> windowFn = FixedWindows.of(Duration.standardMinutes(20L));