You are viewing a plain-text version of this content. The canonical link for it is "here" (the hyperlink itself is not preserved in this plain-text rendering).
Posted to commits@beam.apache.org by mx...@apache.org on 2020/05/04 08:51:03 UTC
[beam] 03/04: Merge pull request #11595: [BEAM-9801] Fire timers set within timers in Spark
This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git
commit f2bfed6cd9795cc61846e95f3bffaa8d52b012ad
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Sun May 3 12:46:54 2020 +0200
Merge pull request #11595: [BEAM-9801] Fire timers set within timers in Spark
---
.../functions/FlinkExecutableStageFunction.java | 16 +---
.../translation/PipelineTranslatorUtils.java | 27 +++++--
.../translation/SparkExecutableStageFunction.java | 91 ++++++++++------------
3 files changed, 59 insertions(+), 75 deletions(-)
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index f8c9c24..bf3355d 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -53,7 +53,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
@@ -253,20 +252,7 @@ public class FlinkExecutableStageFunction<InputT> extends AbstractRichFunction
receiverFactory, timerReceiverFactory, stateRequestHandler, progressHandler)) {
PipelineTranslatorUtils.fireEligibleTimers(
- timerInternals,
- (KV<String, String> transformAndTimerId, Timer<?> timerValue) -> {
- FnDataReceiver<Timer> fnTimerReceiver =
- bundle.getTimerReceivers().get(transformAndTimerId);
- Preconditions.checkNotNull(
- fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
- try {
- fnTimerReceiver.accept(timerValue);
- } catch (Exception e) {
- throw new RuntimeException(
- String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
- }
- },
- currentTimerKey);
+ timerInternals, bundle.getTimerReceivers(), currentTimerKey);
}
}
}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
index c624e5c..1fa4ef0 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
@@ -22,7 +22,8 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
-import java.util.function.BiConsumer;
+import java.util.Locale;
+import java.util.Map;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
import org.apache.beam.runners.core.InMemoryTimerInternals;
@@ -36,6 +37,7 @@ import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.fnexecution.control.TimerReceiverFactory;
import org.apache.beam.runners.fnexecution.wire.WireCoders;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
@@ -109,7 +111,7 @@ public final class PipelineTranslatorUtils {
*/
public static void fireEligibleTimers(
InMemoryTimerInternals timerInternals,
- BiConsumer<KV<String, String>, Timer<?>> timerConsumer,
+ Map<KV<String, String>, FnDataReceiver<Timer>> timerReceivers,
Object currentTimerKey) {
boolean hasFired;
@@ -119,22 +121,22 @@ public final class PipelineTranslatorUtils {
while ((timer = timerInternals.removeNextEventTimer()) != null) {
hasFired = true;
- fireTimer(timer, timerConsumer, currentTimerKey);
+ fireTimer(timer, timerReceivers, currentTimerKey);
}
while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
hasFired = true;
- fireTimer(timer, timerConsumer, currentTimerKey);
+ fireTimer(timer, timerReceivers, currentTimerKey);
}
while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
hasFired = true;
- fireTimer(timer, timerConsumer, currentTimerKey);
+ fireTimer(timer, timerReceivers, currentTimerKey);
}
} while (hasFired);
}
private static void fireTimer(
TimerInternals.TimerData timer,
- BiConsumer<KV<String, String>, Timer<?>> timerConsumer,
+ Map<KV<String, String>, FnDataReceiver<Timer>> timerReceivers,
Object currentTimerKey) {
StateNamespace namespace = timer.getNamespace();
Preconditions.checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
@@ -149,7 +151,16 @@ public final class PipelineTranslatorUtils {
timestamp,
outputTimestamp,
PaneInfo.NO_FIRING);
- timerConsumer.accept(
- TimerReceiverFactory.decodeTimerDataTimerId(timer.getTimerId()), timerValue);
+ KV<String, String> transformAndTimerId =
+ TimerReceiverFactory.decodeTimerDataTimerId(timer.getTimerId());
+ FnDataReceiver<Timer> fnTimerReceiver = timerReceivers.get(transformAndTimerId);
+ Preconditions.checkNotNull(
+ fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
+ try {
+ fnTimerReceiver.accept(timerValue);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
+ }
}
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
index 954ccc5..a9a0dec 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
@@ -59,8 +59,6 @@ import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
@@ -128,42 +126,53 @@ class SparkExecutableStageFunction<InputT, SideInputT>
StateRequestHandler stateRequestHandler =
getStateRequestHandler(
executableStage, stageBundleFactory.getProcessBundleDescriptor());
- if (executableStage.getTimers().size() > 0) {
- // Used with Batch, we know that all the data is available for this key. We can't use the
- // timer manager from the context because it doesn't exist. So we create one and advance
- // time to the end after processing all elements.
- final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
- timerInternals.advanceProcessingTime(Instant.now());
- timerInternals.advanceSynchronizedProcessingTime(Instant.now());
-
+ if (executableStage.getTimers().size() == 0) {
ReceiverFactory receiverFactory = new ReceiverFactory(collector, outputMap);
-
- TimerReceiverFactory timerReceiverFactory =
- new TimerReceiverFactory(
- stageBundleFactory,
- (Timer<?> timer, TimerInternals.TimerData timerData) -> {
- currentTimerKey = timer.getUserKey();
- timerInternals.setTimer(timerData);
- },
- windowCoder);
-
- // Process inputs.
processElements(
executableStage,
stateRequestHandler,
receiverFactory,
- timerReceiverFactory,
+ null,
stageBundleFactory,
inputs);
+ return collector.iterator();
+ }
+ // Used with Batch, we know that all the data is available for this key. We can't use the
+ // timer manager from the context because it doesn't exist. So we create one and advance
+ // time to the end after processing all elements.
+ final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
+ timerInternals.advanceProcessingTime(Instant.now());
+ timerInternals.advanceSynchronizedProcessingTime(Instant.now());
+
+ ReceiverFactory receiverFactory = new ReceiverFactory(collector, outputMap);
+
+ TimerReceiverFactory timerReceiverFactory =
+ new TimerReceiverFactory(
+ stageBundleFactory,
+ (Timer<?> timer, TimerInternals.TimerData timerData) -> {
+ currentTimerKey = timer.getUserKey();
+ timerInternals.setTimer(timerData);
+ },
+ windowCoder);
+
+ // Process inputs.
+ processElements(
+ executableStage,
+ stateRequestHandler,
+ receiverFactory,
+ timerReceiverFactory,
+ stageBundleFactory,
+ inputs);
- // Finish any pending windows by advancing the input watermark to infinity.
- timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
- // Finally, advance the processing time to infinity to fire any timers.
- timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
- timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+ // Finish any pending windows by advancing the input watermark to infinity.
+ timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+ // Finally, advance the processing time to infinity to fire any timers.
+ timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+ timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
- // Now we fire the timers and process elements generated by timers (which may be timers
- // itself)
+ // Now we fire the timers and process elements generated by timers (which may be timers
+ // itself)
+ while (timerInternals.hasPendingTimers()) {
try (RemoteBundle bundle =
stageBundleFactory.getBundle(
receiverFactory,
@@ -172,30 +181,8 @@ class SparkExecutableStageFunction<InputT, SideInputT>
getBundleProgressHandler())) {
PipelineTranslatorUtils.fireEligibleTimers(
- timerInternals,
- (KV<String, String> transformAndTimerId, Timer<?> timerValue) -> {
- FnDataReceiver<Timer> fnTimerReceiver =
- bundle.getTimerReceivers().get(transformAndTimerId);
- Preconditions.checkNotNull(
- fnTimerReceiver, "No FnDataReceiver found for %s", transformAndTimerId);
- try {
- fnTimerReceiver.accept(timerValue);
- } catch (Exception e) {
- throw new RuntimeException(
- String.format(Locale.ENGLISH, "Failed to process timer: %s", timerValue));
- }
- },
- currentTimerKey);
+ timerInternals, bundle.getTimerReceivers(), currentTimerKey);
}
- } else {
- ReceiverFactory receiverFactory = new ReceiverFactory(collector, outputMap);
- processElements(
- executableStage,
- stateRequestHandler,
- receiverFactory,
- null,
- stageBundleFactory,
- inputs);
}
return collector.iterator();
}