You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2020/05/04 08:51:02 UTC
[beam] 02/04: [BEAM-9733] Repeatedly fire in batch mode until there
are no more timers
This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 13f1e7b15de08054b32d57ce0fbca4911c91f8d2
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Sat Apr 25 14:09:26 2020 +0200
[BEAM-9733] Repeatedly fire in batch mode until there are no more timers
---
.../beam/runners/core/InMemoryTimerInternals.java | 7 ++++
.../functions/FlinkExecutableStageFunction.java | 40 ++++++++++++----------
2 files changed, 28 insertions(+), 19 deletions(-)
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
index 7fbfaf0..21420a7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -66,6 +66,13 @@ public class InMemoryTimerInternals implements TimerInternals {
return outputWatermarkTime;
}
+ /** Returns true if any watermark, processing, or synchronized processing timers remain. */
+ public boolean hasPendingTimers() {
+ return !(watermarkTimers.isEmpty()
+ && processingTimers.isEmpty()
+ && synchronizedProcessingTimers.isEmpty());
+ }
+
/**
* Returns when the next timer in the given time domain will fire, or {@code null} if there are no
* timers scheduled in that time domain.
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index ea1c583..f8c9c24 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -247,25 +247,27 @@ public class FlinkExecutableStageFunction<InputT> extends AbstractRichFunction
timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
// Now we fire the timers and process elements generated by timers (which may be timers itself)
- try (RemoteBundle bundle =
- stageBundleFactory.getBundle(
- receiverFactory, timerReceiverFactory, stateRequestHandler, progressHandler)) {
-
- PipelineTranslatorUtils.fireEligibleTimers(
- timerInternals,
- (KV<String, String> transformAndTimerId, Timer<?> timerValue) -> {
- FnDataReceiver<Timer> fnTimerReceiver =
- bundle.getTimerReceivers().get(transformAndTimerId);
- Preconditions.checkNotNull(
- fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
- try {
- fnTimerReceiver.accept(timerValue);
- } catch (Exception e) {
- throw new RuntimeException(
- String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
- }
- },
- currentTimerKey);
+ while (timerInternals.hasPendingTimers()) {
+ try (RemoteBundle bundle =
+ stageBundleFactory.getBundle(
+ receiverFactory, timerReceiverFactory, stateRequestHandler, progressHandler)) {
+
+ PipelineTranslatorUtils.fireEligibleTimers(
+ timerInternals,
+ (KV<String, String> transformAndTimerId, Timer<?> timerValue) -> {
+ FnDataReceiver<Timer> fnTimerReceiver =
+ bundle.getTimerReceivers().get(transformAndTimerId);
+ Preconditions.checkNotNull(
+ fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
+ try {
+ fnTimerReceiver.accept(timerValue);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
+ }
+ },
+ currentTimerKey);
+ }
}
}