You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/20 22:16:02 UTC

[09/50] [abbrv] incubator-beam git commit: Add test for ReduceFnRunner GC time overflow

Add test for ReduceFnRunner GC time overflow


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5bf732cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5bf732cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5bf732cd

Branch: refs/heads/python-sdk
Commit: 5bf732cd3e598321a5c51e1239eda0fe2877a65d
Parents: 6058330
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jun 14 16:04:10 2016 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Jun 20 15:14:29 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/WindowMatchers.java     |  5 ++
 .../beam/sdk/util/ReduceFnRunnerTest.java       | 68 ++++++++++++++++++++
 2 files changed, 73 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5bf732cd/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
index b47c32c..7a5e2fb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
@@ -47,6 +47,11 @@ public class WindowMatchers {
     return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, Matchers.anything());
   }
 
+  public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
+      Matcher<? super T> valueMatcher) { // constrains only the value; the rest accept anything
+    return new WindowedValueMatcher<>(valueMatcher, Matchers.anything(), Matchers.anything());
+  }
+
   public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
       T value, long timestamp, long windowStart, long windowEnd) {
     return WindowMatchers.<T>isSingleWindowedValue(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5bf732cd/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
index 0df4bc6..b7ec540 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.util;
 
 import static org.apache.beam.sdk.WindowMatchers.isSingleWindowedValue;
+import static org.apache.beam.sdk.WindowMatchers.isWindowedValue;
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -51,6 +52,7 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Never;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
@@ -59,6 +61,7 @@ import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
@@ -79,6 +82,7 @@ import org.mockito.MockitoAnnotations;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
@@ -226,6 +230,70 @@ public class ReduceFnRunnerTest {
     tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
   }
 
+  /**
+   * Tests that the garbage collection time for a fixed window does not overflow the end of time.
+   */
+  @Test
+  public void testFixedWindowEndOfTimeGarbageCollection() throws Exception {
+
+    Duration allowedLateness = Duration.standardDays(365);
+    Duration windowSize = Duration.millis(10);
+    WindowFn<Object, IntervalWindow> windowFn = FixedWindows.of(windowSize);
+
+    // This timestamp falls into a window where the end of the window is before the end of the
+    // global window - the "end of time" - yet its expiration time is after.
+    final Instant elementTimestamp =
+        GlobalWindow.INSTANCE.maxTimestamp().minus(allowedLateness).plus(1);
+
+    IntervalWindow window = Iterables.getOnlyElement(
+        windowFn.assignWindows(
+            windowFn.new AssignContext() {
+              @Override
+              public Object element() {
+                throw new UnsupportedOperationException();
+              }
+              @Override
+              public Instant timestamp() {
+                return elementTimestamp;
+              }
+
+              @Override
+              public Collection<? extends BoundedWindow> windows() {
+                throw new UnsupportedOperationException();
+              }
+            }));
+
+    assertTrue(
+        window.maxTimestamp().isBefore(GlobalWindow.INSTANCE.maxTimestamp()));
+    assertTrue(
+        window.maxTimestamp().plus(allowedLateness).isAfter(GlobalWindow.INSTANCE.maxTimestamp()));
+
+    // Combining (integer sum) tester in discarding mode, with Never.ever() as the late firing.
+    ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+        ReduceFnTester.combining(
+            windowFn,
+            AfterWatermark.pastEndOfWindow().withLateFirings(Never.ever()).buildTrigger(),
+            AccumulationMode.DISCARDING_FIRED_PANES,
+            new Sum.SumIntegerFn().<String>asKeyedFn(),
+            VarIntCoder.of(),
+            allowedLateness);
+
+    tester.injectElements(TimestampedValue.of(13, elementTimestamp));
+
+    // Should fire ON_TIME pane and there will be a checkState that the cleanup time
+    // is prior to timestamp max value
+    tester.advanceInputWatermark(window.maxTimestamp());
+
+    // Nothing in the ON_TIME pane (not governed by triggers, but by ReduceFnRunner)
+    assertThat(tester.extractOutput(), emptyIterable());
+
+    tester.injectElements(TimestampedValue.of(42, elementTimestamp));
+
+    // Now the final pane should fire with 55 (13 + 42), showing that the GC time was truncated
+    tester.advanceInputWatermark(GlobalWindow.INSTANCE.maxTimestamp());
+    assertThat(tester.extractOutput(), contains(isWindowedValue(equalTo(55))));
+  }
+
   @Test
   public void testOnElementCombiningAccumulating() throws Exception {
     // Test basic execution of a trigger using a non-combining window set and accumulating mode.