You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2020/05/04 08:51:03 UTC

[beam] 03/04: Merge pull request #11595: [BEAM-9801] Fire timers set within timers in Spark

This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git

commit f2bfed6cd9795cc61846e95f3bffaa8d52b012ad
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Sun May 3 12:46:54 2020 +0200

    Merge pull request #11595: [BEAM-9801] Fire timers set within timers in Spark
---
 .../functions/FlinkExecutableStageFunction.java    | 16 +---
 .../translation/PipelineTranslatorUtils.java       | 27 +++++--
 .../translation/SparkExecutableStageFunction.java  | 91 ++++++++++------------
 3 files changed, 59 insertions(+), 75 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index f8c9c24..bf3355d 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -53,7 +53,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
@@ -253,20 +252,7 @@ public class FlinkExecutableStageFunction<InputT> extends AbstractRichFunction
               receiverFactory, timerReceiverFactory, stateRequestHandler, progressHandler)) {
 
         PipelineTranslatorUtils.fireEligibleTimers(
-            timerInternals,
-            (KV<String, String> transformAndTimerId, Timer<?> timerValue) -> {
-              FnDataReceiver<Timer> fnTimerReceiver =
-                  bundle.getTimerReceivers().get(transformAndTimerId);
-              Preconditions.checkNotNull(
-                  fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
-              try {
-                fnTimerReceiver.accept(timerValue);
-              } catch (Exception e) {
-                throw new RuntimeException(
-                    String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
-              }
-            },
-            currentTimerKey);
+            timerInternals, bundle.getTimerReceivers(), currentTimerKey);
       }
     }
   }
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
index c624e5c..1fa4ef0 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
@@ -22,7 +22,8 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.function.BiConsumer;
+import java.util.Locale;
+import java.util.Map;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
 import org.apache.beam.runners.core.InMemoryTimerInternals;
