You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/01 08:12:18 UTC
[1/2] beam git commit: [BEAM-1855] Support Splittable DoFn on Flink Runner
Repository: beam
Updated Branches:
refs/heads/master e31ca8b0d -> ea33e3373
[BEAM-1855] Support Splittable DoFn on Flink Runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5824bb4b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5824bb4b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5824bb4b
Branch: refs/heads/master
Commit: 5824bb4b5700b5230f569c570d5e8ed2d11cedf2
Parents: e31ca8b
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Mar 13 21:23:14 2017 +0100
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Apr 1 01:10:55 2017 -0700
----------------------------------------------------------------------
runners/flink/runner/pom.xml | 4 +-
.../flink/FlinkStreamingPipelineTranslator.java | 37 ++
.../FlinkStreamingTransformTranslators.java | 341 ++++++++++++++-----
.../streaming/SplittableDoFnOperator.java | 150 ++++++++
.../beam/sdk/transforms/SplittableDoFnTest.java | 3 +-
5 files changed, 448 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5824bb4b/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 3b35c8e..f26aeb0 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -88,9 +88,9 @@
org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
org.apache.beam.sdk.testing.UsesSetState,
org.apache.beam.sdk.testing.UsesMapState,
- org.apache.beam.sdk.testing.UsesSplittableParDo,
org.apache.beam.sdk.testing.UsesAttemptedMetrics,
- org.apache.beam.sdk.testing.UsesCommittedMetrics
+ org.apache.beam.sdk.testing.UsesCommittedMetrics,
+ org.apache.beam.sdk.testing.UsesTestStream
</excludedGroups>
<parallel>none</parallel>
<failIfNoTests>true</failIfNoTests>
http://git-wip-us.apache.org/repos/asf/beam/blob/5824bb4b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index d50d6bf..0cedf66 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -18,8 +18,12 @@
package org.apache.beam.runners.flink;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import java.util.List;
import java.util.Map;
+import org.apache.beam.runners.core.SplittableParDo;
import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -29,9 +33,13 @@ import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,6 +74,8 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
Map<PTransformMatcher, PTransformOverrideFactory> transformOverrides =
ImmutableMap.<PTransformMatcher, PTransformOverrideFactory>builder()
.put(
+ PTransformMatchers.splittableParDoMulti(), new SplittableParDoOverrideFactory())
+ .put(
PTransformMatchers.classEqualTo(View.AsIterable.class),
new ReflectiveOneToOneOverrideFactory(
FlinkStreamingViewOverrides.StreamingViewAsIterable.class, flinkRunner))
@@ -228,4 +238,31 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
}
}
+ /**
+ * A {@link PTransformOverrideFactory} that overrides a
+ * <a href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a> with
+ * {@link SplittableParDo}.
+ *
+ * <p>The replacement wraps the original {@link ParDo.MultiOutput}, consumes the original
+ * (single) input and maps the original outputs onto the replacement's outputs.
+ */
+ static class SplittableParDoOverrideFactory<InputT, OutputT>
+ implements PTransformOverrideFactory<
+ PCollection<? extends InputT>, PCollectionTuple, ParDo.MultiOutput<InputT, OutputT>> {
+ @Override
+ @SuppressWarnings("unchecked")
+ public PTransform<PCollection<? extends InputT>, PCollectionTuple> getReplacementTransform(
+ ParDo.MultiOutput<InputT, OutputT> transform) {
+ // Raw constructor call (no type arguments), which is why unchecked warnings are suppressed.
+ return new SplittableParDo(transform);
+ }
+
+ @Override
+ public PCollection<? extends InputT> getInput(
+ List<TaggedPValue> inputs, Pipeline p) {
+ // Assumes exactly one input; Iterables.getOnlyElement throws if there are zero or several.
+ // NOTE(review): this cast is unchecked but, unlike getReplacementTransform, not suppressed.
+ return (PCollection<? extends InputT>) Iterables.getOnlyElement(inputs).getValue();
+ }
+
+ @Override
+ public Map<PValue, ReplacementOutput> mapOutputs(
+ List<TaggedPValue> outputs, PCollectionTuple newOutput) {
+ // Presumably pairs original and replacement outputs by their TupleTag; see ReplacementOutputs.
+ return ReplacementOutputs.tagged(outputs, newOutput);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5824bb4b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 00b0412..5c29db2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -29,6 +29,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.beam.runners.core.ElementAndRestriction;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.SplittableParDo;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
@@ -37,6 +40,7 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator
import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector;
import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem;
import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.BoundedSourceWrapper;
@@ -45,6 +49,7 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.io.Unbounded
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.Sink;
@@ -60,6 +65,7 @@ import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.join.UnionCoder;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -122,6 +128,11 @@ class FlinkStreamingTransformTranslators {
TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoStreamingTranslator());
+ TRANSLATORS.put(
+ SplittableParDo.ProcessElements.class, new SplittableProcessElementsStreamingTranslator());
+ TRANSLATORS.put(
+ SplittableParDo.GBKIntoKeyedWorkItems.class, new GBKIntoKeyedWorkItemsTranslator());
+
TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator());
TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslator());
@@ -309,16 +320,6 @@ class FlinkStreamingTransformTranslators {
}
}
- private static void rejectSplittable(DoFn<?, ?> doFn) {
- DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
- if (signature.processElement().isSplittable()) {
- throw new UnsupportedOperationException(
- String.format(
- "%s does not currently support splittable DoFn: %s",
- FlinkRunner.class.getSimpleName(), doFn));
- }
- }
-
/**
* Wraps each element in a {@link RawUnionValue} with the given tag id.
*/
@@ -395,64 +396,77 @@ class FlinkStreamingTransformTranslators {
return new Tuple2<>(intToViewMapping, sideInputUnion);
}
+ /**
+ * Helper for translating {@link ParDo.MultiOutput} and {@link SplittableParDo.ProcessElements}.
+ */
+ static class ParDoTranslationHelper {
+
+ interface DoFnOperatorFactory<InputT, OutputT> {
+ DoFnOperator<InputT, OutputT, RawUnionValue> createDoFnOperator(
+ DoFn<InputT, OutputT> doFn,
+ List<PCollectionView<?>> sideInputs,
+ TupleTag<OutputT> mainOutputTag,
+ List<TupleTag<?>> sideOutputTags,
+ FlinkStreamingTranslationContext context,
+ WindowingStrategy<?, ?> windowingStrategy,
+ Map<TupleTag<?>, Integer> tagsToLabels,
+ Coder<WindowedValue<InputT>> inputCoder,
+ Coder keyCoder,
+ Map<Integer, PCollectionView<?>> transformedSideInputs);
+ }
- private static class ParDoStreamingTranslator<InputT, OutputT>
- extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
- ParDo.MultiOutput<InputT, OutputT>> {
-
- @Override
- public void translateNode(
- ParDo.MultiOutput<InputT, OutputT> transform,
- FlinkStreamingTranslationContext context) {
-
- DoFn<InputT, OutputT> doFn = transform.getFn();
- rejectSplittable(doFn);
+ static <InputT, OutputT> void translateParDo(
+ String transformName,
+ DoFn<InputT, OutputT> doFn,
+ PCollection<InputT> input,
+ List<PCollectionView<?>> sideInputs,
+ List<TaggedPValue> outputs,
+ TupleTag<OutputT> mainOutputTag,
+ List<TupleTag<?>> sideOutputTags,
+ FlinkStreamingTranslationContext context,
+ DoFnOperatorFactory<InputT, OutputT> doFnOperatorFactory) {
// we assume that the transformation does not change the windowing strategy.
- WindowingStrategy<?, ?> windowingStrategy =
- context.getInput(transform).getWindowingStrategy();
-
- List<TaggedPValue> outputs = context.getOutputs(transform);
+ WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
Map<TupleTag<?>, Integer> tagsToLabels =
- transformTupleTagsToLabels(transform.getMainOutputTag(), outputs);
-
- List<PCollectionView<?>> sideInputs = transform.getSideInputs();
+ transformTupleTagsToLabels(mainOutputTag, outputs);
SingleOutputStreamOperator<RawUnionValue> unionOutputStream;
- @SuppressWarnings("unchecked")
- PCollection<InputT> inputPCollection = (PCollection<InputT>) context.getInput(transform);
+ Coder<WindowedValue<InputT>> inputCoder = context.getCoder(input);
- Coder<WindowedValue<InputT>> inputCoder = context.getCoder(inputPCollection);
+ DataStream<WindowedValue<InputT>> inputDataStream = context.getInputDataStream(input);
- DataStream<WindowedValue<InputT>> inputDataStream =
- context.getInputDataStream(context.getInput(transform));
Coder keyCoder = null;
boolean stateful = false;
- DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
+ DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
if (signature.stateDeclarations().size() > 0
|| signature.timerDeclarations().size() > 0) {
// Based on the fact that the signature is stateful, DoFnSignatures ensures
// that it is also keyed
- keyCoder = ((KvCoder) inputPCollection.getCoder()).getKeyCoder();
+ keyCoder = ((KvCoder) input.getCoder()).getKeyCoder();
inputDataStream = inputDataStream.keyBy(new KvToByteBufferKeySelector(keyCoder));
stateful = true;
+ } else if (doFn instanceof SplittableParDo.ProcessFn) {
+ // we know that it is keyed on String
+ keyCoder = StringUtf8Coder.of();
+ stateful = true;
}
if (sideInputs.isEmpty()) {
DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
- new DoFnOperator<>(
- transform.getFn(),
- inputCoder,
- transform.getMainOutputTag(),
- transform.getSideOutputTags().getAll(),
- new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
+ doFnOperatorFactory.createDoFnOperator(
+ doFn,
+ sideInputs,
+ mainOutputTag,
+ sideOutputTags,
+ context,
windowingStrategy,
- new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
- Collections.<PCollectionView<?>>emptyList(), /* side inputs */
- context.getPipelineOptions(),
- keyCoder);
+ tagsToLabels,
+ inputCoder,
+ keyCoder,
+ new HashMap<Integer, PCollectionView<?>>() /* side-input mapping */);
UnionCoder outputUnionCoder = createUnionCoder(outputs);
@@ -460,24 +474,24 @@ class FlinkStreamingTransformTranslators {
new CoderTypeInformation<>(outputUnionCoder);
unionOutputStream = inputDataStream
- .transform(transform.getName(), outputUnionTypeInformation, doFnOperator);
+ .transform(transformName, outputUnionTypeInformation, doFnOperator);
} else {
Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformedSideInputs =
transformSideInputs(sideInputs, context);
DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
- new DoFnOperator<>(
- transform.getFn(),
- inputCoder,
- transform.getMainOutputTag(),
- transform.getSideOutputTags().getAll(),
- new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
- windowingStrategy,
- transformedSideInputs.f0,
+ doFnOperatorFactory.createDoFnOperator(
+ doFn,
sideInputs,
- context.getPipelineOptions(),
- keyCoder);
+ mainOutputTag,
+ sideOutputTags,
+ context,
+ windowingStrategy,
+ tagsToLabels,
+ inputCoder,
+ keyCoder,
+ transformedSideInputs.f0);
UnionCoder outputUnionCoder = createUnionCoder(outputs);
@@ -494,7 +508,7 @@ class FlinkStreamingTransformTranslators {
WindowedValue<OutputT>> rawFlinkTransform = new TwoInputTransformation(
keyedStream.getTransformation(),
transformedSideInputs.f1.broadcast().getTransformation(),
- transform.getName(),
+ transformName,
(TwoInputStreamOperator) doFnOperator,
outputUnionTypeInformation,
keyedStream.getParallelism());
@@ -511,7 +525,7 @@ class FlinkStreamingTransformTranslators {
} else {
unionOutputStream = inputDataStream
.connect(transformedSideInputs.f1.broadcast())
- .transform(transform.getName(), outputUnionTypeInformation, doFnOperator);
+ .transform(transformName, outputUnionTypeInformation, doFnOperator);
}
}
@@ -541,7 +555,7 @@ class FlinkStreamingTransformTranslators {
}
}
- private Map<TupleTag<?>, Integer> transformTupleTagsToLabels(
+ private static Map<TupleTag<?>, Integer> transformTupleTagsToLabels(
TupleTag<?> mainTag,
List<TaggedPValue> allTaggedValues) {
@@ -556,7 +570,7 @@ class FlinkStreamingTransformTranslators {
return tagToLabelMap;
}
- private UnionCoder createUnionCoder(Collection<TaggedPValue> taggedCollections) {
+ private static UnionCoder createUnionCoder(Collection<TaggedPValue> taggedCollections) {
List<Coder<?>> outputCoders = Lists.newArrayList();
for (TaggedPValue taggedColl : taggedCollections) {
checkArgument(
@@ -575,6 +589,112 @@ class FlinkStreamingTransformTranslators {
}
}
+ /**
+ * Translates a {@link ParDo.MultiOutput} into a regular {@link DoFnOperator} by delegating to
+ * {@link ParDoTranslationHelper#translateParDo}. Splittable ParDos are expected to have been
+ * replaced by {@link SplittableParDo} before translation.
+ */
+ private static class ParDoStreamingTranslator<InputT, OutputT>
+ extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+ ParDo.MultiOutput<InputT, OutputT>> {
+
+ @Override
+ public void translateNode(
+ ParDo.MultiOutput<InputT, OutputT> transform,
+ FlinkStreamingTranslationContext context) {
+
+ ParDoTranslationHelper.translateParDo(
+ transform.getName(),
+ transform.getFn(),
+ // unchecked cast of the transform's input to its declared element type
+ (PCollection<InputT>) context.getInput(transform),
+ transform.getSideInputs(),
+ context.getOutputs(transform),
+ transform.getMainOutputTag(),
+ transform.getSideOutputTags().getAll(),
+ context,
+ // Plain ParDos run in an ordinary DoFnOperator whose outputs are multiplexed into
+ // RawUnionValues according to tagsToLabels.
+ new ParDoTranslationHelper.DoFnOperatorFactory<InputT, OutputT>() {
+ @Override
+ public DoFnOperator<InputT, OutputT, RawUnionValue> createDoFnOperator(
+ DoFn<InputT, OutputT> doFn,
+ List<PCollectionView<?>> sideInputs,
+ TupleTag<OutputT> mainOutputTag,
+ List<TupleTag<?>> sideOutputTags,
+ FlinkStreamingTranslationContext context,
+ WindowingStrategy<?, ?> windowingStrategy,
+ Map<TupleTag<?>, Integer> tagsToLabels,
+ Coder<WindowedValue<InputT>> inputCoder,
+ Coder keyCoder,
+ Map<Integer, PCollectionView<?>> transformedSideInputs) {
+ return new DoFnOperator<>(
+ doFn,
+ inputCoder,
+ mainOutputTag,
+ sideOutputTags,
+ new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
+ windowingStrategy,
+ transformedSideInputs,
+ sideInputs,
+ context.getPipelineOptions(),
+ keyCoder);
+ }
+ });
+ }
+ }
+
+ /**
+ * Translates {@link SplittableParDo.ProcessElements}: the transform's fn is wrapped via
+ * {@code transform.newProcessFn(...)} and executed by a {@link SplittableDoFnOperator}.
+ *
+ * <p>The input is treated as {@link KeyedWorkItem KeyedWorkItems} keyed by {@code String}
+ * (see the cast below); presumably it is the output of the
+ * {@link SplittableParDo.GBKIntoKeyedWorkItems} translation — TODO confirm.
+ */
+ private static class SplittableProcessElementsStreamingTranslator<
+ InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
+ extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+ SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT, TrackerT>> {
+
+ @Override
+ public void translateNode(
+ SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform,
+ FlinkStreamingTranslationContext context) {
+
+ ParDoTranslationHelper.translateParDo(
+ transform.getName(),
+ // the operator runs the ProcessFn wrapper, not the user's splittable fn directly
+ transform.newProcessFn(transform.getFn()),
+ (PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>)
+ context.getInput(transform),
+ transform.getSideInputs(),
+ context.getOutputs(transform),
+ transform.getMainOutputTag(),
+ transform.getSideOutputTags().getAll(),
+ context,
+ // Same wiring as for a plain ParDo, but with a SplittableDoFnOperator.
+ new ParDoTranslationHelper.DoFnOperatorFactory<
+ KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>() {
+ @Override
+ public DoFnOperator<
+ KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
+ OutputT,
+ RawUnionValue> createDoFnOperator(
+ DoFn<
+ KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
+ OutputT> doFn,
+ List<PCollectionView<?>> sideInputs,
+ TupleTag<OutputT> mainOutputTag,
+ List<TupleTag<?>> sideOutputTags,
+ FlinkStreamingTranslationContext context,
+ WindowingStrategy<?, ?> windowingStrategy,
+ Map<TupleTag<?>, Integer> tagsToLabels,
+ Coder<
+ WindowedValue<
+ KeyedWorkItem<
+ String,
+ ElementAndRestriction<InputT, RestrictionT>>>> inputCoder,
+ Coder keyCoder,
+ Map<Integer, PCollectionView<?>> transformedSideInputs) {
+ return new SplittableDoFnOperator<>(
+ doFn,
+ inputCoder,
+ mainOutputTag,
+ sideOutputTags,
+ new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
+ windowingStrategy,
+ transformedSideInputs,
+ sideInputs,
+ context.getPipelineOptions(),
+ keyCoder);
+ }
+ });
+ }
+ }
+
private static class CreateViewStreamingTranslator<ElemT, ViewT>
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
FlinkStreamingViewOverrides.CreateFlinkPCollectionView<ElemT, ViewT>> {
@@ -677,7 +797,7 @@ class FlinkStreamingTransformTranslators {
DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream =
inputDataStream
- .flatMap(new CombinePerKeyTranslator.ToKeyedWorkItem<K, InputT>())
+ .flatMap(new ToKeyedWorkItem<K, InputT>())
.returns(workItemTypeInfo).name("ToKeyedWorkItem");
KeyedStream<
@@ -861,30 +981,56 @@ class FlinkStreamingTransformTranslators {
context.setOutputDataStream(context.getOutput(transform), outDataStream);
}
}
+ }
- private static class ToKeyedWorkItem<K, InputT>
- extends RichFlatMapFunction<
- WindowedValue<KV<K, InputT>>,
- WindowedValue<SingletonKeyedWorkItem<K, InputT>>> {
-
- @Override
- public void flatMap(
- WindowedValue<KV<K, InputT>> inWithMultipleWindows,
- Collector<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> out) throws Exception {
-
- // we need to wrap each one work item per window for now
- // since otherwise the PushbackSideInputRunner will not correctly
- // determine whether side inputs are ready
- for (WindowedValue<KV<K, InputT>> in : inWithMultipleWindows.explodeWindows()) {
- SingletonKeyedWorkItem<K, InputT> workItem =
- new SingletonKeyedWorkItem<>(
- in.getValue().getKey(),
- in.withValue(in.getValue().getValue()));
-
- in.withValue(workItem);
- out.collect(in.withValue(workItem));
- }
- }
+ /**
+ * Translates {@link SplittableParDo.GBKIntoKeyedWorkItems} without an actual grouping step:
+ * each input {@code KV} becomes a {@link SingletonKeyedWorkItem} (one per window, see
+ * {@link ToKeyedWorkItem}) and the resulting stream is keyed by the element key, so that
+ * downstream keyed state and timers are scoped per key.
+ */
+ private static class GBKIntoKeyedWorkItemsTranslator<K, InputT>
+ extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+ SplittableParDo.GBKIntoKeyedWorkItems<K, InputT>> {
+
+ @Override
+ boolean canTranslate(
+ SplittableParDo.GBKIntoKeyedWorkItems<K, InputT> transform,
+ FlinkStreamingTranslationContext context) {
+ // unconditionally accepted
+ return true;
+ }
+
+ @Override
+ public void translateNode(
+ SplittableParDo.GBKIntoKeyedWorkItems<K, InputT> transform,
+ FlinkStreamingTranslationContext context) {
+
+ PCollection<KV<K, InputT>> input = context.getInput(transform);
+
+ // assumes the input coder is a KvCoder (unchecked cast)
+ KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder();
+
+ SingletonKeyedWorkItemCoder<K, InputT> workItemCoder = SingletonKeyedWorkItemCoder.of(
+ inputKvCoder.getKeyCoder(),
+ inputKvCoder.getValueCoder(),
+ input.getWindowingStrategy().getWindowFn().windowCoder());
+
+
+ WindowedValue.
+ FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder =
+ WindowedValue.getFullCoder(
+ workItemCoder,
+ input.getWindowingStrategy().getWindowFn().windowCoder());
+
+ // Flink type information backed by the Beam coder for the work-item stream.
+ CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo =
+ new CoderTypeInformation<>(windowedWorkItemCoder);
+
+ DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input);
+
+ DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream =
+ inputDataStream
+ .flatMap(new ToKeyedWorkItem<K, InputT>())
+ .returns(workItemTypeInfo).name("ToKeyedWorkItem");
+
+ // Keyed on a ByteBuffer derived via the key coder (presumably the encoded key).
+ KeyedStream<
+ WindowedValue<
+ SingletonKeyedWorkItem<K, InputT>>, ByteBuffer> keyedWorkItemStream = workItemStream
+ .keyBy(new WorkItemKeySelector<K, InputT>(inputKvCoder.getKeyCoder()));
+
+ context.setOutputDataStream(context.getOutput(transform), keyedWorkItemStream);
}
}
@@ -931,4 +1077,31 @@ class FlinkStreamingTransformTranslators {
}
}
}
+
+ /**
+ * Converts each windowed {@code KV} into {@link SingletonKeyedWorkItem SingletonKeyedWorkItems},
+ * emitting one work item per window that the input element is in.
+ */
+ private static class ToKeyedWorkItem<K, InputT>
+ extends RichFlatMapFunction<
+ WindowedValue<KV<K, InputT>>,
+ WindowedValue<SingletonKeyedWorkItem<K, InputT>>> {
+
+ @Override
+ public void flatMap(
+ WindowedValue<KV<K, InputT>> inWithMultipleWindows,
+ Collector<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> out) throws Exception {
+
+ // For now we emit one work item per window, since otherwise the
+ // PushbackSideInputRunner will not correctly determine whether
+ // side inputs are ready.
+ //
+ // this is tracked as https://issues.apache.org/jira/browse/BEAM-1850
+ for (WindowedValue<KV<K, InputT>> in : inWithMultipleWindows.explodeWindows()) {
+ SingletonKeyedWorkItem<K, InputT> workItem =
+ new SingletonKeyedWorkItem<>(
+ in.getValue().getKey(),
+ in.withValue(in.getValue().getValue()));
+
+ out.collect(in.withValue(workItem));
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5824bb4b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
new file mode 100644
index 0000000..0724ac2
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.runners.core.ElementAndRestriction;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
+import org.apache.beam.runners.core.OutputWindowedValue;
+import org.apache.beam.runners.core.SplittableParDo;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsFactory;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Flink operator for executing splittable {@link DoFn DoFns}. Specifically, for executing
+ * the {@code @ProcessElement} method of a splittable {@link DoFn}.
+ *
+ * <p>The wrapped {@link DoFn} must be a {@link SplittableParDo.ProcessFn}. {@link #open()} wires
+ * it to this operator's state internals, timer internals and a single-threaded scheduled
+ * executor; {@link #close()} shuts that executor down so its thread does not outlive the task.
+ */
+public class SplittableDoFnOperator<
+ InputT, FnOutputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
+ extends DoFnOperator<
+ KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, FnOutputT, OutputT> {
+
+ /**
+ * Executor handed to the process-element invoker. Created in {@link #open()} and shut down in
+ * {@link #close()}; {@code null} until {@link #open()} has run. Transient because it is
+ * per-task runtime state and must not be serialized with the operator.
+ */
+ private transient ScheduledExecutorService processElementExecutor;
+
+ public SplittableDoFnOperator(
+ DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, FnOutputT> doFn,
+ Coder<
+ WindowedValue<
+ KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>> inputCoder,
+ TupleTag<FnOutputT> mainOutputTag,
+ List<TupleTag<?>> sideOutputTags,
+ OutputManagerFactory<OutputT> outputManagerFactory,
+ WindowingStrategy<?, ?> windowingStrategy,
+ Map<Integer, PCollectionView<?>> sideInputTagMapping,
+ Collection<PCollectionView<?>> sideInputs,
+ PipelineOptions options,
+ Coder<?> keyCoder) {
+ super(
+ doFn,
+ inputCoder,
+ mainOutputTag,
+ sideOutputTags,
+ outputManagerFactory,
+ windowingStrategy,
+ sideInputTagMapping,
+ sideInputs,
+ options,
+ keyCoder);
+
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+
+ checkState(doFn instanceof SplittableParDo.ProcessFn);
+
+ StateInternalsFactory<String> stateInternalsFactory = new StateInternalsFactory<String>() {
+ @Override
+ public StateInternals<String> stateInternalsForKey(String key) {
+ //this will implicitly be keyed by the key of the incoming
+ // element or by the key of a firing timer
+ return (StateInternals<String>) stateInternals;
+ }
+ };
+ TimerInternalsFactory<String> timerInternalsFactory = new TimerInternalsFactory<String>() {
+ @Override
+ public TimerInternals timerInternalsForKey(String key) {
+ //this will implicitly be keyed like the StateInternalsFactory
+ return timerInternals;
+ }
+ };
+
+ // Keep a handle on the executor so close() can release its thread; creating it inline
+ // (as before) leaked one thread per operator instance.
+ processElementExecutor =
+ Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory());
+
+ SplittableParDo.ProcessFn processFn = (SplittableParDo.ProcessFn) doFn;
+ processFn.setStateInternalsFactory(stateInternalsFactory);
+ processFn.setTimerInternalsFactory(timerInternalsFactory);
+ processFn.setProcessElementInvoker(
+ new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
+ doFn,
+ serializedOptions.getPipelineOptions(),
+ new OutputWindowedValue<FnOutputT>() {
+ @Override
+ public void outputWindowedValue(
+ FnOutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ outputManager.output(
+ mainOutputTag,
+ WindowedValue.of(output, timestamp, windows, pane));
+ }
+
+ @Override
+ public <SideOutputT> void sideOutputWindowedValue(
+ TupleTag<SideOutputT> tag,
+ SideOutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane));
+ }
+ },
+ sideInputReader,
+ processElementExecutor,
+ // Output-count and duration bounds for the invoker; presumably a @ProcessElement call
+ // is checkpointed after 10000 outputs or 10 seconds — TODO confirm against the invoker.
+ 10000,
+ Duration.standardSeconds(10)));
+ }
+
+ /**
+ * Closes the operator and then shuts down the invoker executor, even if the superclass
+ * {@code close()} throws.
+ *
+ * <p>NOTE(review): on task failure Flink may only call {@code dispose()}; consider releasing
+ * the executor there as well.
+ */
+ @Override
+ public void close() throws Exception {
+ try {
+ super.close();
+ } finally {
+ shutdownProcessElementExecutor();
+ }
+ }
+
+ /** Stops the invoker executor, waiting briefly for running tasks before forcing shutdown. */
+ private void shutdownProcessElementExecutor() {
+ if (processElementExecutor == null) {
+ return;
+ }
+ processElementExecutor.shutdown();
+ try {
+ if (!processElementExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
+ processElementExecutor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ processElementExecutor.shutdownNow();
+ // preserve the interrupt for the caller
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Feeds a fired timer back into the {@link SplittableParDo.ProcessFn} as a timers-only
+ * {@link KeyedWorkItem} for the current key, in the global window.
+ */
+ @Override
+ public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) {
+ pushbackDoFnRunner.processElement(WindowedValue.valueInGlobalWindow(
+ KeyedWorkItems.<String, ElementAndRestriction<InputT, RestrictionT>>timersWorkItem(
+ (String) stateInternals.getKey(),
+ Collections.singletonList(timer.getNamespace()))));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5824bb4b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index acd5584..d926f6c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesSplittableParDo;
+import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange;
@@ -292,7 +293,7 @@ public class SplittableDoFnTest {
}
@Test
- @Category({ValidatesRunner.class, UsesSplittableParDo.class})
+ @Category({ValidatesRunner.class, UsesSplittableParDo.class, UsesTestStream.class})
public void testLateData() throws Exception {
Instant base = Instant.now();
[2/2] beam git commit: This closes #2235
Posted by jk...@apache.org.
This closes #2235
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ea33e337
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ea33e337
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ea33e337
Branch: refs/heads/master
Commit: ea33e3373a88a3ea286ea3f49f7a2b3c12aefaeb
Parents: e31ca8b 5824bb4
Author: Eugene Kirpichov <ki...@google.com>
Authored: Sat Apr 1 01:11:53 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Apr 1 01:11:53 2017 -0700
----------------------------------------------------------------------
runners/flink/runner/pom.xml | 4 +-
.../flink/FlinkStreamingPipelineTranslator.java | 37 ++
.../FlinkStreamingTransformTranslators.java | 341 ++++++++++++++-----
.../streaming/SplittableDoFnOperator.java | 150 ++++++++
.../beam/sdk/transforms/SplittableDoFnTest.java | 3 +-
5 files changed, 448 insertions(+), 87 deletions(-)
----------------------------------------------------------------------