You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2019/03/11 09:28:49 UTC

[beam] branch master updated: [BEAM-6796] [flink] Fully finish bundle before timer callback

This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9262402  [BEAM-6796] [flink] Fully finish bundle before timer callback
     new bf1daeb  Merge pull request #8023: [BEAM-6796][flink] Fully finish bundle before timer callback
9262402 is described below

commit 92624029e3560bb5c81b8dc54c0cafb537b60493
Author: Thomas Weise <th...@apache.org>
AuthorDate: Sat Mar 9 09:50:43 2019 -0800

    [BEAM-6796] [flink] Fully finish bundle before timer callback
---
 .../flink/translation/wrappers/streaming/DoFnOperator.java   | 12 ++++++++++++
 .../wrappers/streaming/ExecutableStageDoFnOperator.java      | 12 ++----------
 2 files changed, 14 insertions(+), 10 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 1979b83..b748b38 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -187,6 +187,8 @@ public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperator<Window
   private transient ScheduledFuture<?> checkFinishBundleTimer;
   /** Time that the last bundle was finished (to set the timer). */
   private transient long lastFinishBundleTime;
+  /** Callback to be executed after the current bundle was finshed. */
+  private transient Runnable bundleFinishedCallback;
 
   public DoFnOperator(
       DoFn<InputT, OutputT> doFn,
@@ -457,6 +459,10 @@ public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperator<Window
     pushedBackWatermark = watermark;
   }
 
+  protected void setBundleFinishedCallback(Runnable callback) {
+    this.bundleFinishedCallback = callback;
+  }
+
   @Override
   public final void processElement(StreamRecord<WindowedValue<InputT>> streamRecord) {
     checkInvokeStartBundle();
@@ -679,6 +685,12 @@ public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperator<Window
       pushbackDoFnRunner.finishBundle();
       elementCount = 0L;
       lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime();
+      // callback only after current bundle was fully finalized
+      // it could start a new bundle, for example resulting from timer processing
+      if (bundleFinishedCallback != null) {
+        bundleFinishedCallback.run();
+        bundleFinishedCallback = null;
+      }
     }
   }
 
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 011cfc4..8b0685b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -479,7 +479,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
         // output watermark.
         backupWatermarkHold = Math.max(backupWatermarkHold, getPushbackWatermarkHold());
         setPushedBackWatermark(Math.min(currentOutputWatermark, backupWatermarkHold));
-        sdkHarnessRunner.setBundleFinishedCallback(
+        super.setBundleFinishedCallback(
             () -> {
               try {
                 LOG.debug("processing pushed back watermark: {}", mark);
@@ -516,7 +516,6 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
 
     private RemoteBundle remoteBundle;
     private FnDataReceiver<WindowedValue<?>> mainInputReceiver;
-    private Runnable bundleFinishedCallback;
     // Timer key set before calling Flink's internal timer service to register
     // a timer. The timer service will retrieve this with a call to {@code getCurrentKey}.
     // Before firing a timer, this will be initialized with the current key
@@ -594,6 +593,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
         String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
       Preconditions.checkNotNull(
           currentTimerKey, "Key for timer needs to be set before calling onTimer");
+      Preconditions.checkNotNull(remoteBundle, "Call to onTimer outside of a bundle");
       LOG.debug("timer callback: {} {} {} {}", timerId, window, timestamp, timeDomain);
       FnDataReceiver<WindowedValue<?>> timerReceiver =
           Preconditions.checkNotNull(
@@ -628,10 +628,6 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
       } finally {
         remoteBundle = null;
       }
-      if (bundleFinishedCallback != null) {
-        bundleFinishedCallback.run();
-        bundleFinishedCallback = null;
-      }
     }
 
     /** Key for timer which has not been registered yet. */
@@ -648,10 +644,6 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
       return remoteBundle != null;
     }
 
-    void setBundleFinishedCallback(Runnable callback) {
-      this.bundleFinishedCallback = callback;
-    }
-
     private void emitResults() {
       KV<String, OutputT> result;
       while ((result = outputQueue.poll()) != null) {