You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2018/11/23 10:02:17 UTC
[beam] branch release-2.9.0 updated: [BEAM-6116] [portable flink streaming] Emit pushed back watermark when bundle is complete.
This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch release-2.9.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.9.0 by this push:
new 2189179 [BEAM-6116] [portable flink streaming] Emit pushed back watermark when bundle is complete.
2189179 is described below
commit 2189179ca9ce187b79086500b4a7bfa6fdab0460
Author: Thomas Weise <th...@apache.org>
AuthorDate: Thu Nov 22 09:52:42 2018 -0800
[BEAM-6116] [portable flink streaming] Emit pushed back watermark when bundle is complete.
---
.../wrappers/streaming/ExecutableStageDoFnOperator.java | 13 ++++++++-----
.../flink/streaming/ExecutableStageDoFnOperatorTest.java | 15 ++++++++++++++-
2 files changed, 22 insertions(+), 6 deletions(-)
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 2071702..31ad20a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -43,7 +43,6 @@ import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
-import org.apache.beam.runners.core.construction.graph.TimerReference;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
import org.apache.beam.runners.flink.translation.functions.FlinkStreamingSideInputHandlerFactory;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
@@ -105,6 +104,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
private transient StageBundleFactory stageBundleFactory;
private transient ExecutableStage executableStage;
private transient SdkHarnessDoFnRunner<InputT, OutputT> sdkHarnessRunner;
+ private transient long backupWatermarkHold = Long.MIN_VALUE;
public ExecutableStageDoFnOperator(
String stepName,
@@ -353,7 +353,6 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
progressHandler,
outputManager,
outputMap,
- executableStage.getTimers(),
(Coder<BoundedWindow>) windowingStrategy.getWindowFn().windowCoder(),
keySelector,
timerInternals);
@@ -394,11 +393,16 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
} else {
// It is not safe to advance the output watermark yet, so add a hold on the current
// output watermark.
- setPushedBackWatermark(Math.min(currentOutputWatermark, getPushbackWatermarkHold()));
+ backupWatermarkHold = Math.max(backupWatermarkHold, getPushbackWatermarkHold());
+ setPushedBackWatermark(Math.min(currentOutputWatermark, backupWatermarkHold));
sdkHarnessRunner.setBundleFinishedCallback(
() -> {
try {
- processWatermark(mark);
+ LOG.debug("processing pushed back watermark: {}", mark);
+ // at this point the bundle is finished, allow the watermark to pass
+ // we are restoring the previous hold in case it was already set for side inputs
+ setPushedBackWatermark(backupWatermarkHold);
+ super.processWatermark(mark);
} catch (Exception e) {
throw new RuntimeException(
"Failed to process pushed back watermark after finished bundle.", e);
@@ -442,7 +446,6 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
BundleProgressHandler progressHandler,
BufferedOutputManager<OutputT> outputManager,
Map<String, TupleTag<?>> outputMap,
- Collection<TimerReference> timers,
Coder<BoundedWindow> windowCoder,
KeySelector<WindowedValue<InputT>, ?> keySelector,
TimerInternals timerInternals) {
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
index cc282c5..c34dd08 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.flink.streaming;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
@@ -273,13 +274,25 @@ public class ExecutableStageDoFnOperatorTest {
OneInputStreamOperatorTestHarness<WindowedValue<Integer>, WindowedValue<Integer>> testHarness =
new OneInputStreamOperatorTestHarness<>(operator);
+ long watermark = testHarness.getCurrentWatermark() + 1;
testHarness.open();
+
testHarness.processElement(new StreamRecord<>(zero));
+
+ testHarness.processWatermark(watermark);
+ watermark++;
+ testHarness.processWatermark(watermark);
+
+ assertEquals(watermark, testHarness.getCurrentWatermark());
+ // watermark hold until bundle complete
+ assertEquals(0, testHarness.getOutput().size());
+
testHarness.close(); // triggers finish bundle
assertThat(
testHarness.getOutput(),
- contains(new StreamRecord<>(three), new Watermark(Long.MAX_VALUE)));
+ contains(
+ new StreamRecord<>(three), new Watermark(watermark), new Watermark(Long.MAX_VALUE)));
assertThat(
testHarness.getSideOutput(tagsToOutputTags.get(additionalOutput1)),