You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2020/05/04 08:51:02 UTC

[beam] 02/04: [BEAM-9733] Repeatedly fire in batch mode until there are no more timers

This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 13f1e7b15de08054b32d57ce0fbca4911c91f8d2
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Sat Apr 25 14:09:26 2020 +0200

    [BEAM-9733] Repeatedly fire in batch mode until there are no more timers
---
 .../beam/runners/core/InMemoryTimerInternals.java  |  7 ++++
 .../functions/FlinkExecutableStageFunction.java    | 40 ++++++++++++----------
 2 files changed, 28 insertions(+), 19 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
index 7fbfaf0..21420a7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -66,6 +66,13 @@ public class InMemoryTimerInternals implements TimerInternals {
     return outputWatermarkTime;
   }
 
+  /** Returns true when there are still timers to be fired. */
+  public boolean hasPendingTimers() {
+    return !(watermarkTimers.isEmpty()
+        && processingTimers.isEmpty()
+        && synchronizedProcessingTimers.isEmpty());
+  }
+
   /**
    * Returns when the next timer in the given time domain will fire, or {@code null} if there are no
    * timers scheduled in that time domain.
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index ea1c583..f8c9c24 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -247,25 +247,27 @@ public class FlinkExecutableStageFunction<InputT> extends AbstractRichFunction
     timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     // Now we fire the timers and process elements generated by timers (which may be timers itself)
-    try (RemoteBundle bundle =
-        stageBundleFactory.getBundle(
-            receiverFactory, timerReceiverFactory, stateRequestHandler, progressHandler)) {
-
-      PipelineTranslatorUtils.fireEligibleTimers(
-          timerInternals,
-          (KV<String, String> transformAndTimerId, Timer<?> timerValue) -> {
-            FnDataReceiver<Timer> fnTimerReceiver =
-                bundle.getTimerReceivers().get(transformAndTimerId);
-            Preconditions.checkNotNull(
-                fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
-            try {
-              fnTimerReceiver.accept(timerValue);
-            } catch (Exception e) {
-              throw new RuntimeException(
-                  String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
-            }
-          },
-          currentTimerKey);
+    while (timerInternals.hasPendingTimers()) {
+      try (RemoteBundle bundle =
+          stageBundleFactory.getBundle(
+              receiverFactory, timerReceiverFactory, stateRequestHandler, progressHandler)) {
+
+        PipelineTranslatorUtils.fireEligibleTimers(
+            timerInternals,
+            (KV<String, String> transformAndTimerId, Timer<?> timerValue) -> {
+              FnDataReceiver<Timer> fnTimerReceiver =
+                  bundle.getTimerReceivers().get(transformAndTimerId);
+              Preconditions.checkNotNull(
+                  fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
+              try {
+                fnTimerReceiver.accept(timerValue);
+              } catch (Exception e) {
+                throw new RuntimeException(
+                    String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
+              }
+            },
+            currentTimerKey);
+      }
     }
   }