You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/03/11 00:08:58 UTC
[2/6] incubator-beam git commit: Add unit test.
Add unit test.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/045471c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/045471c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/045471c1
Branch: refs/heads/master
Commit: 045471c1dc7ffeecc8ea8b6c0695498261aa631b
Parents: c415be8
Author: Mark Shields <ma...@google.com>
Authored: Wed Mar 9 16:50:16 2016 -0800
Committer: Mark Shields <ma...@google.com>
Committed: Wed Mar 9 16:50:16 2016 -0800
----------------------------------------------------------------------
.../cloud/dataflow/sdk/util/ReduceFnRunner.java | 5 +--
.../dataflow/sdk/util/ReduceFnRunnerTest.java | 38 ++++++++++++++++++++
2 files changed, 41 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/045471c1/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
index 2b6e0d4..2e2d1f6 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
@@ -602,7 +602,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
boolean windowIsActive)
throws Exception {
if (windowIsActive) {
- // Since window was still active the trigger may not have closed.
+ // Since the window is both in the active window set AND its trigger was not yet closed,
+ // it is possible we still have state.
reduceFn.clearState(renamedContext);
watermarkHold.clearHolds(renamedContext);
nonEmptyPanes.clearPane(renamedContext.state());
@@ -623,7 +624,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
}
paneInfoTracker.clear(directContext.state());
if (activeWindows.isActive(directContext.window())) {
- // Don't need to track address state windows anymore
+ // Don't need to track address state windows anymore.
activeWindows.remove(directContext.window());
}
// We'll never need to test for the trigger being closed again.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/045471c1/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
index 4fb3e37..e1348f7 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
@@ -687,6 +687,44 @@ public class ReduceFnRunnerTest {
}
/**
+ * It is possible for a session window's trigger to be closed at the point at which
+ * the (merged) session window is garbage collected. Make sure we don't accidentally
+ * assume the window is still active.
+ */
+ @Test
+ public void testMergingWithCloseBeforeGC() throws Exception {
+ ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+ ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger,
+ AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50),
+ ClosingBehavior.FIRE_IF_NON_EMPTY);
+
+ // Two elements in two overlapping session windows.
+ tester.injectElements(
+ TimestampedValue.of(1, new Instant(1)), // in [1, 11)
+ TimestampedValue.of(10, new Instant(10))); // in [10, 20)
+
+ // Close the trigger, but the garbage collection timer is still pending.
+ when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
+ triggerShouldFinish(mockTrigger);
+ tester.advanceInputWatermark(new Instant(30));
+
+ // Now the garbage collection timer will fire, finding the trigger already closed.
+ tester.advanceInputWatermark(new Instant(100));
+
+ List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+ assertThat(output.size(), equalTo(1));
+ assertThat(output.get(0),
+ isSingleWindowedValue(containsInAnyOrder(1, 10),
+ 1, // timestamp
+ 1, // window start
+ 20)); // window end
+ assertThat(
+ output.get(0).getPane(),
+ equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0)));
+ }
+
+
+ /**
* Tests that when data is assigned to multiple windows but some of those windows have
* had their triggers finish, then the data is dropped and counted accurately.
*/