You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by mx...@apache.org on 2018/11/23 10:02:17 UTC

[beam] branch release-2.9.0 updated: [BEAM-6116] [portable flink streaming] Emit pushed back watermark when bundle is complete.

This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch release-2.9.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.9.0 by this push:
     new 2189179  [BEAM-6116] [portable flink streaming] Emit pushed back watermark when bundle is complete.
2189179 is described below

commit 2189179ca9ce187b79086500b4a7bfa6fdab0460
Author: Thomas Weise <th...@apache.org>
AuthorDate: Thu Nov 22 09:52:42 2018 -0800

    [BEAM-6116] [portable flink streaming] Emit pushed back watermark when bundle is complete.
---
 .../wrappers/streaming/ExecutableStageDoFnOperator.java   | 13 ++++++++-----
 .../flink/streaming/ExecutableStageDoFnOperatorTest.java  | 15 ++++++++++++++-
 2 files changed, 22 insertions(+), 6 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 2071702..31ad20a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -43,7 +43,6 @@ import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.construction.Timer;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
-import org.apache.beam.runners.core.construction.graph.TimerReference;
 import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
 import org.apache.beam.runners.flink.translation.functions.FlinkStreamingSideInputHandlerFactory;
 import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
@@ -105,6 +104,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
   private transient StageBundleFactory stageBundleFactory;
   private transient ExecutableStage executableStage;
   private transient SdkHarnessDoFnRunner<InputT, OutputT> sdkHarnessRunner;
+  private transient long backupWatermarkHold = Long.MIN_VALUE;
 
   public ExecutableStageDoFnOperator(
       String stepName,
@@ -353,7 +353,6 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
             progressHandler,
             outputManager,
             outputMap,
-            executableStage.getTimers(),
             (Coder<BoundedWindow>) windowingStrategy.getWindowFn().windowCoder(),
             keySelector,
             timerInternals);
@@ -394,11 +393,16 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
       } else {
         // It is not safe to advance the output watermark yet, so add a hold on the current
         // output watermark.
-        setPushedBackWatermark(Math.min(currentOutputWatermark, getPushbackWatermarkHold()));
+        backupWatermarkHold = Math.max(backupWatermarkHold, getPushbackWatermarkHold());
+        setPushedBackWatermark(Math.min(currentOutputWatermark, backupWatermarkHold));
         sdkHarnessRunner.setBundleFinishedCallback(
             () -> {
               try {
-                processWatermark(mark);
+                LOG.debug("processing pushed back watermark: {}", mark);
+                // at this point the bundle is finished, allow the watermark to pass
+                // we are restoring the previous hold in case it was already set for side inputs
+                setPushedBackWatermark(backupWatermarkHold);
+                super.processWatermark(mark);
               } catch (Exception e) {
                 throw new RuntimeException(
                     "Failed to process pushed back watermark after finished bundle.", e);
@@ -442,7 +446,6 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
         BundleProgressHandler progressHandler,
         BufferedOutputManager<OutputT> outputManager,
         Map<String, TupleTag<?>> outputMap,
-        Collection<TimerReference> timers,
         Coder<BoundedWindow> windowCoder,
         KeySelector<WindowedValue<InputT>, ?> keySelector,
         TimerInternals timerInternals) {
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
index cc282c5..c34dd08 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.flink.streaming;
 
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
@@ -273,13 +274,25 @@ public class ExecutableStageDoFnOperatorTest {
     OneInputStreamOperatorTestHarness<WindowedValue<Integer>, WindowedValue<Integer>> testHarness =
         new OneInputStreamOperatorTestHarness<>(operator);
 
+    long watermark = testHarness.getCurrentWatermark() + 1;
     testHarness.open();
+
     testHarness.processElement(new StreamRecord<>(zero));
+
+    testHarness.processWatermark(watermark);
+    watermark++;
+    testHarness.processWatermark(watermark);
+
+    assertEquals(watermark, testHarness.getCurrentWatermark());
+    // watermark hold until bundle complete
+    assertEquals(0, testHarness.getOutput().size());
+
     testHarness.close(); // triggers finish bundle
 
     assertThat(
         testHarness.getOutput(),
-        contains(new StreamRecord<>(three), new Watermark(Long.MAX_VALUE)));
+        contains(
+            new StreamRecord<>(three), new Watermark(watermark), new Watermark(Long.MAX_VALUE)));
 
     assertThat(
         testHarness.getSideOutput(tagsToOutputTags.get(additionalOutput1)),