You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/18 20:08:29 UTC
[29/50] [abbrv] beam git commit: ApexRunner SDF support
ApexRunner SDF support
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f5efca02
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f5efca02
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f5efca02
Branch: refs/heads/gearpump-runner
Commit: f5efca0292eb9ace2d6b4895bc08e94344854336
Parents: 69522fe
Author: Thomas Weise <th...@apache.org>
Authored: Sat Apr 8 13:58:48 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon May 15 17:04:20 2017 -0700
----------------------------------------------------------------------
.../apache/beam/runners/apex/ApexRunner.java | 34 +++++
.../translation/ApexPipelineTranslator.java | 23 ++++
.../apex/translation/ParDoTranslator.java | 68 +++++++++-
.../operators/ApexGroupByKeyOperator.java | 4 +-
.../operators/ApexParDoOperator.java | 124 ++++++++++++++++++-
.../operators/ApexProcessFnOperator.java | 8 +-
.../operators/ApexTimerInternals.java | 21 +++-
.../translation/utils/ApexStateInternals.java | 4 +
.../translation/utils/StateInternalsProxy.java | 7 +-
.../operators/ApexTimerInternalsTest.java | 8 +-
10 files changed, 286 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f5efca02/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 366308e..2fd0b22 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -31,6 +31,7 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.apex.api.EmbeddedAppLauncher;
@@ -38,9 +39,11 @@ import org.apache.apex.api.Launcher;
import org.apache.apex.api.Launcher.AppHandle;
import org.apache.apex.api.Launcher.LaunchMode;
import org.apache.beam.runners.apex.translation.ApexPipelineTranslator;
+import org.apache.beam.runners.core.SplittableParDo;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.PrimitiveCreate;
+import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
@@ -51,18 +54,23 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.View.AsIterable;
import org.apache.beam.sdk.transforms.View.AsSingleton;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
import org.apache.hadoop.conf.Configuration;
/**
@@ -112,6 +120,10 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
PTransformOverride.of(
PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
new StreamingCombineGloballyAsSingletonView.Factory()))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.splittableParDoMulti(),
+ new SplittableParDoOverrideFactory<>()))
.build();
}
@@ -424,4 +436,26 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
}
}
+ /**
+ * A {@link PTransformOverrideFactory} that overrides a
+ * <a href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a> with
+ * {@link SplittableParDo}.
+ */
+ static class SplittableParDoOverrideFactory<InputT, OutputT> implements PTransformOverrideFactory<
+ PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>> {
+ @Override
+ public PTransformReplacement<PCollection<InputT>, PCollectionTuple> getReplacementTransform(
+ AppliedPTransform<PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>>
+ transform) {
+ return PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform),
+ new SplittableParDo<>(transform.getTransform()));
+ }
+
+ @Override
+ public Map<PValue, ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs,
+ PCollectionTuple newOutput) {
+ return ReplacementOutputs.tagged(outputs, newOutput);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f5efca02/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
index fdeefc7..b3a6d1c 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
@@ -23,7 +23,9 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.ApexRunner.CreateApexPCollectionView;
+import org.apache.beam.runners.apex.translation.operators.ApexProcessFnOperator;
import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator;
+import org.apache.beam.runners.core.SplittableParDo;
import org.apache.beam.runners.core.construction.PrimitiveCreate;
import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
import org.apache.beam.sdk.Pipeline;
@@ -35,6 +37,7 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.slf4j.Logger;
@@ -60,6 +63,10 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
static {
// register TransformTranslators
registerTransformTranslator(ParDo.MultiOutput.class, new ParDoTranslator<>());
+ registerTransformTranslator(SplittableParDo.ProcessElements.class,
+ new ParDoTranslator.SplittableProcessElementsTranslator());
+ registerTransformTranslator(SplittableParDo.GBKIntoKeyedWorkItems.class,
+ new GBKIntoKeyedWorkItemsTranslator());
registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator());
registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
@@ -174,4 +181,20 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
LOG.debug("view {}", view.getName());
}
}
+
+ private static class GBKIntoKeyedWorkItemsTranslator<K, InputT>
+ implements TransformTranslator<SplittableParDo.GBKIntoKeyedWorkItems<K, InputT>> {
+
+ @Override
+ public void translate(
+ SplittableParDo.GBKIntoKeyedWorkItems<K, InputT> transform, TranslationContext context) {
+ // https://issues.apache.org/jira/browse/BEAM-1850
+ ApexProcessFnOperator<KV<K, InputT>> operator = ApexProcessFnOperator.toKeyedWorkItems(
+ context.getPipelineOptions());
+ context.addOperator(operator, operator.outputPort);
+ context.addStream(context.getInput(), operator.inputPort);
+ }
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f5efca02/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
index 2e3d902..9133cb6 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
@@ -30,11 +30,13 @@ import java.util.Map;
import java.util.Map.Entry;
import org.apache.beam.runners.apex.ApexRunner;
import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
+import org.apache.beam.runners.core.SplittableParDo;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.values.PCollection;
@@ -83,7 +85,7 @@ class ParDoTranslator<InputT, OutputT>
}
Map<TupleTag<?>, PValue> outputs = context.getOutputs();
- PCollection<InputT> input = (PCollection<InputT>) context.getInput();
+ PCollection<InputT> input = context.getInput();
List<PCollectionView<?>> sideInputs = transform.getSideInputs();
Coder<InputT> inputCoder = input.getCoder();
WindowedValueCoder<InputT> wvInputCoder =
@@ -130,6 +132,70 @@ class ParDoTranslator<InputT, OutputT>
}
}
+ static class SplittableProcessElementsTranslator<InputT, OutputT,
+ RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
+ implements TransformTranslator<SplittableParDo.ProcessElements<InputT, OutputT,
+ RestrictionT, TrackerT>> {
+
+ @Override
+ public void translate(
+ SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform,
+ TranslationContext context) {
+
+ Map<TupleTag<?>, PValue> outputs = context.getOutputs();
+ PCollection<InputT> input = context.getInput();
+ List<PCollectionView<?>> sideInputs = transform.getSideInputs();
+ Coder<InputT> inputCoder = input.getCoder();
+ WindowedValueCoder<InputT> wvInputCoder =
+ FullWindowedValueCoder.of(
+ inputCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ DoFn<InputT, OutputT> doFn = (DoFn) transform.newProcessFn(transform.getFn());
+ ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(
+ context.getPipelineOptions(),
+ doFn,
+ transform.getMainOutputTag(),
+ transform.getAdditionalOutputTags().getAll(),
+ input.getWindowingStrategy(),
+ sideInputs,
+ wvInputCoder,
+ context.getStateBackend());
+
+ Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size());
+ for (Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
+ checkArgument(
+ output.getValue() instanceof PCollection,
+ "%s %s outputs non-PCollection %s of type %s",
+ ParDo.MultiOutput.class.getSimpleName(),
+ context.getFullName(),
+ output.getValue(),
+ output.getValue().getClass().getSimpleName());
+ PCollection<?> pc = (PCollection<?>) output.getValue();
+ if (output.getKey().equals(transform.getMainOutputTag())) {
+ ports.put(pc, operator.output);
+ } else {
+ int portIndex = 0;
+ for (TupleTag<?> tag : transform.getAdditionalOutputTags().getAll()) {
+ if (tag.equals(output.getKey())) {
+ ports.put(pc, operator.additionalOutputPorts[portIndex]);
+ break;
+ }
+ portIndex++;
+ }
+ }
+ }
+
+ context.addOperator(operator, ports);
+ context.addStream(context.getInput(), operator.input);
+ if (!sideInputs.isEmpty()) {
+ addSideInputs(operator.sideInput1, sideInputs, context);
+ }
+
+ }
+ }
+
+
static void addSideInputs(
Operator.InputPort<?> sideInputPort,
List<PCollectionView<?>> sideInputs,
http://git-wip-us.apache.org/repos/asf/beam/blob/f5efca02/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 85836ad..1d48e20 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -202,7 +202,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator,
windowedValue.getTimestamp(),
windowedValue.getWindows(),
windowedValue.getPane());
- timerInternals.setContext(kv.getKey(), this.keyCoder, this.inputWatermark);
+ timerInternals.setContext(kv.getKey(), this.keyCoder, this.inputWatermark, null);
ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner =
newReduceFnRunner(kv.getKey());
reduceFnRunner.processElements(Collections.singletonList(updatedWindowedValue));
@@ -211,7 +211,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator,
@Override
public void fireTimer(K key, Collection<TimerData> timerData) {
- timerInternals.setContext(key, keyCoder, inputWatermark);
+ timerInternals.setContext(key, keyCoder, inputWatermark, null);
ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner = newReduceFnRunner(key);
try {
reduceFnRunner.onTimers(timerData);
http://git-wip-us.apache.org/repos/asf/beam/blob/f5efca02/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 8c516b1..7fee0d5 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.runners.apex.translation.operators;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
@@ -28,8 +31,10 @@ import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executors;
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.ApexRunner;
import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
@@ -44,20 +49,33 @@ import org.apache.beam.runners.core.DoFnRunners.OutputManager;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItemCoder;
import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
+import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.SplittableParDo;
import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsFactory;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
import org.apache.beam.runners.core.StatefulDoFnRunner;
import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.TimerInternalsFactory;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
@@ -65,6 +83,7 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
+import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,7 +91,8 @@ import org.slf4j.LoggerFactory;
/**
* Apex operator for Beam {@link DoFn}.
*/
-public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements OutputManager {
+public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements OutputManager,
+ ApexTimerInternals.TimerProcessor<Object> {
private static final Logger LOG = LoggerFactory.getLogger(ApexParDoOperator.class);
private boolean traceTuples = true;
@@ -139,6 +159,14 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
TimerInternals.TimerDataCoder timerCoder =
TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
this.currentKeyTimerInternals = new ApexTimerInternals<>(timerCoder);
+
+ if (doFn instanceof SplittableParDo.ProcessFn) {
+ // we know that it is keyed on String
+ Coder<?> keyCoder = StringUtf8Coder.of();
+ this.currentKeyStateInternals = new StateInternalsProxy<>(
+ stateBackend.newStateInternalsFactory(keyCoder));
+ }
+
}
@SuppressWarnings("unused") // for Kryo
@@ -272,7 +300,10 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
keyCoder = kwiCoder.getKeyCoder();
}
((StateInternalsProxy) currentKeyStateInternals).setKey(key);
- currentKeyTimerInternals.setContext(key, keyCoder, new Instant(this.currentInputWatermark));
+ currentKeyTimerInternals.setContext(key, keyCoder,
+ new Instant(this.currentInputWatermark),
+ new Instant(this.currentOutputWatermark)
+ );
}
Iterable<WindowedValue<InputT>> pushedBack = pushbackDoFnRunner
.processElementInReadyWindows(elem);
@@ -286,9 +317,47 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
}
}
+ @Override
+ public void fireTimer(Object key, Collection<TimerData> timerDataSet) {
+ pushbackDoFnRunner.startBundle();
+ @SuppressWarnings("unchecked")
+ Coder<Object> keyCoder = (Coder) currentKeyStateInternals.getKeyCoder();
+ ((StateInternalsProxy) currentKeyStateInternals).setKey(key);
+ currentKeyTimerInternals.setContext(key, keyCoder, new Instant(this.currentInputWatermark),
+ new Instant(this.currentOutputWatermark));
+ for (TimerData timerData : timerDataSet) {
+ StateNamespace namespace = timerData.getNamespace();
+ checkArgument(namespace instanceof WindowNamespace);
+ BoundedWindow window = ((WindowNamespace<?>) namespace).getWindow();
+ pushbackDoFnRunner.onTimer(timerData.getTimerId(), window,
+ timerData.getTimestamp(), timerData.getDomain());
+ }
+ pushbackDoFnRunner.finishBundle();
+ }
+
private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) {
this.currentInputWatermark = mark.getTimestamp();
-
+ long minEventTimeTimer = currentKeyTimerInternals.fireReadyTimers(
+ this.currentInputWatermark,
+ this, TimeDomain.EVENT_TIME);
+
+ checkState(minEventTimeTimer >= currentInputWatermark,
+ "Event time timer processing generates new timer(s) behind watermark.");
+ //LOG.info("Processing time timer {} registered behind watermark {}", minProcessingTimeTimer,
+ // currentInputWatermark);
+
+ // TODO: is this the right way to trigger processing time timers?
+ // drain all timers below current watermark, including those that result from firing
+ long minProcessingTimeTimer = Long.MIN_VALUE;
+ while (minProcessingTimeTimer < currentInputWatermark) {
+ minProcessingTimeTimer = currentKeyTimerInternals.fireReadyTimers(
+ this.currentInputWatermark,
+ this, TimeDomain.PROCESSING_TIME);
+ if (minProcessingTimeTimer < currentInputWatermark) {
+ LOG.info("Processing time timer {} registered behind watermark {}", minProcessingTimeTimer,
+ currentInputWatermark);
+ }
+ }
if (sideInputs.isEmpty()) {
if (traceTuples) {
LOG.debug("\nemitting watermark {}\n", mark);
@@ -376,6 +445,52 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
pushbackDoFnRunner =
SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
+ if (doFn instanceof SplittableParDo.ProcessFn) {
+
+ @SuppressWarnings("unchecked")
+ StateInternalsFactory<String> stateInternalsFactory =
+ (StateInternalsFactory<String>) this.currentKeyStateInternals.getFactory();
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ SplittableParDo.ProcessFn<InputT, OutputT, Object, RestrictionTracker<Object>>
+ splittableDoFn = (SplittableParDo.ProcessFn) doFn;
+ splittableDoFn.setStateInternalsFactory(stateInternalsFactory);
+ TimerInternalsFactory<String> timerInternalsFactory = new TimerInternalsFactory<String>() {
+ @Override
+ public TimerInternals timerInternalsForKey(String key) {
+ return currentKeyTimerInternals;
+ }
+ };
+ splittableDoFn.setTimerInternalsFactory(timerInternalsFactory);
+ splittableDoFn.setProcessElementInvoker(
+ new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
+ doFn,
+ pipelineOptions.get(),
+ new OutputWindowedValue<OutputT>() {
+ @Override
+ public void outputWindowedValue(
+ OutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ output(
+ mainOutputTag,
+ WindowedValue.of(output, timestamp, windows, pane));
+ }
+
+ @Override
+ public <AdditionalOutputT> void outputWindowedValue(TupleTag<AdditionalOutputT> tag,
+ AdditionalOutputT output, Instant timestamp,
+ Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+ output(tag, WindowedValue.of(output, timestamp, windows, pane));
+ }
+ },
+ sideInputReader,
+ Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()),
+ 10000,
+ Duration.standardSeconds(10)));
+ }
+
}
@Override
@@ -390,6 +505,9 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
@Override
public void endWindow() {
+ currentKeyTimerInternals.fireReadyTimers(
+ currentKeyTimerInternals.currentProcessingTime().getMillis(),
+ this, TimeDomain.PROCESSING_TIME);
}
private static class LongMin {
http://git-wip-us.apache.org/repos/asf/beam/blob/f5efca02/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexProcessFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexProcessFnOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexProcessFnOperator.java
index 835c9e0..9f4e6c5 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexProcessFnOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexProcessFnOperator.java
@@ -91,7 +91,13 @@ public class ApexProcessFnOperator<InputT> extends BaseOperator {
/**
* Convert {@link KV} into {@link KeyedWorkItem}s.
*/
- public static class ToKeyedWorkItems<K, V> implements ApexOperatorFn<KV<K, V>> {
+ public static <K, V> ApexProcessFnOperator<KV<K, V>> toKeyedWorkItems(
+ ApexPipelineOptions options) {
+ ApexOperatorFn<KV<K, V>> fn = new ToKeyedWorkItems<>();
+ return new ApexProcessFnOperator<KV<K, V>>(fn, options.isTupleTracingEnabled());
+ }
+
+ private static class ToKeyedWorkItems<K, V> implements ApexOperatorFn<KV<K, V>> {
@Override
public final void process(ApexStreamTuple<WindowedValue<KV<K, V>>> tuple,
OutputEmitter<ApexStreamTuple<? extends WindowedValue<?>>> outputEmitter) {
http://git-wip-us.apache.org/repos/asf/beam/blob/f5efca02/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java
index 1eb224c..0a0dd50 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java
@@ -55,6 +55,7 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable {
private transient K currentKey;
private transient Instant currentInputWatermark;
+ private transient Instant currentOutputWatermark;
private transient Coder<K> keyCoder;
public ApexTimerInternals(TimerDataCoder timerDataCoder) {
@@ -62,10 +63,12 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable {
this.processingTimeTimers = new TimerSet(timerDataCoder);
}
- public void setContext(K key, Coder<K> keyCoder, Instant inputWatermark) {
+ public void setContext(K key, Coder<K> keyCoder, Instant inputWatermark,
+ Instant outputWatermark) {
this.currentKey = key;
this.keyCoder = keyCoder;
this.currentInputWatermark = inputWatermark;
+ this.currentOutputWatermark = outputWatermark;
}
@VisibleForTesting
@@ -118,7 +121,7 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable {
@Override
public Instant currentOutputWatermarkTime() {
- return null;
+ return currentOutputWatermark;
}
public interface TimerProcessor<K> {
@@ -128,11 +131,19 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable {
/**
* Fire the timers that are ready. These are the timers
* that are registered to be triggered at a time before the current time.
+ * Timer processing may register new timers, which can cause the returned
+ * timestamp to be before the current time. The caller may repeat
+ * the call until such backdated timers are cleared.
+ * @return minimum timestamp of registered timers.
*/
- public void fireReadyTimers(long currentTime,
+ public long fireReadyTimers(long currentTime,
TimerProcessor<K> timerProcessor, TimeDomain timeDomain) {
TimerSet timers = getTimerSet(timeDomain);
+ // move minTimestamp first,
+ // timer additions that result from firing may modify it
+ timers.minTimestamp = currentTime;
+
// we keep the timers to return in a different list and launch them later
// because we cannot prevent a trigger from registering another timer,
// which would lead to concurrent modification exception.
@@ -173,6 +184,8 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable {
}
}
}
+
+ return timers.minTimestamp;
}
private Slice getKeyBytes(K key) {
@@ -186,6 +199,7 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable {
protected static class TimerSet implements Serializable {
private final Map<Slice, Set<Slice>> activeTimers = new HashMap<>();
private final TimerDataCoder timerDataCoder;
+ private long minTimestamp = Long.MAX_VALUE;
protected TimerSet(TimerDataCoder timerDataCoder) {
this.timerDataCoder = timerDataCoder;
@@ -205,6 +219,7 @@ class ApexTimerInternals<K> implements TimerInternals, Serializable {
}
activeTimers.put(keyBytes, timersForKey);
+ this.minTimestamp = Math.min(minTimestamp, timer.getTimestamp().getMillis());
}
public void deleteTimer(Slice keyBytes, StateNamespace namespace, String timerId) {
http://git-wip-us.apache.org/repos/asf/beam/blob/f5efca02/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
index eeea6d1..18ea8e4 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
@@ -438,6 +438,10 @@ public class ApexStateInternals<K> implements StateInternals {
this.keyCoder = keyCoder;
}
+ public Coder<K> getKeyCoder() {
+ return this.keyCoder;
+ }
+
@Override
public ApexStateInternals<K> stateInternalsForKey(K key) {
final Slice keyBytes;
http://git-wip-us.apache.org/repos/asf/beam/blob/f5efca02/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java
index ccf7e43..b652c68 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/StateInternalsProxy.java
@@ -24,6 +24,7 @@ import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateContext;
@@ -34,7 +35,7 @@ import org.apache.beam.sdk.state.StateContext;
@DefaultSerializer(JavaSerializer.class)
public class StateInternalsProxy<K> implements StateInternals, Serializable {
- private final StateInternalsFactory<K> factory;
+ private final ApexStateInternals.ApexStateInternalsFactory<K> factory;
private transient K currentKey;
public StateInternalsProxy(ApexStateInternals.ApexStateInternalsFactory<K> factory) {
@@ -45,6 +46,10 @@ public class StateInternalsProxy<K> implements StateInternals, Serializable {
return this.factory;
}
+ public Coder<K> getKeyCoder() {
+ return factory.getKeyCoder();
+ }
+
public void setKey(K key) {
currentKey = key;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f5efca02/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
index 7b52223..ba1c801 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
@@ -60,7 +60,7 @@ public class ApexTimerInternalsTest {
Instant instant2 = new Instant(2);
ApexTimerInternals<String> timerInternals = new ApexTimerInternals<>(timerDataCoder);
- timerInternals.setContext(key1, StringUtf8Coder.of(), Instant.now());
+ timerInternals.setContext(key1, StringUtf8Coder.of(), Instant.now(), null);
TimerData timerData0 = TimerData.of("timerData0", StateNamespaces.global(),
instant0, TimeDomain.EVENT_TIME);
@@ -98,7 +98,7 @@ public class ApexTimerInternalsTest {
Instant instant1 = new Instant(1);
ApexTimerInternals<String> timerInternals = new ApexTimerInternals<>(timerDataCoder);
- timerInternals.setContext(key1, StringUtf8Coder.of(), Instant.now());
+ timerInternals.setContext(key1, StringUtf8Coder.of(), Instant.now(), null);
TimerData timerData0 = TimerData.of("timerData0", StateNamespaces.global(),
instant0, TimeDomain.EVENT_TIME);
@@ -133,11 +133,11 @@ public class ApexTimerInternalsTest {
new Instant(0), TimeDomain.EVENT_TIME);
String key = "key";
ApexTimerInternals<String> timerInternals = new ApexTimerInternals<>(timerDataCoder);
- timerInternals.setContext(key, StringUtf8Coder.of(), Instant.now());
+ timerInternals.setContext(key, StringUtf8Coder.of(), Instant.now(), null);
timerInternals.setTimer(timerData);
ApexTimerInternals<String> cloned;
assertNotNull("Serialization", cloned = KryoCloneUtils.cloneObject(timerInternals));
- cloned.setContext(key, StringUtf8Coder.of(), Instant.now());
+ cloned.setContext(key, StringUtf8Coder.of(), Instant.now(), null);
Map<?, Set<Slice>> timers = cloned.getTimerSet(TimeDomain.EVENT_TIME).getMap();
assertEquals(1, timers.size());
}