You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/16 03:47:39 UTC
[1/2] beam git commit: [BEAM-1612] Support real Bundle in Flink runner
Repository: beam
Updated Branches:
refs/heads/master 3a8b0b68c -> 724eda37e
[BEAM-1612] Support real Bundle in Flink runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ceec7ce5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ceec7ce5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ceec7ce5
Branch: refs/heads/master
Commit: ceec7ce5ba287ab40ee1f7c87129b72d4db1c1c7
Parents: 3a8b0b6
Author: JingsongLi <lz...@aliyun.com>
Authored: Thu Jun 15 17:48:59 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Wed Aug 16 11:44:49 2017 +0800
----------------------------------------------------------------------
.../runners/flink/FlinkPipelineOptions.java | 11 +
.../FlinkStreamingTransformTranslators.java | 77 ++--
.../wrappers/streaming/DoFnOperator.java | 412 ++++++++++++++-----
.../streaming/SplittableDoFnOperator.java | 4 +-
.../wrappers/streaming/WindowDoFnOperator.java | 4 +-
.../state/FlinkSplitStateInternals.java | 8 +-
.../beam/runners/flink/PipelineOptionsTest.java | 21 +-
.../flink/streaming/DoFnOperatorTest.java | 161 ++++++--
8 files changed, 535 insertions(+), 163 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ceec7ce5/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index c255672..2432394 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -127,4 +127,15 @@ public interface FlinkPipelineOptions
@Default.Boolean(false)
Boolean getRetainExternalizedCheckpointsOnCancellation();
void setRetainExternalizedCheckpointsOnCancellation(Boolean retainOnCancellation);
+
+ @Description("The maximum number of elements in a bundle.")
+ @Default.Long(1000)
+ Long getMaxBundleSize();
+ void setMaxBundleSize(Long size);
+
+ @Description("The maximum time to wait before finalising a bundle (in milliseconds).")
+ @Default.Long(1000)
+ Long getMaxBundleTimeMills();
+ void setMaxBundleTimeMills(Long time);
+
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ceec7ce5/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 3d7e81f..058e195 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -339,7 +339,9 @@ class FlinkStreamingTransformTranslators {
List<TupleTag<?>> additionalOutputTags,
FlinkStreamingTranslationContext context,
WindowingStrategy<?, ?> windowingStrategy,
- Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToLabels,
+ Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags,
+ Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders,
+ Map<TupleTag<?>, Integer> tagsToIds,
Coder<WindowedValue<InputT>> inputCoder,
Coder keyCoder,
Map<Integer, PCollectionView<?>> transformedSideInputs);
@@ -360,15 +362,27 @@ class FlinkStreamingTransformTranslators {
WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags = Maps.newHashMap();
+ Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders = Maps.newHashMap();
+
+ // We associate output tags with ids, the Integer is easier to serialize than TupleTag.
+ // The return map of AppliedPTransform.getOutputs() is an ImmutableMap, its implementation is
+ // RegularImmutableMap, whose entrySet order is the same as the order of insertion.
+ // So we can use the original AppliedPTransform.getOutputs() to produce deterministic ids.
+ Map<TupleTag<?>, Integer> tagsToIds = Maps.newHashMap();
+ int idCount = 0;
+ tagsToIds.put(mainOutputTag, idCount++);
for (Map.Entry<TupleTag<?>, PValue> entry : outputs.entrySet()) {
if (!tagsToOutputTags.containsKey(entry.getKey())) {
tagsToOutputTags.put(
entry.getKey(),
- new OutputTag<WindowedValue<?>>(
+ new OutputTag<>(
entry.getKey().getId(),
(TypeInformation) context.getTypeInfo((PCollection<?>) entry.getValue())
)
);
+ tagsToCoders.put(entry.getKey(),
+ (Coder) context.getCoder((PCollection<OutputT>) entry.getValue()));
+ tagsToIds.put(entry.getKey(), idCount++);
}
}
@@ -409,6 +423,8 @@ class FlinkStreamingTransformTranslators {
context,
windowingStrategy,
tagsToOutputTags,
+ tagsToCoders,
+ tagsToIds,
inputCoder,
keyCoder,
new HashMap<Integer, PCollectionView<?>>() /* side-input mapping */);
@@ -430,6 +446,8 @@ class FlinkStreamingTransformTranslators {
context,
windowingStrategy,
tagsToOutputTags,
+ tagsToCoders,
+ tagsToIds,
inputCoder,
keyCoder,
transformedSideInputs.f0);
@@ -506,6 +524,8 @@ class FlinkStreamingTransformTranslators {
FlinkStreamingTranslationContext context,
WindowingStrategy<?, ?> windowingStrategy,
Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags,
+ Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders,
+ Map<TupleTag<?>, Integer> tagsToIds,
Coder<WindowedValue<InputT>> inputCoder,
Coder keyCoder,
Map<Integer, PCollectionView<?>> transformedSideInputs) {
@@ -515,7 +535,8 @@ class FlinkStreamingTransformTranslators {
inputCoder,
mainOutputTag,
additionalOutputTags,
- new DoFnOperator.MultiOutputOutputManagerFactory(mainOutputTag, tagsToOutputTags),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(
+ mainOutputTag, tagsToOutputTags, tagsToCoders, tagsToIds),
windowingStrategy,
transformedSideInputs,
sideInputs,
@@ -551,25 +572,28 @@ class FlinkStreamingTransformTranslators {
@Override
public DoFnOperator<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT>
createDoFnOperator(
- DoFn<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> doFn,
- String stepName,
- List<PCollectionView<?>> sideInputs,
- TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> additionalOutputTags,
- FlinkStreamingTranslationContext context,
- WindowingStrategy<?, ?> windowingStrategy,
- Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags,
- Coder<WindowedValue<KeyedWorkItem<String, KV<InputT, RestrictionT>>>>
- inputCoder,
- Coder keyCoder,
- Map<Integer, PCollectionView<?>> transformedSideInputs) {
+ DoFn<KeyedWorkItem<String, KV<InputT, RestrictionT>>, OutputT> doFn,
+ String stepName,
+ List<PCollectionView<?>> sideInputs,
+ TupleTag<OutputT> mainOutputTag,
+ List<TupleTag<?>> additionalOutputTags,
+ FlinkStreamingTranslationContext context,
+ WindowingStrategy<?, ?> windowingStrategy,
+ Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags,
+ Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders,
+ Map<TupleTag<?>, Integer> tagsToIds,
+ Coder<WindowedValue<KeyedWorkItem<String, KV<InputT, RestrictionT>>>>
+ inputCoder,
+ Coder keyCoder,
+ Map<Integer, PCollectionView<?>> transformedSideInputs) {
return new SplittableDoFnOperator<>(
doFn,
stepName,
inputCoder,
mainOutputTag,
additionalOutputTags,
- new DoFnOperator.MultiOutputOutputManagerFactory(mainOutputTag, tagsToOutputTags),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(
+ mainOutputTag, tagsToOutputTags, tagsToCoders, tagsToIds),
windowingStrategy,
transformedSideInputs,
sideInputs,
@@ -693,20 +717,21 @@ class FlinkStreamingTransformTranslators {
SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, BoundedWindow> reduceFn =
SystemReduceFn.buffering(inputKvCoder.getValueCoder());
+ Coder<WindowedValue<KV<K, Iterable<InputT>>>> outputCoder =
+ context.getCoder(context.getOutput(transform));
TypeInformation<WindowedValue<KV<K, Iterable<InputT>>>> outputTypeInfo =
context.getTypeInfo(context.getOutput(transform));
- DoFnOperator.DefaultOutputManagerFactory<KV<K, Iterable<InputT>>> outputManagerFactory =
- new DoFnOperator.DefaultOutputManagerFactory<>();
+ TupleTag<KV<K, Iterable<InputT>>> mainTag = new TupleTag<>("main output");
WindowDoFnOperator<K, InputT, Iterable<InputT>> doFnOperator =
new WindowDoFnOperator<>(
reduceFn,
context.getCurrentTransform().getFullName(),
(Coder) windowedWorkItemCoder,
- new TupleTag<KV<K, Iterable<InputT>>>("main output"),
+ mainTag,
Collections.<TupleTag<?>>emptyList(),
- outputManagerFactory,
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, outputCoder),
windowingStrategy,
new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
Collections.<PCollectionView<?>>emptyList(), /* side inputs */
@@ -793,6 +818,8 @@ class FlinkStreamingTransformTranslators {
AppliedCombineFn.withInputCoder(
transform.getFn(), input.getPipeline().getCoderRegistry(), inputKvCoder));
+ Coder<WindowedValue<KV<K, OutputT>>> outputCoder =
+ context.getCoder(context.getOutput(transform));
TypeInformation<WindowedValue<KV<K, OutputT>>> outputTypeInfo =
context.getTypeInfo(context.getOutput(transform));
@@ -800,14 +827,15 @@ class FlinkStreamingTransformTranslators {
if (sideInputs.isEmpty()) {
+ TupleTag<KV<K, OutputT>> mainTag = new TupleTag<>("main output");
WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
new WindowDoFnOperator<>(
reduceFn,
context.getCurrentTransform().getFullName(),
(Coder) windowedWorkItemCoder,
- new TupleTag<KV<K, OutputT>>("main output"),
+ mainTag,
Collections.<TupleTag<?>>emptyList(),
- new DoFnOperator.DefaultOutputManagerFactory<KV<K, OutputT>>(),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, outputCoder),
windowingStrategy,
new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
Collections.<PCollectionView<?>>emptyList(), /* side inputs */
@@ -826,14 +854,15 @@ class FlinkStreamingTransformTranslators {
Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformSideInputs =
transformSideInputs(sideInputs, context);
+ TupleTag<KV<K, OutputT>> mainTag = new TupleTag<>("main output");
WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
new WindowDoFnOperator<>(
reduceFn,
context.getCurrentTransform().getFullName(),
(Coder) windowedWorkItemCoder,
- new TupleTag<KV<K, OutputT>>("main output"),
+ mainTag,
Collections.<TupleTag<?>>emptyList(),
- new DoFnOperator.DefaultOutputManagerFactory<KV<K, OutputT>>(),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, outputCoder),
windowingStrategy,
transformSideInputs.f0,
sideInputs,
http://git-wip-us.apache.org/repos/asf/beam/blob/ceec7ce5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 62de423..0bf860a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -21,14 +21,20 @@ import static org.apache.flink.util.Preconditions.checkArgument;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
@@ -45,6 +51,7 @@ import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.StatefulDoFnRunner;
+import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
@@ -57,6 +64,8 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkS
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.KeyGroupCheckpointedOperator;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.TimeDomain;
@@ -66,6 +75,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
@@ -87,6 +97,7 @@ import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.util.OutputTag;
import org.joda.time.Instant;
@@ -95,8 +106,6 @@ import org.joda.time.Instant;
*
* @param <InputT> the input type of the {@link DoFn}
* @param <OutputT> the output type of the {@link DoFn}
- * @param <OutputT> the output type of the operator, this can be different from the fn output
- * type when we have additional tagged outputs
*/
public class DoFnOperator<InputT, OutputT>
extends AbstractStreamOperator<WindowedValue<OutputT>>
@@ -125,7 +134,7 @@ public class DoFnOperator<InputT, OutputT>
protected transient SideInputReader sideInputReader;
- protected transient DoFnRunners.OutputManager outputManager;
+ protected transient BufferedOutputManager<OutputT> outputManager;
private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
@@ -137,7 +146,7 @@ public class DoFnOperator<InputT, OutputT>
private transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag;
- protected transient FlinkStateInternals<?> stateInternals;
+ protected transient FlinkStateInternals<?> keyedStateInternals;
private final String stepName;
@@ -147,14 +156,24 @@ public class DoFnOperator<InputT, OutputT>
private final TimerInternals.TimerDataCoder timerCoder;
+ private final long maxBundleSize;
+
+ private final long maxBundleTimeMills;
+
protected transient HeapInternalTimerService<?, TimerInternals.TimerData> timerService;
protected transient FlinkTimerInternals timerInternals;
- private transient StateInternals pushbackStateInternals;
+ private transient StateInternals nonKeyedStateInternals;
private transient Optional<Long> pushedBackWatermark;
+ // bundle control
+ private transient boolean bundleStarted = false;
+ private transient long elementCount;
+ private transient long lastFinishBundleTime;
+ private transient ScheduledFuture<?> checkFinishBundleTimer;
+
public DoFnOperator(
DoFn<InputT, OutputT> doFn,
String stepName,
@@ -184,10 +203,11 @@ public class DoFnOperator<InputT, OutputT>
this.timerCoder =
TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
- }
- private org.apache.beam.runners.core.StepContext createStepContext() {
- return new StepContext();
+ FlinkPipelineOptions flinkOptions = options.as(FlinkPipelineOptions.class);
+
+ this.maxBundleSize = flinkOptions.getMaxBundleSize();
+ this.maxBundleTimeMills = flinkOptions.getMaxBundleTimeMills();
}
// allow overriding this in WindowDoFnOperator because this one dynamically creates
@@ -204,8 +224,21 @@ public class DoFnOperator<InputT, OutputT>
setCurrentSideInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
setCurrentOutputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
+ FlinkPipelineOptions options =
+ serializedOptions.get().as(FlinkPipelineOptions.class);
sideInputReader = NullSideInputReader.of(sideInputs);
+ // maybe init by initializeState
+ if (nonKeyedStateInternals == null) {
+ if (keyCoder != null) {
+ nonKeyedStateInternals = new FlinkKeyGroupStateInternals<>(keyCoder,
+ getKeyedStateBackend());
+ } else {
+ nonKeyedStateInternals =
+ new FlinkSplitStateInternals<>(getOperatorStateBackend());
+ }
+ }
+
if (!sideInputs.isEmpty()) {
pushedBackTag = StateTags.bag("pushed-back-values", inputCoder);
@@ -217,26 +250,14 @@ public class DoFnOperator<InputT, OutputT>
sideInputHandler = new SideInputHandler(sideInputs, sideInputStateInternals);
sideInputReader = sideInputHandler;
- // maybe init by initializeState
- if (pushbackStateInternals == null) {
- if (keyCoder != null) {
- pushbackStateInternals = new FlinkKeyGroupStateInternals<>(keyCoder,
- getKeyedStateBackend());
- } else {
- pushbackStateInternals =
- new FlinkSplitStateInternals<Object>(getOperatorStateBackend());
- }
- }
-
pushedBackWatermark = Optional.absent();
-
}
- outputManager = outputManagerFactory.create(output);
+ outputManager = outputManagerFactory.create(output, nonKeyedStateInternals);
// StatefulPardo or WindowDoFn
if (keyCoder != null) {
- stateInternals = new FlinkStateInternals<>((KeyedStateBackend) getKeyedStateBackend(),
+ keyedStateInternals = new FlinkStateInternals<>((KeyedStateBackend) getKeyedStateBackend(),
keyCoder);
timerService = (HeapInternalTimerService<?, TimerInternals.TimerData>)
@@ -253,10 +274,10 @@ public class DoFnOperator<InputT, OutputT>
doFnInvoker.invokeSetup();
- org.apache.beam.runners.core.StepContext stepContext = createStepContext();
+ StepContext stepContext = new FlinkStepContext();
doFnRunner = DoFnRunners.simpleRunner(
- serializedOptions.get(),
+ options,
doFn,
sideInputReader,
outputManager,
@@ -301,11 +322,24 @@ public class DoFnOperator<InputT, OutputT>
stateCleaner);
}
- if ((serializedOptions.get().as(FlinkPipelineOptions.class))
- .getEnableMetrics()) {
+ if (options.getEnableMetrics()) {
doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, getRuntimeContext());
}
+ elementCount = 0L;
+ lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime();
+
+ // Schedule timer to check timeout of finish bundle.
+ long bundleCheckPeriod = (maxBundleTimeMills + 1) / 2;
+ checkFinishBundleTimer = getProcessingTimeService().scheduleAtFixedRate(
+ new ProcessingTimeCallback() {
+ @Override
+ public void onProcessingTime(long timestamp) throws Exception {
+ checkInvokeFinishBundleByTime();
+ }
+ },
+ bundleCheckPeriod, bundleCheckPeriod);
+
pushbackDoFnRunner =
SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
}
@@ -315,9 +349,9 @@ public class DoFnOperator<InputT, OutputT>
super.close();
// sanity check: these should have been flushed out by +Inf watermarks
- if (pushbackStateInternals != null) {
+ if (!sideInputs.isEmpty() && nonKeyedStateInternals != null) {
BagState<WindowedValue<InputT>> pushedBack =
- pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+ nonKeyedStateInternals.state(StateNamespaces.global(), pushedBackTag);
Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
if (pushedBackContents != null) {
@@ -328,10 +362,11 @@ public class DoFnOperator<InputT, OutputT>
}
}
}
+ checkFinishBundleTimer.cancel(true);
doFnInvoker.invokeTeardown();
}
- protected final long getPushbackWatermarkHold() {
+ private long getPushbackWatermarkHold() {
// if we don't have side inputs we never hold the watermark
if (sideInputs.isEmpty()) {
return Long.MAX_VALUE;
@@ -351,7 +386,7 @@ public class DoFnOperator<InputT, OutputT>
if (!pushedBackWatermark.isPresent()) {
BagState<WindowedValue<InputT>> pushedBack =
- pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+ nonKeyedStateInternals.state(StateNamespaces.global(), pushedBackTag);
long min = Long.MAX_VALUE;
for (WindowedValue<InputT> value : pushedBack.read()) {
@@ -364,9 +399,9 @@ public class DoFnOperator<InputT, OutputT>
@Override
public final void processElement(
StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
- doFnRunner.startBundle();
+ checkInvokeStartBundle();
doFnRunner.processElement(streamRecord.getValue());
- doFnRunner.finishBundle();
+ checkInvokeFinishBundleByCount();
}
private void setPushedBackWatermark(long watermark) {
@@ -376,12 +411,12 @@ public class DoFnOperator<InputT, OutputT>
@Override
public final void processElement1(
StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
- pushbackDoFnRunner.startBundle();
+ checkInvokeStartBundle();
Iterable<WindowedValue<InputT>> justPushedBack =
pushbackDoFnRunner.processElementInReadyWindows(streamRecord.getValue());
BagState<WindowedValue<InputT>> pushedBack =
- pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+ nonKeyedStateInternals.state(StateNamespaces.global(), pushedBackTag);
checkInitPushedBackWatermark();
@@ -391,13 +426,13 @@ public class DoFnOperator<InputT, OutputT>
pushedBack.add(pushedBackValue);
}
setPushedBackWatermark(min);
- pushbackDoFnRunner.finishBundle();
+ checkInvokeFinishBundleByCount();
}
@Override
public final void processElement2(
StreamRecord<RawUnionValue> streamRecord) throws Exception {
- pushbackDoFnRunner.startBundle();
+ checkInvokeStartBundle();
@SuppressWarnings("unchecked")
WindowedValue<Iterable<?>> value =
@@ -407,7 +442,7 @@ public class DoFnOperator<InputT, OutputT>
sideInputHandler.addSideInputValue(sideInput, value);
BagState<WindowedValue<InputT>> pushedBack =
- pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+ nonKeyedStateInternals.state(StateNamespaces.global(), pushedBackTag);
List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
@@ -433,7 +468,7 @@ public class DoFnOperator<InputT, OutputT>
}
setPushedBackWatermark(min);
- pushbackDoFnRunner.finishBundle();
+ checkInvokeFinishBundleByCount();
// maybe output a new watermark
processWatermark1(new Watermark(currentInputWatermark));
@@ -446,6 +481,9 @@ public class DoFnOperator<InputT, OutputT>
@Override
public void processWatermark1(Watermark mark) throws Exception {
+
+ checkInvokeStartBundle();
+
// We do the check here because we are guaranteed to at least get the +Inf watermark on the
// main input when the job finishes.
if (currentSideInputWatermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
@@ -461,12 +499,9 @@ public class DoFnOperator<InputT, OutputT>
Math.min(getPushbackWatermarkHold(), currentInputWatermark);
if (potentialOutputWatermark > currentOutputWatermark) {
setCurrentOutputWatermark(potentialOutputWatermark);
- output.emitWatermark(new Watermark(currentOutputWatermark));
+ emitWatermark(currentOutputWatermark);
}
} else {
- // fireTimers, so we need startBundle.
- pushbackDoFnRunner.startBundle();
-
setCurrentInputWatermark(mark.getTimestamp());
// hold back by the pushed back values waiting for side inputs
@@ -474,7 +509,7 @@ public class DoFnOperator<InputT, OutputT>
timerService.advanceWatermark(toFlinkRuntimeWatermark(pushedBackInputWatermark));
- Instant watermarkHold = stateInternals.watermarkHold();
+ Instant watermarkHold = keyedStateInternals.watermarkHold();
long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), getPushbackWatermarkHold());
@@ -482,14 +517,23 @@ public class DoFnOperator<InputT, OutputT>
if (potentialOutputWatermark > currentOutputWatermark) {
setCurrentOutputWatermark(potentialOutputWatermark);
- output.emitWatermark(new Watermark(currentOutputWatermark));
+ emitWatermark(currentOutputWatermark);
}
- pushbackDoFnRunner.finishBundle();
}
}
+ private void emitWatermark(long watermark) {
+ // Must invoke finishBundle before emitting the +Inf watermark, otherwise there may be late events.
+ if (watermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+ invokeFinishBundle();
+ }
+ output.emitWatermark(new Watermark(watermark));
+ }
+
@Override
public void processWatermark2(Watermark mark) throws Exception {
+ checkInvokeStartBundle();
+
setCurrentSideInputWatermark(mark.getTimestamp());
if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
// this means we will never see any more side input
@@ -498,6 +542,7 @@ public class DoFnOperator<InputT, OutputT>
// maybe output a new watermark
processWatermark1(new Watermark(currentInputWatermark));
}
+
}
/**
@@ -516,10 +561,9 @@ public class DoFnOperator<InputT, OutputT>
* any future side input, i.e. that there is no point in waiting.
*/
private void emitAllPushedBackData() throws Exception {
- pushbackDoFnRunner.startBundle();
BagState<WindowedValue<InputT>> pushedBack =
- pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+ nonKeyedStateInternals.state(StateNamespaces.global(), pushedBackTag);
Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
if (pushedBackContents != null) {
@@ -537,11 +581,65 @@ public class DoFnOperator<InputT, OutputT>
setPushedBackWatermark(Long.MAX_VALUE);
- pushbackDoFnRunner.finishBundle();
+ }
+
+ /**
+ * Check whether to invoke startBundle; if so, we first need to output elements that were
+ * buffered as part of finishing a bundle in snapshot().
+ *
+ * <p>In order to avoid having {@link DoFnRunner#processElement(WindowedValue)} or
+ * {@link DoFnRunner#onTimer(String, BoundedWindow, Instant, TimeDomain)} invoked outside of
+ * StartBundle and FinishBundle, this method needs to be called in each processElement
+ * and processWatermark, and in onProcessingTime. It does not need to be called in onEventTime,
+ * because that is already guaranteed by processWatermark.
+ */
+ private void checkInvokeStartBundle() {
+ if (!bundleStarted) {
+ outputManager.flushBuffer();
+ pushbackDoFnRunner.startBundle();
+ bundleStarted = true;
+ }
+ }
+
+ /**
+ * Check whether to invoke finishBundle based on the element count. Called in processElement.
+ */
+ private void checkInvokeFinishBundleByCount() {
+ elementCount++;
+ if (elementCount >= maxBundleSize) {
+ invokeFinishBundle();
+ }
+ }
+
+ /**
+ * Check whether to invoke finishBundle based on a timeout.
+ */
+ private void checkInvokeFinishBundleByTime() {
+ long now = getProcessingTimeService().getCurrentProcessingTime();
+ if (now - lastFinishBundleTime >= maxBundleTimeMills) {
+ invokeFinishBundle();
+ }
+ }
+
+ private void invokeFinishBundle() {
+ if (bundleStarted) {
+ pushbackDoFnRunner.finishBundle();
+ bundleStarted = false;
+ elementCount = 0L;
+ lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime();
+ }
}
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
+
+ // Force finishing a bundle at the checkpoint barrier, otherwise we may lose data.
+ // Be careful: this uses OperatorState or KeyGroupState to store the outputs, so it
+ // must be called before their snapshots.
+ outputManager.openBuffer();
+ invokeFinishBundle();
+ outputManager.closeBuffer();
+
// copy from AbstractStreamOperator
if (getKeyedStateBackend() != null) {
KeyedStateCheckpointOutputStream out;
@@ -587,8 +685,8 @@ public class DoFnOperator<InputT, OutputT>
@Override
public void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out) throws Exception {
- if (!sideInputs.isEmpty() && keyCoder != null) {
- ((FlinkKeyGroupStateInternals) pushbackStateInternals).snapshotKeyGroupState(
+ if (keyCoder != null) {
+ ((FlinkKeyGroupStateInternals) nonKeyedStateInternals).snapshotKeyGroupState(
keyGroupIndex, out);
}
}
@@ -626,23 +724,26 @@ public class DoFnOperator<InputT, OutputT>
@Override
public void restoreKeyGroupState(int keyGroupIndex, DataInputStream in) throws Exception {
- if (!sideInputs.isEmpty() && keyCoder != null) {
- if (pushbackStateInternals == null) {
- pushbackStateInternals = new FlinkKeyGroupStateInternals<>(keyCoder,
+ if (keyCoder != null) {
+ if (nonKeyedStateInternals == null) {
+ nonKeyedStateInternals = new FlinkKeyGroupStateInternals<>(keyCoder,
getKeyedStateBackend());
}
- ((FlinkKeyGroupStateInternals) pushbackStateInternals)
+ ((FlinkKeyGroupStateInternals) nonKeyedStateInternals)
.restoreKeyGroupState(keyGroupIndex, in, getUserCodeClassloader());
}
}
@Override
public void onEventTime(InternalTimer<Object, TimerData> timer) throws Exception {
+ // We don't have to call checkInvokeStartBundle() because it's already called in
+ // processWatermark*().
fireTimer(timer);
}
@Override
public void onProcessingTime(InternalTimer<Object, TimerData> timer) throws Exception {
+ checkInvokeStartBundle();
fireTimer(timer);
}
@@ -670,71 +771,186 @@ public class DoFnOperator<InputT, OutputT>
}
/**
- * Factory for creating an {@link DoFnRunners.OutputManager} from
+ * Factory for creating an {@link BufferedOutputManager} from
* a Flink {@link Output}.
*/
interface OutputManagerFactory<OutputT> extends Serializable {
- DoFnRunners.OutputManager create(Output<StreamRecord<WindowedValue<OutputT>>> output);
+ BufferedOutputManager<OutputT> create(
+ Output<StreamRecord<WindowedValue<OutputT>>> output,
+ StateInternals stateInternals);
}
/**
- * Default implementation of {@link OutputManagerFactory} that creates an
- * {@link DoFnRunners.OutputManager} that only writes to
- * a single logical output.
+ * A {@link DoFnRunners.OutputManager} that can buffer its outputs.
+ * Use {@link FlinkSplitStateInternals} or {@link FlinkKeyGroupStateInternals}
+ * to keep buffer data.
*/
- public static class DefaultOutputManagerFactory<OutputT>
- implements OutputManagerFactory<OutputT> {
+ public static class BufferedOutputManager<OutputT> implements
+ DoFnRunners.OutputManager {
+
+ private TupleTag<OutputT> mainTag;
+ private Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags;
+ private Map<TupleTag<?>, Integer> tagsToIds;
+ private Map<Integer, TupleTag<?>> idsToTags;
+ protected Output<StreamRecord<WindowedValue<OutputT>>> output;
+
+ private boolean openBuffer = false;
+ private BagState<KV<Integer, WindowedValue<?>>> bufferState;
+
+ BufferedOutputManager(
+ Output<StreamRecord<WindowedValue<OutputT>>> output,
+ TupleTag<OutputT> mainTag,
+ Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags,
+ final Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders,
+ Map<TupleTag<?>, Integer> tagsToIds,
+ StateInternals stateInternals) {
+ this.output = output;
+ this.mainTag = mainTag;
+ this.tagsToOutputTags = tagsToOutputTags;
+ this.tagsToIds = tagsToIds;
+ this.idsToTags = new HashMap<>();
+ for (Map.Entry<TupleTag<?>, Integer> entry : tagsToIds.entrySet()) {
+ idsToTags.put(entry.getValue(), entry.getKey());
+ }
+
+ ImmutableMap.Builder<Integer, Coder<WindowedValue<?>>> idsToCodersBuilder =
+ ImmutableMap.builder();
+ for (Map.Entry<TupleTag<?>, Integer> entry : tagsToIds.entrySet()) {
+ idsToCodersBuilder.put(entry.getValue(), tagsToCoders.get(entry.getKey()));
+ }
+
+ StateTag<BagState<KV<Integer, WindowedValue<?>>>> bufferTag =
+ StateTags.bag("bundle-buffer-tag",
+ new TaggedKvCoder(idsToCodersBuilder.build()));
+ bufferState = stateInternals.state(StateNamespaces.global(), bufferTag);
+ }
+
+ void openBuffer() {
+ this.openBuffer = true;
+ }
+
+ void closeBuffer() {
+ this.openBuffer = false;
+ }
+
@Override
- public DoFnRunners.OutputManager create(
- final Output<StreamRecord<WindowedValue<OutputT>>> output) {
- return new DoFnRunners.OutputManager() {
- @Override
- public <T> void output(TupleTag<T> tag, WindowedValue<T> value) {
- // with tagged outputs we can't get around this because we don't
- // know our own output type...
- @SuppressWarnings("unchecked")
- WindowedValue<OutputT> castValue = (WindowedValue<OutputT>) value;
- output.collect(new StreamRecord<>(castValue));
- }
- };
+ public <T> void output(TupleTag<T> tag, WindowedValue<T> value) {
+ if (!openBuffer) {
+ emit(tag, value);
+ } else {
+ bufferState.add(KV.<Integer, WindowedValue<?>>of(tagsToIds.get(tag), value));
+ }
+ }
+
+ /**
+ * Flushes the elements in {@code bufferState} to the Flink output. This method must not be
+ * invoked in {@link #snapshotState(StateSnapshotContext)}.
+ */
+ void flushBuffer() {
+ for (KV<Integer, WindowedValue<?>> taggedElem : bufferState.read()) {
+ emit(idsToTags.get(taggedElem.getKey()), (WindowedValue) taggedElem.getValue());
+ }
+ bufferState.clear();
+ }
+
+ private <T> void emit(TupleTag<T> tag, WindowedValue<T> value) {
+ if (tag.equals(mainTag)) {
+ // with tagged outputs we can't get around this because we don't
+ // know our own output type...
+ @SuppressWarnings("unchecked")
+ WindowedValue<OutputT> castValue = (WindowedValue<OutputT>) value;
+ output.collect(new StreamRecord<>(castValue));
+ } else {
+ @SuppressWarnings("unchecked")
+ OutputTag<WindowedValue<T>> outputTag = (OutputTag) tagsToOutputTags.get(tag);
+ output.collect(outputTag, new StreamRecord<>(value));
+ }
+ }
+ }
+
+ /**
+ * Coder for a KV of id and value. It will be serialized in a Flink checkpoint.
+ */
+ private static class TaggedKvCoder extends StructuredCoder<KV<Integer, WindowedValue<?>>> {
+
+ private Map<Integer, Coder<WindowedValue<?>>> idsToCoders;
+
+ TaggedKvCoder(Map<Integer, Coder<WindowedValue<?>>> idsToCoders) {
+ this.idsToCoders = idsToCoders;
+ }
+
+ @Override
+ public void encode(KV<Integer, WindowedValue<?>> kv, OutputStream out)
+ throws IOException {
+ Coder<WindowedValue<?>> coder = idsToCoders.get(kv.getKey());
+ VarIntCoder.of().encode(kv.getKey(), out);
+ coder.encode(kv.getValue(), out);
+ }
+
+ @Override
+ public KV<Integer, WindowedValue<?>> decode(InputStream in)
+ throws IOException {
+ Integer id = VarIntCoder.of().decode(in);
+ Coder<WindowedValue<?>> coder = idsToCoders.get(id);
+ WindowedValue<?> value = coder.decode(in);
+ return KV.<Integer, WindowedValue<?>>of(id, value);
+ }
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return new ArrayList<>(idsToCoders.values());
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ for (Coder<?> coder : idsToCoders.values()) {
+ verifyDeterministic(this, "Coder must be deterministic", coder);
+ }
}
}
/**
* Implementation of {@link OutputManagerFactory} that creates an
- * {@link DoFnRunners.OutputManager} that can write to multiple logical
- * outputs by unioning them in a {@link RawUnionValue}.
+ * {@link BufferedOutputManager} that can write to multiple logical
+ * outputs by Flink side output.
*/
public static class MultiOutputOutputManagerFactory<OutputT>
implements OutputManagerFactory<OutputT> {
- private TupleTag<?> mainTag;
- Map<TupleTag<?>, OutputTag<WindowedValue<?>>> mapping;
+ private TupleTag<OutputT> mainTag;
+ private Map<TupleTag<?>, Integer> tagsToIds;
+ private Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags;
+ private Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders;
+
+ // There is no side output.
+ @SuppressWarnings("unchecked")
+ public MultiOutputOutputManagerFactory(
+ TupleTag<OutputT> mainTag, Coder<WindowedValue<OutputT>> mainCoder) {
+ this(mainTag,
+ new HashMap<TupleTag<?>, OutputTag<WindowedValue<?>>>(),
+ ImmutableMap.<TupleTag<?>, Coder<WindowedValue<?>>>builder()
+ .put(mainTag, (Coder) mainCoder).build(),
+ ImmutableMap.<TupleTag<?>, Integer>builder()
+ .put(mainTag, 0).build());
+ }
public MultiOutputOutputManagerFactory(
- TupleTag<?> mainTag,
- Map<TupleTag<?>, OutputTag<WindowedValue<?>>> mapping) {
+ TupleTag<OutputT> mainTag,
+ Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags,
+ Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders,
+ Map<TupleTag<?>, Integer> tagsToIds) {
this.mainTag = mainTag;
- this.mapping = mapping;
+ this.tagsToOutputTags = tagsToOutputTags;
+ this.tagsToCoders = tagsToCoders;
+ this.tagsToIds = tagsToIds;
}
@Override
- public DoFnRunners.OutputManager create(
- final Output<StreamRecord<WindowedValue<OutputT>>> output) {
- return new DoFnRunners.OutputManager() {
- @Override
- public <T> void output(TupleTag<T> tag, WindowedValue<T> value) {
- if (tag.equals(mainTag)) {
- @SuppressWarnings("unchecked")
- WindowedValue<OutputT> outputValue = (WindowedValue<OutputT>) value;
- output.collect(new StreamRecord<>(outputValue));
- } else {
- @SuppressWarnings("unchecked")
- OutputTag<WindowedValue<T>> outputTag = (OutputTag) mapping.get(tag);
- output.<WindowedValue<T>>collect(outputTag, new StreamRecord<>(value));
- }
- }
- };
+ public BufferedOutputManager<OutputT> create(
+ Output<StreamRecord<WindowedValue<OutputT>>> output,
+ StateInternals stateInternals) {
+ return new BufferedOutputManager<>(
+ output, mainTag, tagsToOutputTags, tagsToCoders, tagsToIds, stateInternals);
}
}
@@ -742,11 +958,11 @@ public class DoFnOperator<InputT, OutputT>
* {@link StepContext} for running {@link DoFn DoFns} on Flink. This does not allow
* accessing state or timer internals.
*/
- protected class StepContext implements org.apache.beam.runners.core.StepContext {
+ protected class FlinkStepContext implements StepContext {
@Override
public StateInternals stateInternals() {
- return stateInternals;
+ return keyedStateInternals;
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/ceec7ce5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index be758a6..b255bb4 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -97,7 +97,7 @@ public class SplittableDoFnOperator<
public StateInternals stateInternalsForKey(String key) {
//this will implicitly be keyed by the key of the incoming
// element or by the key of a firing timer
- return (StateInternals) stateInternals;
+ return (StateInternals) keyedStateInternals;
}
};
TimerInternalsFactory<String> timerInternalsFactory = new TimerInternalsFactory<String>() {
@@ -148,7 +148,7 @@ public class SplittableDoFnOperator<
public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) {
doFnRunner.processElement(WindowedValue.valueInGlobalWindow(
KeyedWorkItems.<String, KV<InputT, RestrictionT>>timersWorkItem(
- (String) stateInternals.getKey(),
+ (String) keyedStateInternals.getKey(),
Collections.singletonList(timer.getNamespace()))));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ceec7ce5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 78d585e..b1fb398 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -86,7 +86,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
public StateInternals stateInternalsForKey(K key) {
//this will implicitly be keyed by the key of the incoming
// element or by the key of a firing timer
- return (StateInternals) stateInternals;
+ return (StateInternals) keyedStateInternals;
}
};
TimerInternalsFactory<K> timerInternalsFactory = new TimerInternalsFactory<K>() {
@@ -112,7 +112,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
public void fireTimer(InternalTimer<?, TimerData> timer) {
doFnRunner.processElement(WindowedValue.valueInGlobalWindow(
KeyedWorkItems.<K, InputT>timersWorkItem(
- (K) stateInternals.getKey(),
+ (K) keyedStateInternals.getKey(),
Collections.singletonList(timer.getNamespace()))));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ceec7ce5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
index bb2a9ff..09e59fd 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
@@ -167,7 +167,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals {
@Override
public void add(T input) {
try {
- flinkStateBackend.getOperatorState(descriptor).add(input);
+ flinkStateBackend.getListState(descriptor).add(input);
} catch (Exception e) {
throw new RuntimeException("Error updating state.", e);
}
@@ -181,7 +181,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals {
@Override
public Iterable<T> read() {
try {
- Iterable<T> result = flinkStateBackend.getOperatorState(descriptor).get();
+ Iterable<T> result = flinkStateBackend.getListState(descriptor).get();
return result != null ? result : Collections.<T>emptyList();
} catch (Exception e) {
throw new RuntimeException("Error updating state.", e);
@@ -194,7 +194,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals {
@Override
public Boolean read() {
try {
- Iterable<T> result = flinkStateBackend.getOperatorState(descriptor).get();
+ Iterable<T> result = flinkStateBackend.getListState(descriptor).get();
// PartitionableListState.get() returns an empty collection when there is no element,
// whereas KeyedListState returns null in that case.
return result == null || Iterators.size(result.iterator()) == 0;
@@ -214,7 +214,7 @@ public class FlinkSplitStateInternals<K> implements StateInternals {
@Override
public void clear() {
try {
- flinkStateBackend.getOperatorState(descriptor).clear();
+ flinkStateBackend.getListState(descriptor).clear();
} catch (Exception e) {
throw new RuntimeException("Error reading state.", e);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ceec7ce5/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index eb06026..57086df 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.flink;
import java.util.Collections;
import java.util.HashMap;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
@@ -60,13 +61,15 @@ public class PipelineOptionsTest {
@Test(expected = Exception.class)
public void parDoBaseClassPipelineOptionsNullTest() {
- new DoFnOperator<>(
+ TupleTag<String> mainTag = new TupleTag<>("main-output");
+ Coder<WindowedValue<String>> coder = WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
+ DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
new TestDoFn(),
"stepName",
- WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()),
- new TupleTag<String>("main-output"),
+ coder,
+ mainTag,
Collections.<TupleTag<?>>emptyList(),
- new DoFnOperator.DefaultOutputManagerFactory<String>(),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, coder),
WindowingStrategy.globalDefault(),
new HashMap<Integer, PCollectionView<?>>(),
Collections.<PCollectionView<?>>emptyList(),
@@ -81,13 +84,16 @@ public class PipelineOptionsTest {
@Test
public void parDoBaseClassPipelineOptionsSerializationTest() throws Exception {
+ TupleTag<String> mainTag = new TupleTag<>("main-output");
+
+ Coder<WindowedValue<String>> coder = WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
new TestDoFn(),
"stepName",
- WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()),
- new TupleTag<String>("main-output"),
+ coder,
+ mainTag,
Collections.<TupleTag<?>>emptyList(),
- new DoFnOperator.DefaultOutputManagerFactory<String>(),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, coder),
WindowingStrategy.globalDefault(),
new HashMap<Integer, PCollectionView<?>>(),
Collections.<PCollectionView<?>>emptyList(),
@@ -105,7 +111,6 @@ public class PipelineOptionsTest {
OneInputStreamOperatorTestHarness<WindowedValue<Object>, WindowedValue<Object>> testHarness =
new OneInputStreamOperatorTestHarness<>(deserialized,
typeInformation.createSerializer(new ExecutionConfig()));
-
testHarness.open();
// execute once to access options
http://git-wip-us.apache.org/repos/asf/beam/blob/ceec7ce5/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 4d2a912..ad17de8 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -52,6 +52,7 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
@@ -62,6 +63,7 @@ import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -107,18 +109,17 @@ public class DoFnOperatorTest {
@SuppressWarnings("unchecked")
public void testSingleOutput() throws Exception {
- WindowedValue.ValueOnlyWindowedValueCoder<String> windowedValueCoder =
- WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
+ Coder<WindowedValue<String>> coder = WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
TupleTag<String> outputTag = new TupleTag<>("main-output");
DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
new IdentityDoFn<String>(),
"stepName",
- windowedValueCoder,
+ coder,
outputTag,
Collections.<TupleTag<?>>emptyList(),
- new DoFnOperator.DefaultOutputManagerFactory(),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
WindowingStrategy.globalDefault(),
new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
Collections.<PCollectionView<?>>emptyList(), /* side inputs */
@@ -143,26 +144,38 @@ public class DoFnOperatorTest {
@SuppressWarnings("unchecked")
public void testMultiOutputOutput() throws Exception {
- WindowedValue.ValueOnlyWindowedValueCoder<String> windowedValueCoder =
+ WindowedValue.ValueOnlyWindowedValueCoder<String> coder =
WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
TupleTag<String> mainOutput = new TupleTag<>("main-output");
TupleTag<String> additionalOutput1 = new TupleTag<>("output-1");
TupleTag<String> additionalOutput2 = new TupleTag<>("output-2");
- ImmutableMap<TupleTag<?>, OutputTag<?>> outputMapping =
+ ImmutableMap<TupleTag<?>, OutputTag<?>> tagsToOutputTags =
ImmutableMap.<TupleTag<?>, OutputTag<?>>builder()
- .put(mainOutput, new OutputTag<String>(mainOutput.getId()){})
.put(additionalOutput1, new OutputTag<String>(additionalOutput1.getId()){})
.put(additionalOutput2, new OutputTag<String>(additionalOutput2.getId()){})
.build();
+ ImmutableMap<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders =
+ ImmutableMap.<TupleTag<?>, Coder<WindowedValue<?>>>builder()
+ .put(mainOutput, (Coder) coder)
+ .put(additionalOutput1, coder)
+ .put(additionalOutput2, coder)
+ .build();
+ ImmutableMap<TupleTag<?>, Integer> tagsToIds =
+ ImmutableMap.<TupleTag<?>, Integer>builder()
+ .put(mainOutput, 0)
+ .put(additionalOutput1, 1)
+ .put(additionalOutput2, 2)
+ .build();
DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
new MultiOutputDoFn(additionalOutput1, additionalOutput2),
"stepName",
- windowedValueCoder,
+ coder,
mainOutput,
ImmutableList.<TupleTag<?>>of(additionalOutput1, additionalOutput2),
- new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, outputMapping),
+ new DoFnOperator.MultiOutputOutputManagerFactory(
+ mainOutput, tagsToOutputTags, tagsToCoders, tagsToIds),
WindowingStrategy.globalDefault(),
new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
Collections.<PCollectionView<?>>emptyList(), /* side inputs */
@@ -184,13 +197,13 @@ public class DoFnOperatorTest {
WindowedValue.valueInGlobalWindow("got: hello")));
assertThat(
- this.stripStreamRecord(testHarness.getSideOutput(outputMapping.get(additionalOutput1))),
+ this.stripStreamRecord(testHarness.getSideOutput(tagsToOutputTags.get(additionalOutput1))),
contains(
WindowedValue.valueInGlobalWindow("extra: one"),
WindowedValue.valueInGlobalWindow("got: hello")));
assertThat(
- this.stripStreamRecord(testHarness.getSideOutput(outputMapping.get(additionalOutput2))),
+ this.stripStreamRecord(testHarness.getSideOutput(tagsToOutputTags.get(additionalOutput2))),
contains(
WindowedValue.valueInGlobalWindow("extra: two"),
WindowedValue.valueInGlobalWindow("got: hello")));
@@ -255,7 +268,7 @@ public class DoFnOperatorTest {
inputCoder,
outputTag,
Collections.<TupleTag<?>>emptyList(),
- new DoFnOperator.DefaultOutputManagerFactory<String>(),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, outputCoder),
windowingStrategy,
new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
Collections.<PCollectionView<?>>emptyList(), /* side inputs */
@@ -329,20 +342,20 @@ public class DoFnOperatorTest {
}
};
- WindowedValue.FullWindowedValueCoder<Integer> windowedValueCoder =
- WindowedValue.getFullCoder(
- VarIntCoder.of(),
- windowingStrategy.getWindowFn().windowCoder());
+ Coder<WindowedValue<Integer>> inputCoder = WindowedValue.getFullCoder(
+ VarIntCoder.of(), windowingStrategy.getWindowFn().windowCoder());
+ Coder<WindowedValue<String>> outputCoder = WindowedValue.getFullCoder(
+ StringUtf8Coder.of(), windowingStrategy.getWindowFn().windowCoder());
TupleTag<String> outputTag = new TupleTag<>("main-output");
DoFnOperator<Integer, String> doFnOperator = new DoFnOperator<>(
fn,
"stepName",
- windowedValueCoder,
+ inputCoder,
outputTag,
Collections.<TupleTag<?>>emptyList(),
- new DoFnOperator.DefaultOutputManagerFactory<String>(),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, outputCoder),
windowingStrategy,
new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
Collections.<PCollectionView<?>>emptyList(), /* side inputs */
@@ -441,7 +454,7 @@ public class DoFnOperatorTest {
}
};
- WindowedValue.FullWindowedValueCoder<KV<String, Integer>> windowedValueCoder =
+ WindowedValue.FullWindowedValueCoder<KV<String, Integer>> coder =
WindowedValue.getFullCoder(
KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()),
windowingStrategy.getWindowFn().windowCoder());
@@ -452,10 +465,10 @@ public class DoFnOperatorTest {
new DoFnOperator<>(
fn,
"stepName",
- windowedValueCoder,
+ coder,
outputTag,
Collections.<TupleTag<?>>emptyList(),
- new DoFnOperator.DefaultOutputManagerFactory<KV<String, Integer>>(),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
windowingStrategy,
new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
Collections.<PCollectionView<?>>emptyList(), /* side inputs */
@@ -531,8 +544,7 @@ public class DoFnOperatorTest {
public void testSideInputs(boolean keyed) throws Exception {
- WindowedValue.ValueOnlyWindowedValueCoder<String> windowedValueCoder =
- WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
+ Coder<WindowedValue<String>> coder = WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
TupleTag<String> outputTag = new TupleTag<>("main-output");
@@ -550,10 +562,10 @@ public class DoFnOperatorTest {
DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
new IdentityDoFn<String>(),
"stepName",
- windowedValueCoder,
+ coder,
outputTag,
Collections.<TupleTag<?>>emptyList(),
- new DoFnOperator.DefaultOutputManagerFactory<String>(),
+ new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, coder),
WindowingStrategy.globalDefault(),
sideInputMapping, /* side-input mapping */
ImmutableList.<PCollectionView<?>>of(view1, view2), /* side inputs */
@@ -631,6 +643,105 @@ public class DoFnOperatorTest {
testSideInputs(true);
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testBundle() throws Exception {
+
+ WindowedValue.ValueOnlyWindowedValueCoder<String> windowedValueCoder =
+ WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
+
+ TupleTag<String> outputTag = new TupleTag<>("main-output");
+ FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ options.setMaxBundleSize(2L);
+ options.setMaxBundleTimeMills(10L);
+
+ IdentityDoFn<String> doFn = new IdentityDoFn<String>() {
+ @FinishBundle
+ public void finishBundle(FinishBundleContext context) {
+ context.output(
+ "finishBundle", BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE);
+ }
+ };
+
+ DoFnOperator.MultiOutputOutputManagerFactory<String> outputManagerFactory =
+ new DoFnOperator.MultiOutputOutputManagerFactory(
+ outputTag,
+ WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE));
+
+ DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
+ doFn,
+ "stepName",
+ windowedValueCoder,
+ outputTag,
+ Collections.<TupleTag<?>>emptyList(),
+ outputManagerFactory,
+ WindowingStrategy.globalDefault(),
+ new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+ Collections.<PCollectionView<?>>emptyList(), /* side inputs */
+ options,
+ null);
+
+ OneInputStreamOperatorTestHarness<WindowedValue<String>, WindowedValue<String>> testHarness =
+ new OneInputStreamOperatorTestHarness<>(doFnOperator);
+
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("a")));
+ testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("b")));
+ testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("c")));
+
+ // draw a snapshot
+ OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+
+ // There is a finishBundle in snapshot()
+ // Elements will be buffered as part of finishing a bundle in snapshot()
+ assertThat(
+ this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+ contains(
+ WindowedValue.valueInGlobalWindow("a"),
+ WindowedValue.valueInGlobalWindow("b"),
+ WindowedValue.valueInGlobalWindow("finishBundle"),
+ WindowedValue.valueInGlobalWindow("c")));
+
+ testHarness.close();
+
+ DoFnOperator<String, String> newDoFnOperator = new DoFnOperator<>(
+ doFn,
+ "stepName",
+ windowedValueCoder,
+ outputTag,
+ Collections.<TupleTag<?>>emptyList(),
+ outputManagerFactory,
+ WindowingStrategy.globalDefault(),
+ new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+ Collections.<PCollectionView<?>>emptyList(), /* side inputs */
+ options,
+ null);
+
+ OneInputStreamOperatorTestHarness<WindowedValue<String>, WindowedValue<String>> newHarness =
+ new OneInputStreamOperatorTestHarness<>(newDoFnOperator);
+
+ // restore snapshot
+ newHarness.initializeState(snapshot);
+
+ newHarness.open();
+
+ // startBundle will output the buffered elements.
+ newHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("d")));
+
+ // check finishBundle by timeout
+ newHarness.setProcessingTime(10);
+
+ assertThat(
+ this.<String>stripStreamRecordFromWindowedValue(newHarness.getOutput()),
+ contains(
+ WindowedValue.valueInGlobalWindow("finishBundle"),
+ WindowedValue.valueInGlobalWindow("d"),
+ WindowedValue.valueInGlobalWindow("finishBundle")));
+
+ newHarness.close();
+ }
+
private <T> Iterable<WindowedValue<T>> stripStreamRecordFromWindowedValue(
Iterable<Object> input) {
[2/2] beam git commit: This closes #3368
Posted by pe...@apache.org.
This closes #3368
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/724eda37
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/724eda37
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/724eda37
Branch: refs/heads/master
Commit: 724eda37ea1e54aac089d89c711ca3cee14a4603
Parents: 3a8b0b6 ceec7ce
Author: Pei He <pe...@apache.org>
Authored: Wed Aug 16 11:46:49 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Wed Aug 16 11:46:49 2017 +0800
----------------------------------------------------------------------
.../runners/flink/FlinkPipelineOptions.java | 11 +
.../FlinkStreamingTransformTranslators.java | 77 ++--
.../wrappers/streaming/DoFnOperator.java | 412 ++++++++++++++-----
.../streaming/SplittableDoFnOperator.java | 4 +-
.../wrappers/streaming/WindowDoFnOperator.java | 4 +-
.../state/FlinkSplitStateInternals.java | 8 +-
.../beam/runners/flink/PipelineOptionsTest.java | 21 +-
.../flink/streaming/DoFnOperatorTest.java | 161 ++++++--
8 files changed, 535 insertions(+), 163 deletions(-)
----------------------------------------------------------------------