@@ -36,6 +37,7 @@ import org.apache.beam.runners.core.construction.graph.PipelineNode;
 import org.apache.beam.runners.fnexecution.control.TimerReceiverFactory;
 import org.apache.beam.runners.fnexecution.wire.WireCoders;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -109,7 +111,7 @@ public final class PipelineTranslatorUtils {
    */
   public static void fireEligibleTimers(
       InMemoryTimerInternals timerInternals,
-      BiConsumer<KV<String, String>, Timer<?>> timerConsumer,
+      Map<KV<String, String>, FnDataReceiver<Timer>> timerReceivers,
       Object currentTimerKey) {
 
     boolean hasFired;
@@ -119,22 +121,22 @@ public final class PipelineTranslatorUtils {
 
       while ((timer = timerInternals.removeNextEventTimer()) != null) {
         hasFired = true;
-        fireTimer(timer, timerConsumer, currentTimerKey);
+        fireTimer(timer, timerReceivers, currentTimerKey);
       }
       while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
         hasFired = true;
-        fireTimer(timer, timerConsumer, currentTimerKey);
+        fireTimer(timer, timerReceivers, currentTimerKey);
       }
       while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
         hasFired = true;
-        fireTimer(timer, timerConsumer, currentTimerKey);
+        fireTimer(timer, timerReceivers, currentTimerKey);
       }
     } while (hasFired);
   }
 
   private static void fireTimer(
       TimerInternals.TimerData timer,
-      BiConsumer<KV<String, String>, Timer<?>> timerConsumer,
+      Map<KV<String, String>, FnDataReceiver<Timer>> timerReceivers,
       Object currentTimerKey) {
     StateNamespace namespace = timer.getNamespace();
     Preconditions.checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
@@ -149,7 +151,16 @@ public final class PipelineTranslatorUtils {
             timestamp,
             outputTimestamp,
             PaneInfo.NO_FIRING);
-    timerConsumer.accept(
-        TimerReceiverFactory.decodeTimerDataTimerId(timer.getTimerId()), timerValue);
+    KV<String, String> transformAndTimerId =
+        TimerReceiverFactory.decodeTimerDataTimerId(timer.getTimerId());
+    FnDataReceiver<Timer> fnTimerReceiver = timerReceivers.get(transformAndTimerId);
+    Preconditions.checkNotNull(
+        fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
+    try {
+      fnTimerReceiver.accept(timerValue);
+    } catch (Exception e) {
+      throw new RuntimeException(
+          String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
+    }
   }
 }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
index 954ccc5..a9a0dec 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
@@ -59,8 +59,6 @@ import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.broadcast.Broadcast;
@@ -128,42 +126,53 @@ class SparkExecutableStageFunction<InputT, SideInputT>
         StateRequestHandler stateRequestHandler =
             getStateRequestHandler(
                 executableStage, stageBundleFactory.getProcessBundleDescriptor());
-        if (executableStage.getTimers().size() > 0) {
-          // Used with Batch, we know that all the data is available for this key. We can't use the
-          // timer manager from the context because it doesn't exist. So we create one and advance
-          // time to the end after processing all elements.
-          final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
-          timerInternals.advanceProcessingTime(Instant.now());
-          timerInternals.advanceSynchronizedProcessingTime(Instant.now());
-
+        if (executableStage.getTimers().size() == 0) {
           ReceiverFactory receiverFactory = new ReceiverFactory(collector, outputMap);
-
-          TimerReceiverFactory timerReceiverFactory =
-              new TimerReceiverFactory(
-                  stageBundleFactory,
-                  (Timer<?> timer, TimerInternals.TimerData timerData) -> {
-                    currentTimerKey = timer.getUserKey();
-                    timerInternals.setTimer(timerData);
-                  },
-                  windowCoder);
-
-          // Process inputs.
           processElements(
               executableStage,
               stateRequestHandler,
               receiverFactory,
-              timerReceiverFactory,
+              null,
               stageBundleFactory,
               inputs);
+          return collector.iterator();
+        }
+        // Used with Batch, we know that all the data is available for this key. We can't use the
+        // timer manager from the context because it doesn't exist. So we create one and advance
+        // time to the end after processing all elements.
+        final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
+        timerInternals.advanceProcessingTime(Instant.now());
+        timerInternals.advanceSynchronizedProcessingTime(Instant.now());
+
+        ReceiverFactory receiverFactory = new ReceiverFactory(collector, outputMap);
+
+        TimerReceiverFactory timerReceiverFactory =
+            new TimerReceiverFactory(
+                stageBundleFactory,
+                (Timer<?> timer, TimerInternals.TimerData timerData) -> {
+                  currentTimerKey = timer.getUserKey();
+                  timerInternals.setTimer(timerData);
+                },
+                windowCoder);
+
+        // Process inputs.
+        processElements(
+            executableStage,
+            stateRequestHandler,
+            receiverFactory,
+            timerReceiverFactory,
+            stageBundleFactory,
+            inputs);
 
-          // Finish any pending windows by advancing the input watermark to infinity.
-          timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
-          // Finally, advance the processing time to infinity to fire any timers.
-          timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
-          timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+        // Finish any pending windows by advancing the input watermark to infinity.
+        timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+        // Finally, advance the processing time to infinity to fire any timers.
+        timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+        timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
 
-          // Now we fire the timers and process elements generated by timers (which may be timers
-          // itself)
+        // Now we fire the timers and process elements generated by timers (which may be timers
+        // itself)
+        while (timerInternals.hasPendingTimers()) {
           try (RemoteBundle bundle =
               stageBundleFactory.getBundle(
                   receiverFactory,
@@ -172,30 +181,8 @@ class SparkExecutableStageFunction<InputT, SideInputT>
                   getBundleProgressHandler())) {
 
             PipelineTranslatorUtils.fireEligibleTimers(
-                timerInternals,
-                (KV<String, String> transformAndTimerId, Timer<?> timerValue) -> {
-                  FnDataReceiver<Timer> fnTimerReceiver =
-                      bundle.getTimerReceivers().get(transformAndTimerId);
-                  Preconditions.checkNotNull(
-                      fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
-                  try {
-                    fnTimerReceiver.accept(timerValue);
-                  } catch (Exception e) {
-                    throw new RuntimeException(
-                        String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
-                  }
-                },
-                currentTimerKey);
+                timerInternals, bundle.getTimerReceivers(), currentTimerKey);
           }
-        } else {
-          ReceiverFactory receiverFactory = new ReceiverFactory(collector, outputMap);
-          processElements(
-              executableStage,
-              stateRequestHandler,
-              receiverFactory,
-              null,
-              stageBundleFactory,
-              inputs);
         }
         return collector.iterator();
       }