You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/07/13 03:06:36 UTC
[24/50] [abbrv] beam git commit: Ignore processing time timers in expired windows
Ignore processing time timers in expired windows
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/951f3cab
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/951f3cab
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/951f3cab
Branch: refs/heads/DSL_SQL
Commit: 951f3cab3f6558524ee1146e0e3f347bcd02ecda
Parents: c167d10
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 22 18:09:11 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Jul 12 20:01:00 2017 -0700
----------------------------------------------------------------------
.../beam/runners/core/ReduceFnRunner.java | 10 ++++++
.../beam/runners/core/ReduceFnRunnerTest.java | 32 ++++++++++++++++++++
2 files changed, 42 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/951f3cab/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index ef33bef..0632c05 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -693,6 +693,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
@SuppressWarnings("unchecked")
WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace();
W window = windowNamespace.getWindow();
+
+ if (TimeDomain.PROCESSING_TIME == timer.getDomain() && windowIsExpired(window)) {
+ continue;
+ }
+
ReduceFn<K, InputT, OutputT, W>.Context directContext =
contextFactory.base(window, StateStyle.DIRECT);
ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
@@ -1090,4 +1095,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
}
}
+ private boolean windowIsExpired(BoundedWindow w) {
+ return timerInternals
+ .currentInputWatermarkTime()
+ .isAfter(w.maxTimestamp().plus(windowingStrategy.getAllowedLateness()));
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/951f3cab/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 3a2c220..79ee91b 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -286,6 +286,38 @@ public class ReduceFnRunnerTest {
/**
* Tests that when a processing time timer comes in after a window is expired
+ * it is just ignored.
+ */
+ @Test
+ public void testLateProcessingTimeTimer() throws Exception {
+ WindowingStrategy<?, IntervalWindow> strategy =
+ WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100)))
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)
+ .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+ .withAllowedLateness(Duration.ZERO)
+ .withTrigger(
+ Repeatedly.forever(
+ AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))));
+
+ ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+ ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
+
+ tester.advanceProcessingTime(new Instant(5000));
+ injectElement(tester, 2); // processing timer @ 5000 + 10; EOW timer @ 100
+ injectElement(tester, 5);
+
+ // After this advancement, the window is expired and only the GC process
+ // should be allowed to touch it
+ tester.advanceInputWatermarkNoTimers(new Instant(100));
+
+ // This should not output
+ tester.advanceProcessingTime(new Instant(6000));
+
+ assertThat(tester.extractOutput(), emptyIterable());
+ }
+
+ /**
+ * Tests that when a processing time timer comes in after a window is expired
* but in the same bundle it does not cause a spurious output.
*/
@Test