You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2019/03/11 09:28:49 UTC
[beam] branch master updated: [BEAM-6796] [flink] Fully finish
bundle before timer callback
This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 9262402 [BEAM-6796] [flink] Fully finish bundle before timer callback
new bf1daeb Merge pull request #8023: [BEAM-6796][flink] Fully finish bundle before timer callback
9262402 is described below
commit 92624029e3560bb5c81b8dc54c0cafb537b60493
Author: Thomas Weise <th...@apache.org>
AuthorDate: Sat Mar 9 09:50:43 2019 -0800
[BEAM-6796] [flink] Fully finish bundle before timer callback
---
.../flink/translation/wrappers/streaming/DoFnOperator.java | 12 ++++++++++++
.../wrappers/streaming/ExecutableStageDoFnOperator.java | 12 ++----------
2 files changed, 14 insertions(+), 10 deletions(-)
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 1979b83..b748b38 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -187,6 +187,8 @@ public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperator<Window
private transient ScheduledFuture<?> checkFinishBundleTimer;
/** Time that the last bundle was finished (to set the timer). */
private transient long lastFinishBundleTime;
+ /** Callback to be executed after the current bundle was finshed. */
+ private transient Runnable bundleFinishedCallback;
public DoFnOperator(
DoFn<InputT, OutputT> doFn,
@@ -457,6 +459,10 @@ public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperator<Window
pushedBackWatermark = watermark;
}
+ protected void setBundleFinishedCallback(Runnable callback) {
+ this.bundleFinishedCallback = callback;
+ }
+
@Override
public final void processElement(StreamRecord<WindowedValue<InputT>> streamRecord) {
checkInvokeStartBundle();
@@ -679,6 +685,12 @@ public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperator<Window
pushbackDoFnRunner.finishBundle();
elementCount = 0L;
lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime();
+ // callback only after current bundle was fully finalized
+ // it could start a new bundle, for example resulting from timer processing
+ if (bundleFinishedCallback != null) {
+ bundleFinishedCallback.run();
+ bundleFinishedCallback = null;
+ }
}
}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 011cfc4..8b0685b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -479,7 +479,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
// output watermark.
backupWatermarkHold = Math.max(backupWatermarkHold, getPushbackWatermarkHold());
setPushedBackWatermark(Math.min(currentOutputWatermark, backupWatermarkHold));
- sdkHarnessRunner.setBundleFinishedCallback(
+ super.setBundleFinishedCallback(
() -> {
try {
LOG.debug("processing pushed back watermark: {}", mark);
@@ -516,7 +516,6 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
private RemoteBundle remoteBundle;
private FnDataReceiver<WindowedValue<?>> mainInputReceiver;
- private Runnable bundleFinishedCallback;
// Timer key set before calling Flink's internal timer service to register
// a timer. The timer service will retrieve this with a call to {@code getCurrentKey}.
// Before firing a timer, this will be initialized with the current key
@@ -594,6 +593,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
Preconditions.checkNotNull(
currentTimerKey, "Key for timer needs to be set before calling onTimer");
+ Preconditions.checkNotNull(remoteBundle, "Call to onTimer outside of a bundle");
LOG.debug("timer callback: {} {} {} {}", timerId, window, timestamp, timeDomain);
FnDataReceiver<WindowedValue<?>> timerReceiver =
Preconditions.checkNotNull(
@@ -628,10 +628,6 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
} finally {
remoteBundle = null;
}
- if (bundleFinishedCallback != null) {
- bundleFinishedCallback.run();
- bundleFinishedCallback = null;
- }
}
/** Key for timer which has not been registered yet. */
@@ -648,10 +644,6 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
return remoteBundle != null;
}
- void setBundleFinishedCallback(Runnable callback) {
- this.bundleFinishedCallback = callback;
- }
-
private void emitResults() {
KV<String, OutputT> result;
while ((result = outputQueue.poll()) != null) {