You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/10 04:50:01 UTC
[31/43] beam git commit: Process timer firings for a window together
Process timer firings for a window together
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/935c0773
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/935c0773
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/935c0773
Branch: refs/heads/gearpump-runner
Commit: 935c077341de580dddd4b29ffee3926795acf403
Parents: bd631b8
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 22 18:43:39 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jul 6 14:12:39 2017 -0700
----------------------------------------------------------------------
.../examples/complete/game/LeaderBoardTest.java | 2 +
.../beam/runners/core/ReduceFnRunner.java | 98 +++++++++++++-------
.../beam/runners/core/ReduceFnRunnerTest.java | 49 +++++++++-
3 files changed, 115 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/935c0773/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
index 745c210..611e2b3 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java
@@ -276,6 +276,8 @@ public class LeaderBoardTest implements Serializable {
.addElements(event(TestUser.RED_ONE, 4, Duration.standardMinutes(2)),
event(TestUser.BLUE_TWO, 3, Duration.ZERO),
event(TestUser.BLUE_ONE, 3, Duration.standardMinutes(3)))
+ // Move the watermark to the end of the window to output on time
+ .advanceWatermarkTo(baseTime.plus(TEAM_WINDOW_DURATION))
// Move the watermark past the end of the allowed lateness plus the end of the window
.advanceWatermarkTo(baseTime.plus(ALLOWED_LATENESS)
.plus(TEAM_WINDOW_DURATION).plus(Duration.standardMinutes(1)))
http://git-wip-us.apache.org/repos/asf/beam/blob/935c0773/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 0632c05..634a2d1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -29,7 +29,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -638,11 +637,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
}
/**
- * Enriches TimerData with state necessary for processing a timer as well as
- * common queries about a timer.
+ * A descriptor of the activation for a window based on a timer.
*/
- private class EnrichedTimerData {
- public final Instant timestamp;
+ private class WindowActivation {
public final ReduceFn<K, InputT, OutputT, W>.Context directContext;
public final ReduceFn<K, InputT, OutputT, W>.Context renamedContext;
// If this is an end-of-window timer then we may need to set a garbage collection timer
@@ -653,19 +650,34 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
// end-of-window time to be a signal to garbage collect.
public final boolean isGarbageCollection;
- EnrichedTimerData(
- TimerData timer,
+ WindowActivation(
ReduceFn<K, InputT, OutputT, W>.Context directContext,
ReduceFn<K, InputT, OutputT, W>.Context renamedContext) {
- this.timestamp = timer.getTimestamp();
this.directContext = directContext;
this.renamedContext = renamedContext;
W window = directContext.window();
- this.isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
- && timer.getTimestamp().equals(window.maxTimestamp());
- Instant cleanupTime = LateDataUtils.garbageCollectionTime(window, windowingStrategy);
+
+ // The output watermark is before the end of the window if it is either unknown
+ // or it is known to be before it. If it is unknown, that means that there hasn't been
+ // enough data to advance it.
+ boolean outputWatermarkBeforeEOW =
+ timerInternals.currentOutputWatermarkTime() == null
+ || !timerInternals.currentOutputWatermarkTime().isAfter(window.maxTimestamp());
+
+ // The "end of the window" is reached when the local input watermark (for this key) surpasses
+ // it but the local output watermark (also for this key) has not. After data is emitted and
+ // the output watermark hold is released, the output watermark on this key will immediately
+ // exceed the end of the window (otherwise we could see multiple ON_TIME outputs).
+ this.isEndOfWindow =
+ timerInternals.currentInputWatermarkTime().isAfter(window.maxTimestamp())
+ && outputWatermarkBeforeEOW;
+
+ // The "GC time" is reached when the input watermark surpasses the end of the window
+ // plus allowed lateness. After this, the window is expired and expunged.
this.isGarbageCollection =
- TimeDomain.EVENT_TIME == timer.getDomain() && !timer.getTimestamp().isBefore(cleanupTime);
+ timerInternals
+ .currentInputWatermarkTime()
+ .isAfter(LateDataUtils.garbageCollectionTime(window, windowingStrategy));
}
// Has this window had its trigger finish?
@@ -684,9 +696,10 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
return;
}
- // Create a reusable context for each timer and begin prefetching necessary
+ // Create a reusable context for each window and begin prefetching necessary
// state.
- List<EnrichedTimerData> enrichedTimers = new LinkedList();
+ Map<BoundedWindow, WindowActivation> windowActivations = new HashMap();
+
for (TimerData timer : timers) {
checkArgument(timer.getNamespace() instanceof WindowNamespace,
"Expected timer to be in WindowNamespace, but was in %s", timer.getNamespace());
@@ -694,7 +707,24 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace();
W window = windowNamespace.getWindow();
- if (TimeDomain.PROCESSING_TIME == timer.getDomain() && windowIsExpired(window)) {
+ WindowTracing.debug("{}: Received timer key:{}; window:{}; data:{} with "
+ + "inputWatermark:{}; outputWatermark:{}",
+ ReduceFnRunner.class.getSimpleName(),
+ key, window, timer,
+ timerInternals.currentInputWatermarkTime(),
+ timerInternals.currentOutputWatermarkTime());
+
+ // Processing time timers for an expired window are ignored, just like elements
+ // that show up too late. Window GC is managed by an event time timer.
+ if (TimeDomain.EVENT_TIME != timer.getDomain() && windowIsExpired(window)) {
+ continue;
+ }
+
+ // How a window is processed is a function only of the current state, not the details
+ // of the timer. This makes us robust to large leaps in processing time and watermark
+ // time, where both EOW and GC timers come in together and we need to GC and emit
+ // the final pane.
+ if (windowActivations.containsKey(window)) {
continue;
}
@@ -702,11 +732,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
contextFactory.base(window, StateStyle.DIRECT);
ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
contextFactory.base(window, StateStyle.RENAMED);
- EnrichedTimerData enrichedTimer = new EnrichedTimerData(timer, directContext, renamedContext);
- enrichedTimers.add(enrichedTimer);
+ WindowActivation windowActivation = new WindowActivation(directContext, renamedContext);
+ windowActivations.put(window, windowActivation);
// Perform prefetching of state to determine if the trigger should fire.
- if (enrichedTimer.isGarbageCollection) {
+ if (windowActivation.isGarbageCollection) {
triggerRunner.prefetchIsClosed(directContext.state());
} else {
triggerRunner.prefetchShouldFire(directContext.window(), directContext.state());
@@ -714,7 +744,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
}
// For those windows that are active and open, prefetch the triggering or emitting state.
- for (EnrichedTimerData timer : enrichedTimers) {
+ for (WindowActivation timer : windowActivations.values()) {
if (timer.windowIsActiveAndOpen()) {
ReduceFn<K, InputT, OutputT, W>.Context directContext = timer.directContext;
if (timer.isGarbageCollection) {
@@ -727,25 +757,27 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
}
// Perform processing now that everything is prefetched.
- for (EnrichedTimerData timer : enrichedTimers) {
- ReduceFn<K, InputT, OutputT, W>.Context directContext = timer.directContext;
- ReduceFn<K, InputT, OutputT, W>.Context renamedContext = timer.renamedContext;
+ for (WindowActivation windowActivation : windowActivations.values()) {
+ ReduceFn<K, InputT, OutputT, W>.Context directContext = windowActivation.directContext;
+ ReduceFn<K, InputT, OutputT, W>.Context renamedContext = windowActivation.renamedContext;
- if (timer.isGarbageCollection) {
- WindowTracing.debug("ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} with "
- + "inputWatermark:{}; outputWatermark:{}",
- key, directContext.window(), timer.timestamp,
+ if (windowActivation.isGarbageCollection) {
+ WindowTracing.debug(
+ "{}: Cleaning up for key:{}; window:{} with inputWatermark:{}; outputWatermark:{}",
+ ReduceFnRunner.class.getSimpleName(),
+ key,
+ directContext.window(),
timerInternals.currentInputWatermarkTime(),
timerInternals.currentOutputWatermarkTime());
- boolean windowIsActiveAndOpen = timer.windowIsActiveAndOpen();
+ boolean windowIsActiveAndOpen = windowActivation.windowIsActiveAndOpen();
if (windowIsActiveAndOpen) {
// We need to call onTrigger to emit the final pane if required.
// The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted,
// and the watermark has passed the end of the window.
@Nullable
Instant newHold = onTrigger(
- directContext, renamedContext, true /* isFinished */, timer.isEndOfWindow);
+ directContext, renamedContext, true /* isFinished */, windowActivation.isEndOfWindow);
checkState(newHold == null, "Hold placed at %s despite isFinished being true.", newHold);
}
@@ -753,18 +785,20 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
// see elements for it again.
clearAllState(directContext, renamedContext, windowIsActiveAndOpen);
} else {
- WindowTracing.debug("ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with "
+ WindowTracing.debug(
+ "{}.onTimers: Triggering for key:{}; window:{} at {} with "
+ "inputWatermark:{}; outputWatermark:{}",
- key, directContext.window(), timer.timestamp,
+ key,
+ directContext.window(),
timerInternals.currentInputWatermarkTime(),
timerInternals.currentOutputWatermarkTime());
- if (timer.windowIsActiveAndOpen()
+ if (windowActivation.windowIsActiveAndOpen()
&& triggerRunner.shouldFire(
directContext.window(), directContext.timers(), directContext.state())) {
emit(directContext, renamedContext);
}
- if (timer.isEndOfWindow) {
+ if (windowActivation.isEndOfWindow) {
// If the window strategy trigger includes a watermark trigger then at this point
// there should be no data holds, either because we'd already cleared them on an
// earlier onTrigger, or because we just cleared them on the above emit.
http://git-wip-us.apache.org/repos/asf/beam/blob/935c0773/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 79ee91b..4f13af1 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -55,6 +55,7 @@ import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
@@ -79,7 +80,6 @@ import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -246,6 +246,52 @@ public class ReduceFnRunnerTest {
tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
}
+ /**
+ * Tests that with the default trigger we will not produce two ON_TIME panes, even
+ * if there are two outputs that are both candidates.
+ */
+ @Test
+ public void testOnlyOneOnTimePane() throws Exception {
+ WindowingStrategy<?, IntervalWindow> strategy =
+ WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10)))
+ .withTrigger(DefaultTrigger.of())
+ .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+ .withAllowedLateness(Duration.millis(100));
+
+ ReduceFnTester<Integer, Integer, IntervalWindow> tester =
+ ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of());
+
+ tester.advanceInputWatermark(new Instant(0));
+
+ int value1 = 1;
+ int value2 = 3;
+
+ // A single element that should be in the ON_TIME output
+ tester.injectElements(
+ TimestampedValue.of(value1, new Instant(1)));
+
+ // Should fire ON_TIME
+ tester.advanceInputWatermark(new Instant(10));
+
+ // The DefaultTrigger should cause output labeled LATE, even though it does not have to be
+ // labeled as such.
+ tester.injectElements(
+ TimestampedValue.of(value2, new Instant(3)));
+
+ List<WindowedValue<Integer>> output = tester.extractOutput();
+ assertEquals(2, output.size());
+
+ assertThat(output.get(0), WindowMatchers.isWindowedValue(equalTo(value1)));
+ assertThat(output.get(1), WindowMatchers.isWindowedValue(equalTo(value1 + value2)));
+
+ assertThat(
+ output.get(0),
+ WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0)));
+ assertThat(
+ output.get(1),
+ WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.LATE, 1, 1)));
+ }
+
@Test
public void testOnElementCombiningDiscarding() throws Exception {
// Test basic execution of a trigger using a non-combining window set and discarding mode.
@@ -458,7 +504,6 @@ public class ReduceFnRunnerTest {
* marked as final.
*/
@Test
- @Ignore("https://issues.apache.org/jira/browse/BEAM-2505")
public void testCombiningAccumulatingEventTime() throws Exception {
WindowingStrategy<?, IntervalWindow> strategy =
WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100)))