You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/03/11 00:08:58 UTC

[2/6] incubator-beam git commit: Add unit test.

Add unit test.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/045471c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/045471c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/045471c1

Branch: refs/heads/master
Commit: 045471c1dc7ffeecc8ea8b6c0695498261aa631b
Parents: c415be8
Author: Mark Shields <ma...@google.com>
Authored: Wed Mar 9 16:50:16 2016 -0800
Committer: Mark Shields <ma...@google.com>
Committed: Wed Mar 9 16:50:16 2016 -0800

----------------------------------------------------------------------
 .../cloud/dataflow/sdk/util/ReduceFnRunner.java |  5 +--
 .../dataflow/sdk/util/ReduceFnRunnerTest.java   | 38 ++++++++++++++++++++
 2 files changed, 41 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/045471c1/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
index 2b6e0d4..2e2d1f6 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
@@ -602,7 +602,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
       boolean windowIsActive)
           throws Exception {
     if (windowIsActive) {
-      // Since window was still active the trigger may not have closed.
+      // Since the window is both in the active window set AND its trigger was not yet closed,
+      // it is possible we still have state.
       reduceFn.clearState(renamedContext);
       watermarkHold.clearHolds(renamedContext);
       nonEmptyPanes.clearPane(renamedContext.state());
@@ -623,7 +624,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
     }
     paneInfoTracker.clear(directContext.state());
     if (activeWindows.isActive(directContext.window())) {
-      // Don't need to track address state windows anymore
+      // Don't need to track address state windows anymore.
       activeWindows.remove(directContext.window());
     }
     // We'll never need to test for the trigger being closed again.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/045471c1/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
index 4fb3e37..e1348f7 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
@@ -687,6 +687,44 @@ public class ReduceFnRunnerTest {
   }
 
   /**
+   * It is possible for a session window's trigger to be closed at the point at which
+   * the (merged) session window is garbage collected. Make sure we don't accidentally
+   * assume the window is still active.
+   */
+  @Test
+  public void testMergingWithCloseBeforeGC() throws Exception {
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
+            AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
+            ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+    // Two elements in two overlapping session windows.
+    tester.injectElements(
+        TimestampedValue.of(1, new Instant(1)), // in [1, 11)
+        TimestampedValue.of(10, new Instant(10))); // in [10, 20)
+
+    // Close the trigger, but the garbage collection timer is still pending.
+    when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+    triggerShouldFinish(mockTrigger);
+    tester.advanceInputWatermark(new Instant(30));
+
+    // Now the garbage collection timer will fire, finding the trigger already closed.
+    tester.advanceInputWatermark(new Instant(100));
+
+    List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+    assertThat(output.size(), equalTo(1));
+    assertThat(output.get(0),
+        isSingleWindowedValue(containsInAnyOrder(1, 10),
+            1, // timestamp
+            1, // window start
+            20)); // window end
+    assertThat(
+        output.get(0).getPane(),
+        equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
+  }
+
+
+  /**
    * Tests that when data is assigned to multiple windows but some of those windows have
    * had their triggers finish, then the data is dropped and counted accurately.
    */