You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/16 18:05:37 UTC

[1/3] incubator-beam git commit: Add test for ReduceFnRunner GC time overflow

Repository: incubator-beam
Updated Branches:
  refs/heads/master 0a3425e77 -> 455aaed55


Add test for ReduceFnRunner GC time overflow


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/925264e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/925264e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/925264e3

Branch: refs/heads/master
Commit: 925264e3eb244172e57c134cb3bf89db3d98258c
Parents: f8c59bd
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jun 14 16:04:10 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 16 11:04:22 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/WindowMatchers.java     |  5 ++
 .../beam/sdk/util/ReduceFnRunnerTest.java       | 68 ++++++++++++++++++++
 2 files changed, 73 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/925264e3/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
index b47c32c..7a5e2fb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
@@ -47,6 +47,11 @@ public class WindowMatchers {
     return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, Matchers.anything());
   }
 
+  public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
+      Matcher<? super T> valueMatcher) {
+    return new WindowedValueMatcher<>(valueMatcher, Matchers.anything(), Matchers.anything());
+  }
+
   public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
       T value, long timestamp, long windowStart, long windowEnd) {
     return WindowMatchers.<T>isSingleWindowedValue(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/925264e3/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
index 0df4bc6..b7ec540 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.util;
 
 import static org.apache.beam.sdk.WindowMatchers.isSingleWindowedValue;
+import static org.apache.beam.sdk.WindowMatchers.isWindowedValue;
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -51,6 +52,7 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Never;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
@@ -59,6 +61,7 @@ import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
@@ -79,6 +82,7 @@ import org.mockito.MockitoAnnotations;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
@@ -226,6 +230,70 @@ public class ReduceFnRunnerTest {
     tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
   }
 
+  /**
+   * Tests that the garbage collection time for a fixed window does not overflow the end of time.
+   */
+  @Test
+  public void testFixedWindowEndOfTimeGarbageCollection() throws Exception {
+
+    Duration allowedLateness = Duration.standardDays(365);
+    Duration windowSize = Duration.millis(10);
+    WindowFn<Object, IntervalWindow> windowFn = FixedWindows.of(windowSize);
+
+    // This timestamp falls into a window where the end of the window is before the end of the
+    // global window - the "end of time" - yet its expiration time is after.
+    final Instant elementTimestamp =
+        GlobalWindow.INSTANCE.maxTimestamp().minus(allowedLateness).plus(1);
+
+    IntervalWindow window = Iterables.getOnlyElement(
+        windowFn.assignWindows(
+            windowFn.new AssignContext() {
+              @Override
+              public Object element() {
+                throw new UnsupportedOperationException();
+              }
+              @Override
+              public Instant timestamp() {
+                return elementTimestamp;
+              }
+
+              @Override
+              public Collection<? extends BoundedWindow> windows() {
+                throw new UnsupportedOperationException();
+              }
+            }));
+
+    assertTrue(
+        window.maxTimestamp().isBefore(GlobalWindow.INSTANCE.maxTimestamp()));
+    assertTrue(
+        window.maxTimestamp().plus(allowedLateness).isAfter(GlobalWindow.INSTANCE.maxTimestamp()));
+
+    // Test basic execution of a trigger using a non-combining window set and accumulating mode.
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+        ReduceFnTester.combining(
+            windowFn,
+            AfterWatermark.pastEndOfWindow().withLateFirings(Never.ever()).buildTrigger(),
+            AccumulationMode.DISCARDING_FIRED_PANES,
+            new Sum.SumIntegerFn().<String>asKeyedFn(),
+            VarIntCoder.of(),
+            allowedLateness);
+
+    tester.injectElements(TimestampedValue.of(13, elementTimestamp));
+
+    // Should fire ON_TIME pane and there will be a checkState that the cleanup time
+    // is prior to timestamp max value
+    tester.advanceInputWatermark(window.maxTimestamp());
+
+    // Nothing in the ON_TIME pane (not governed by triggers, but by ReduceFnRunner)
+    assertThat(tester.extractOutput(), emptyIterable());
+
+    tester.injectElements(TimestampedValue.of(42, elementTimestamp));
+
+    // Now the final pane should fire, demonstrating that the GC time was truncated
+    tester.advanceInputWatermark(GlobalWindow.INSTANCE.maxTimestamp());
+    assertThat(tester.extractOutput(), contains(isWindowedValue(equalTo(55))));
+  }
+
   @Test
   public void testOnElementCombiningAccumulating() throws Exception {
     // Test basic execution of a trigger using a non-combining window set and accumulating mode.


[3/3] incubator-beam git commit: This closes #464

Posted by ke...@apache.org.
This closes #464


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/455aaed5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/455aaed5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/455aaed5

Branch: refs/heads/master
Commit: 455aaed5557da4a07dd86d94a1a4fe709455d6aa
Parents: 0a3425e dc60dc7
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 16 11:04:57 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 16 11:04:57 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/util/ReduceFnRunner.java    | 19 ++++--
 .../org/apache/beam/sdk/WindowMatchers.java     |  5 ++
 .../beam/sdk/util/ReduceFnRunnerTest.java       | 68 ++++++++++++++++++++
 3 files changed, 85 insertions(+), 7 deletions(-)
----------------------------------------------------------------------



[2/3] incubator-beam git commit: Fix overflow in ReduceFnRunner garbage collection times

Posted by ke...@apache.org.
Fix overflow in ReduceFnRunner garbage collection times


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dc60dc7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dc60dc7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dc60dc7c

Branch: refs/heads/master
Commit: dc60dc7ce22f95e12c99bb65f258931f330444c9
Parents: 925264e
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jun 14 16:10:09 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 16 11:04:23 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/util/ReduceFnRunner.java | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dc60dc7c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
index 34208da..864e8e7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
@@ -936,16 +936,21 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
   }
 
   /**
-   * Return when {@code window} should be garbage collected. If the window is the GlobalWindow,
-   * that will be the end of the window. Otherwise, add the allowed lateness to the end of
-   * the window.
+   * Return when {@code window} should be garbage collected. If the window's expiration time is on
+   * or after the end of the global window, it will be truncated to the end of the global window.
    */
   private Instant garbageCollectionTime(W window) {
-    Instant maxTimestamp = window.maxTimestamp();
-    if (maxTimestamp.isBefore(GlobalWindow.INSTANCE.maxTimestamp())) {
-      return maxTimestamp.plus(windowingStrategy.getAllowedLateness());
+
+    // If the end of the window + allowed lateness is beyond the "end of time" aka the end of the
+    // global window, then we truncate it. The conditional is phrased like it is because the
+    // addition of EOW + allowed lateness might even overflow the maximum allowed Instant
+    if (GlobalWindow.INSTANCE
+        .maxTimestamp()
+        .minus(windowingStrategy.getAllowedLateness())
+        .isBefore(window.maxTimestamp())) {
+      return GlobalWindow.INSTANCE.maxTimestamp();
     } else {
-      return maxTimestamp;
+      return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
     }
   }