You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/06/08 01:34:58 UTC
[09/50] beam git commit: [BEAM-1498] Use Flink-native side outputs
[BEAM-1498] Use Flink-native side outputs
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b0601fd4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b0601fd4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b0601fd4
Branch: refs/heads/DSL_SQL
Commit: b0601fd43e0929e8b925dbe566e564460f91d9fc
Parents: 88f78fa
Author: JingsongLi <lz...@aliyun.com>
Authored: Sun Jun 4 21:56:10 2017 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Jun 6 14:33:36 2017 +0200
----------------------------------------------------------------------
.../FlinkStreamingTransformTranslators.java | 145 ++++++-------------
.../wrappers/streaming/DoFnOperator.java | 40 +++--
.../wrappers/streaming/WindowDoFnOperator.java | 4 +-
.../beam/runners/flink/PipelineOptionsTest.java | 5 +-
.../flink/streaming/DoFnOperatorTest.java | 65 +++++----
5 files changed, 112 insertions(+), 147 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b0601fd4/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 00e9934..d8c3049 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -18,9 +18,6 @@
package org.apache.beam.runners.flink;
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -29,7 +26,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
import org.apache.beam.runners.core.SystemReduceFn;
@@ -84,16 +80,15 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
/**
* This class contains all the mappings between Beam and Flink
@@ -337,7 +332,7 @@ class FlinkStreamingTransformTranslators {
static class ParDoTranslationHelper {
interface DoFnOperatorFactory<InputT, OutputT> {
- DoFnOperator<InputT, OutputT, RawUnionValue> createDoFnOperator(
+ DoFnOperator<InputT, OutputT, OutputT> createDoFnOperator(
DoFn<InputT, OutputT> doFn,
String stepName,
List<PCollectionView<?>> sideInputs,
@@ -345,7 +340,7 @@ class FlinkStreamingTransformTranslators {
List<TupleTag<?>> additionalOutputTags,
FlinkStreamingTranslationContext context,
WindowingStrategy<?, ?> windowingStrategy,
- Map<TupleTag<?>, Integer> tagsToLabels,
+ Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToLabels,
Coder<WindowedValue<InputT>> inputCoder,
Coder keyCoder,
Map<Integer, PCollectionView<?>> transformedSideInputs);
@@ -354,7 +349,6 @@ class FlinkStreamingTransformTranslators {
static <InputT, OutputT> void translateParDo(
String transformName,
DoFn<InputT, OutputT> doFn,
- String stepName,
PCollection<InputT> input,
List<PCollectionView<?>> sideInputs,
Map<TupleTag<?>, PValue> outputs,
@@ -366,10 +360,15 @@ class FlinkStreamingTransformTranslators {
// we assume that the transformation does not change the windowing strategy.
WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
- Map<TupleTag<?>, Integer> tagsToLabels =
- transformTupleTagsToLabels(mainOutputTag, outputs);
+ Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags = Maps.newHashMap();
+ for (Map.Entry<TupleTag<?>, PValue> entry : outputs.entrySet()) {
+ if (!tagsToOutputTags.containsKey(entry.getKey())) {
+ tagsToOutputTags.put(entry.getKey(), new OutputTag<>(entry.getKey().getId(),
+ (TypeInformation) context.getTypeInfo((PCollection<?>) entry.getValue())));
+ }
+ }
- SingleOutputStreamOperator<RawUnionValue> unionOutputStream;
+ SingleOutputStreamOperator<WindowedValue<OutputT>> outputStream;
Coder<WindowedValue<InputT>> inputCoder = context.getCoder(input);
@@ -391,8 +390,12 @@ class FlinkStreamingTransformTranslators {
stateful = true;
}
+ CoderTypeInformation<WindowedValue<OutputT>> outputTypeInformation =
+ new CoderTypeInformation<>(
+ context.getCoder((PCollection<OutputT>) outputs.get(mainOutputTag)));
+
if (sideInputs.isEmpty()) {
- DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
+ DoFnOperator<InputT, OutputT, OutputT> doFnOperator =
doFnOperatorFactory.createDoFnOperator(
doFn,
context.getCurrentTransform().getFullName(),
@@ -401,24 +404,19 @@ class FlinkStreamingTransformTranslators {
additionalOutputTags,
context,
windowingStrategy,
- tagsToLabels,
+ tagsToOutputTags,
inputCoder,
keyCoder,
new HashMap<Integer, PCollectionView<?>>() /* side-input mapping */);
- UnionCoder outputUnionCoder = createUnionCoder(outputs);
-
- CoderTypeInformation<RawUnionValue> outputUnionTypeInformation =
- new CoderTypeInformation<>(outputUnionCoder);
-
- unionOutputStream = inputDataStream
- .transform(transformName, outputUnionTypeInformation, doFnOperator);
+ outputStream = inputDataStream
+ .transform(transformName, outputTypeInformation, doFnOperator);
} else {
Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformedSideInputs =
transformSideInputs(sideInputs, context);
- DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
+ DoFnOperator<InputT, OutputT, OutputT> doFnOperator =
doFnOperatorFactory.createDoFnOperator(
doFn,
context.getCurrentTransform().getFullName(),
@@ -427,16 +425,11 @@ class FlinkStreamingTransformTranslators {
additionalOutputTags,
context,
windowingStrategy,
- tagsToLabels,
+ tagsToOutputTags,
inputCoder,
keyCoder,
transformedSideInputs.f0);
- UnionCoder outputUnionCoder = createUnionCoder(outputs);
-
- CoderTypeInformation<RawUnionValue> outputUnionTypeInformation =
- new CoderTypeInformation<>(outputUnionCoder);
-
if (stateful) {
// we have to manually contruct the two-input transform because we're not
// allowed to have only one input keyed, normally.
@@ -448,83 +441,35 @@ class FlinkStreamingTransformTranslators {
keyedStream.getTransformation(),
transformedSideInputs.f1.broadcast().getTransformation(),
transformName,
- (TwoInputStreamOperator) doFnOperator,
- outputUnionTypeInformation,
+ doFnOperator,
+ outputTypeInformation,
keyedStream.getParallelism());
rawFlinkTransform.setStateKeyType(keyedStream.getKeyType());
rawFlinkTransform.setStateKeySelectors(keyedStream.getKeySelector(), null);
- unionOutputStream = new SingleOutputStreamOperator(
- keyedStream.getExecutionEnvironment(),
- rawFlinkTransform) {}; // we have to cheat around the ctor being protected
+ outputStream = new SingleOutputStreamOperator(
+ keyedStream.getExecutionEnvironment(),
+ rawFlinkTransform) {
+ }; // we have to cheat around the ctor being protected
keyedStream.getExecutionEnvironment().addOperator(rawFlinkTransform);
} else {
- unionOutputStream = inputDataStream
+ outputStream = inputDataStream
.connect(transformedSideInputs.f1.broadcast())
- .transform(transformName, outputUnionTypeInformation, doFnOperator);
+ .transform(transformName, outputTypeInformation, doFnOperator);
}
}
- SplitStream<RawUnionValue> splitStream = unionOutputStream
- .split(new OutputSelector<RawUnionValue>() {
- @Override
- public Iterable<String> select(RawUnionValue value) {
- return Collections.singletonList(Integer.toString(value.getUnionTag()));
- }
- });
-
- for (Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
- final int outputTag = tagsToLabels.get(output.getKey());
-
- TypeInformation outputTypeInfo = context.getTypeInfo((PCollection<?>) output.getValue());
-
- @SuppressWarnings("unchecked")
- DataStream unwrapped = splitStream.select(String.valueOf(outputTag))
- .flatMap(new FlatMapFunction<RawUnionValue, Object>() {
- @Override
- public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception {
- out.collect(value.getValue());
- }
- }).returns(outputTypeInfo);
-
- context.setOutputDataStream(output.getValue(), unwrapped);
- }
- }
-
- private static Map<TupleTag<?>, Integer> transformTupleTagsToLabels(
- TupleTag<?> mainTag,
- Map<TupleTag<?>, PValue> allTaggedValues) {
+ context.setOutputDataStream(outputs.get(mainOutputTag), outputStream);
- Map<TupleTag<?>, Integer> tagToLabelMap = Maps.newHashMap();
- int count = 0;
- tagToLabelMap.put(mainTag, count++);
- for (TupleTag<?> key : allTaggedValues.keySet()) {
- if (!tagToLabelMap.containsKey(key)) {
- tagToLabelMap.put(key, count++);
+ for (Map.Entry<TupleTag<?>, PValue> entry : outputs.entrySet()) {
+ if (!entry.getKey().equals(mainOutputTag)) {
+ context.setOutputDataStream(entry.getValue(),
+ outputStream.getSideOutput(tagsToOutputTags.get(entry.getKey())));
}
}
- return tagToLabelMap;
- }
-
- private static UnionCoder createUnionCoder(Map<TupleTag<?>, PValue> taggedCollections) {
- List<Coder<?>> outputCoders = Lists.newArrayList();
- for (PValue taggedColl : taggedCollections.values()) {
- checkArgument(
- taggedColl instanceof PCollection,
- "A Union Coder can only be created for a Collection of Tagged %s. Got %s",
- PCollection.class.getSimpleName(),
- taggedColl.getClass().getSimpleName());
- PCollection<?> coll = (PCollection<?>) taggedColl;
- WindowedValue.FullWindowedValueCoder<?> windowedValueCoder =
- WindowedValue.getFullCoder(
- coll.getCoder(),
- coll.getWindowingStrategy().getWindowFn().windowCoder());
- outputCoders.add(windowedValueCoder);
- }
- return UnionCoder.of(outputCoders);
}
}
@@ -540,7 +485,6 @@ class FlinkStreamingTransformTranslators {
ParDoTranslationHelper.translateParDo(
transform.getName(),
transform.getFn(),
- context.getCurrentTransform().getFullName(),
(PCollection<InputT>) context.getInput(transform),
transform.getSideInputs(),
context.getOutputs(transform),
@@ -549,7 +493,7 @@ class FlinkStreamingTransformTranslators {
context,
new ParDoTranslationHelper.DoFnOperatorFactory<InputT, OutputT>() {
@Override
- public DoFnOperator<InputT, OutputT, RawUnionValue> createDoFnOperator(
+ public DoFnOperator<InputT, OutputT, OutputT> createDoFnOperator(
DoFn<InputT, OutputT> doFn,
String stepName,
List<PCollectionView<?>> sideInputs,
@@ -557,7 +501,7 @@ class FlinkStreamingTransformTranslators {
List<TupleTag<?>> additionalOutputTags,
FlinkStreamingTranslationContext context,
WindowingStrategy<?, ?> windowingStrategy,
- Map<TupleTag<?>, Integer> tagsToLabels,
+ Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags,
Coder<WindowedValue<InputT>> inputCoder,
Coder keyCoder,
Map<Integer, PCollectionView<?>> transformedSideInputs) {
@@ -567,7 +511,7 @@ class FlinkStreamingTransformTranslators {
inputCoder,
mainOutputTag,
additionalOutputTags,
- new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
+ new DoFnOperator.MultiOutputOutputManagerFactory(mainOutputTag, tagsToOutputTags),
windowingStrategy,
transformedSideInputs,
sideInputs,
@@ -592,7 +536,6 @@ class FlinkStreamingTransformTranslators {
ParDoTranslationHelper.translateParDo(
transform.getName(),
transform.newProcessFn(transform.getFn()),
- context.getCurrentTransform().getFullName(),
context.getInput(transform),
transform.getSideInputs(),
context.getOutputs(transform),
@@ -604,8 +547,7 @@ class FlinkStreamingTransformTranslators {
@Override
public DoFnOperator<
KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
- OutputT,
- RawUnionValue> createDoFnOperator(
+ OutputT, OutputT> createDoFnOperator(
DoFn<
KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
OutputT> doFn,
@@ -615,7 +557,7 @@ class FlinkStreamingTransformTranslators {
List<TupleTag<?>> additionalOutputTags,
FlinkStreamingTranslationContext context,
WindowingStrategy<?, ?> windowingStrategy,
- Map<TupleTag<?>, Integer> tagsToLabels,
+ Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags,
Coder<
WindowedValue<
KeyedWorkItem<
@@ -629,7 +571,7 @@ class FlinkStreamingTransformTranslators {
inputCoder,
mainOutputTag,
additionalOutputTags,
- new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
+ new DoFnOperator.MultiOutputOutputManagerFactory(mainOutputTag, tagsToOutputTags),
windowingStrategy,
transformedSideInputs,
sideInputs,
@@ -756,8 +698,7 @@ class FlinkStreamingTransformTranslators {
TypeInformation<WindowedValue<KV<K, Iterable<InputT>>>> outputTypeInfo =
context.getTypeInfo(context.getOutput(transform));
- DoFnOperator.DefaultOutputManagerFactory<
- WindowedValue<KV<K, Iterable<InputT>>>> outputManagerFactory =
+ DoFnOperator.DefaultOutputManagerFactory<KV<K, Iterable<InputT>>> outputManagerFactory =
new DoFnOperator.DefaultOutputManagerFactory<>();
WindowDoFnOperator<K, InputT, Iterable<InputT>> doFnOperator =
@@ -868,7 +809,7 @@ class FlinkStreamingTransformTranslators {
(Coder) windowedWorkItemCoder,
new TupleTag<KV<K, OutputT>>("main output"),
Collections.<TupleTag<?>>emptyList(),
- new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(),
+ new DoFnOperator.DefaultOutputManagerFactory<KV<K, OutputT>>(),
windowingStrategy,
new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
Collections.<PCollectionView<?>>emptyList(), /* side inputs */
@@ -894,7 +835,7 @@ class FlinkStreamingTransformTranslators {
(Coder) windowedWorkItemCoder,
new TupleTag<KV<K, OutputT>>("main output"),
Collections.<TupleTag<?>>emptyList(),
- new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(),
+ new DoFnOperator.DefaultOutputManagerFactory<KV<K, OutputT>>(),
windowingStrategy,
transformSideInputs.f0,
sideInputs,
http://git-wip-us.apache.org/repos/asf/beam/blob/b0601fd4/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 594fe0e..8c27ed9 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -87,6 +87,7 @@ import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
import org.joda.time.Instant;
/**
@@ -98,9 +99,9 @@ import org.joda.time.Instant;
* type when we have additional tagged outputs
*/
public class DoFnOperator<InputT, FnOutputT, OutputT>
- extends AbstractStreamOperator<OutputT>
- implements OneInputStreamOperator<WindowedValue<InputT>, OutputT>,
- TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, OutputT>,
+ extends AbstractStreamOperator<WindowedValue<OutputT>>
+ implements OneInputStreamOperator<WindowedValue<InputT>, WindowedValue<OutputT>>,
+ TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, WindowedValue<OutputT>>,
KeyGroupCheckpointedOperator, Triggerable<Object, TimerData> {
protected DoFn<InputT, FnOutputT> doFn;
@@ -662,7 +663,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
* a Flink {@link Output}.
*/
interface OutputManagerFactory<OutputT> extends Serializable {
- DoFnRunners.OutputManager create(Output<StreamRecord<OutputT>> output);
+ DoFnRunners.OutputManager create(Output<StreamRecord<WindowedValue<OutputT>>> output);
}
/**
@@ -673,14 +674,15 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
public static class DefaultOutputManagerFactory<OutputT>
implements OutputManagerFactory<OutputT> {
@Override
- public DoFnRunners.OutputManager create(final Output<StreamRecord<OutputT>> output) {
+ public DoFnRunners.OutputManager create(
+ final Output<StreamRecord<WindowedValue<OutputT>>> output) {
return new DoFnRunners.OutputManager() {
@Override
public <T> void output(TupleTag<T> tag, WindowedValue<T> value) {
// with tagged outputs we can't get around this because we don't
// know our own output type...
@SuppressWarnings("unchecked")
- OutputT castValue = (OutputT) value;
+ WindowedValue<OutputT> castValue = (WindowedValue<OutputT>) value;
output.collect(new StreamRecord<>(castValue));
}
};
@@ -692,22 +694,34 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
* {@link DoFnRunners.OutputManager} that can write to multiple logical
* outputs by unioning them in a {@link RawUnionValue}.
*/
- public static class MultiOutputOutputManagerFactory
- implements OutputManagerFactory<RawUnionValue> {
+ public static class MultiOutputOutputManagerFactory<OutputT>
+ implements OutputManagerFactory<OutputT> {
- Map<TupleTag<?>, Integer> mapping;
+ private TupleTag<?> mainTag;
+ Map<TupleTag<?>, OutputTag<WindowedValue<?>>> mapping;
- public MultiOutputOutputManagerFactory(Map<TupleTag<?>, Integer> mapping) {
+ public MultiOutputOutputManagerFactory(
+ TupleTag<?> mainTag,
+ Map<TupleTag<?>, OutputTag<WindowedValue<?>>> mapping) {
+ this.mainTag = mainTag;
this.mapping = mapping;
}
@Override
- public DoFnRunners.OutputManager create(final Output<StreamRecord<RawUnionValue>> output) {
+ public DoFnRunners.OutputManager create(
+ final Output<StreamRecord<WindowedValue<OutputT>>> output) {
return new DoFnRunners.OutputManager() {
@Override
public <T> void output(TupleTag<T> tag, WindowedValue<T> value) {
- int intTag = mapping.get(tag);
- output.collect(new StreamRecord<>(new RawUnionValue(intTag, value)));
+ if (tag.equals(mainTag)) {
+ @SuppressWarnings("unchecked")
+ WindowedValue<OutputT> outputValue = (WindowedValue<OutputT>) value;
+ output.collect(new StreamRecord<>(outputValue));
+ } else {
+ @SuppressWarnings("unchecked")
+ OutputTag<WindowedValue<T>> outputTag = (OutputTag) mapping.get(tag);
+ output.<WindowedValue<T>>collect(outputTag, new StreamRecord<>(value));
+ }
}
};
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b0601fd4/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index bf64ede..ea578b9 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -46,7 +46,7 @@ import org.apache.flink.streaming.api.operators.InternalTimer;
* Flink operator for executing window {@link DoFn DoFns}.
*/
public class WindowDoFnOperator<K, InputT, OutputT>
- extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>, WindowedValue<KV<K, OutputT>>> {
+ extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>, KV<K, OutputT>> {
private final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn;
@@ -56,7 +56,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
Coder<WindowedValue<KeyedWorkItem<K, InputT>>> inputCoder,
TupleTag<KV<K, OutputT>> mainOutputTag,
List<TupleTag<?>> additionalOutputTags,
- OutputManagerFactory<WindowedValue<KV<K, OutputT>>> outputManagerFactory,
+ OutputManagerFactory<KV<K, OutputT>> outputManagerFactory,
WindowingStrategy<?, ?> windowingStrategy,
Map<Integer, PCollectionView<?>> sideInputTagMapping,
Collection<PCollectionView<?>> sideInputs,
http://git-wip-us.apache.org/repos/asf/beam/blob/b0601fd4/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index 8382a2d..bc0b1c2 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -173,13 +173,12 @@ public class PipelineOptionsTest {
final byte[] serialized = SerializationUtils.serialize(doFnOperator);
@SuppressWarnings("unchecked")
- DoFnOperator<Object, Object, Object> deserialized =
- (DoFnOperator<Object, Object, Object>) SerializationUtils.deserialize(serialized);
+ DoFnOperator<Object, Object, Object> deserialized = SerializationUtils.deserialize(serialized);
TypeInformation<WindowedValue<Object>> typeInformation = TypeInformation.of(
new TypeHint<WindowedValue<Object>>() {});
- OneInputStreamOperatorTestHarness<WindowedValue<Object>, Object> testHarness =
+ OneInputStreamOperatorTestHarness<WindowedValue<Object>, WindowedValue<Object>> testHarness =
new OneInputStreamOperatorTestHarness<>(deserialized,
typeInformation.createSerializer(new ExecutionConfig()));
http://git-wip-us.apache.org/repos/asf/beam/blob/b0601fd4/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 79bc0e0..132242e 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -65,6 +65,7 @@ import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
+import org.apache.flink.util.OutputTag;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
@@ -123,7 +124,7 @@ public class DoFnOperatorTest {
PipelineOptionsFactory.as(FlinkPipelineOptions.class),
null);
- OneInputStreamOperatorTestHarness<WindowedValue<String>, String> testHarness =
+ OneInputStreamOperatorTestHarness<WindowedValue<String>, WindowedValue<String>> testHarness =
new OneInputStreamOperatorTestHarness<>(doFnOperator);
testHarness.open();
@@ -147,26 +148,27 @@ public class DoFnOperatorTest {
TupleTag<String> mainOutput = new TupleTag<>("main-output");
TupleTag<String> additionalOutput1 = new TupleTag<>("output-1");
TupleTag<String> additionalOutput2 = new TupleTag<>("output-2");
- ImmutableMap<TupleTag<?>, Integer> outputMapping = ImmutableMap.<TupleTag<?>, Integer>builder()
- .put(mainOutput, 1)
- .put(additionalOutput1, 2)
- .put(additionalOutput2, 3)
- .build();
+ ImmutableMap<TupleTag<?>, OutputTag<?>> outputMapping =
+ ImmutableMap.<TupleTag<?>, OutputTag<?>>builder()
+ .put(mainOutput, new OutputTag<String>(mainOutput.getId()){})
+ .put(additionalOutput1, new OutputTag<String>(additionalOutput1.getId()){})
+ .put(additionalOutput2, new OutputTag<String>(additionalOutput2.getId()){})
+ .build();
- DoFnOperator<String, String, RawUnionValue> doFnOperator = new DoFnOperator<>(
+ DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
new MultiOutputDoFn(additionalOutput1, additionalOutput2),
"stepName",
windowedValueCoder,
mainOutput,
ImmutableList.<TupleTag<?>>of(additionalOutput1, additionalOutput2),
- new DoFnOperator.MultiOutputOutputManagerFactory(outputMapping),
+ new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput, outputMapping),
WindowingStrategy.globalDefault(),
new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
Collections.<PCollectionView<?>>emptyList(), /* side inputs */
PipelineOptionsFactory.as(FlinkPipelineOptions.class),
null);
- OneInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue> testHarness =
+ OneInputStreamOperatorTestHarness<WindowedValue<String>, WindowedValue<String>> testHarness =
new OneInputStreamOperatorTestHarness<>(doFnOperator);
testHarness.open();
@@ -176,17 +178,26 @@ public class DoFnOperatorTest {
testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("hello")));
assertThat(
- this.stripStreamRecordFromRawUnion(testHarness.getOutput()),
+ this.stripStreamRecord(testHarness.getOutput()),
+ contains(
+ WindowedValue.valueInGlobalWindow("got: hello")));
+
+ assertThat(
+ this.stripStreamRecord(testHarness.getSideOutput(outputMapping.get(additionalOutput1))),
contains(
- new RawUnionValue(2, WindowedValue.valueInGlobalWindow("extra: one")),
- new RawUnionValue(3, WindowedValue.valueInGlobalWindow("extra: two")),
- new RawUnionValue(1, WindowedValue.valueInGlobalWindow("got: hello")),
- new RawUnionValue(2, WindowedValue.valueInGlobalWindow("got: hello")),
- new RawUnionValue(3, WindowedValue.valueInGlobalWindow("got: hello"))));
+ WindowedValue.valueInGlobalWindow("extra: one"),
+ WindowedValue.valueInGlobalWindow("got: hello")));
+
+ assertThat(
+ this.stripStreamRecord(testHarness.getSideOutput(outputMapping.get(additionalOutput2))),
+ contains(
+ WindowedValue.valueInGlobalWindow("extra: two"),
+ WindowedValue.valueInGlobalWindow("got: hello")));
testHarness.close();
}
+
@Test
public void testLateDroppingForStatefulFn() throws Exception {
@@ -212,13 +223,13 @@ public class DoFnOperatorTest {
TupleTag<String> outputTag = new TupleTag<>("main-output");
- DoFnOperator<Integer, String, WindowedValue<String>> doFnOperator = new DoFnOperator<>(
+ DoFnOperator<Integer, String, String> doFnOperator = new DoFnOperator<>(
fn,
"stepName",
windowedValueCoder,
outputTag,
Collections.<TupleTag<?>>emptyList(),
- new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<String>>(),
+ new DoFnOperator.DefaultOutputManagerFactory<String>(),
windowingStrategy,
new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
Collections.<PCollectionView<?>>emptyList(), /* side inputs */
@@ -325,14 +336,14 @@ public class DoFnOperatorTest {
TupleTag<KV<String, Integer>> outputTag = new TupleTag<>("main-output");
DoFnOperator<
- KV<String, Integer>, KV<String, Integer>, WindowedValue<KV<String, Integer>>> doFnOperator =
+ KV<String, Integer>, KV<String, Integer>, KV<String, Integer>> doFnOperator =
new DoFnOperator<>(
fn,
"stepName",
windowedValueCoder,
outputTag,
Collections.<TupleTag<?>>emptyList(),
- new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<String, Integer>>>(),
+ new DoFnOperator.DefaultOutputManagerFactory<KV<String, Integer>>(),
windowingStrategy,
new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
Collections.<PCollectionView<?>>emptyList(), /* side inputs */
@@ -435,8 +446,8 @@ public class DoFnOperatorTest {
PipelineOptionsFactory.as(FlinkPipelineOptions.class),
keyCoder);
- TwoInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue, String> testHarness =
- new TwoInputStreamOperatorTestHarness<>(doFnOperator);
+ TwoInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue, WindowedValue<String>>
+ testHarness = new TwoInputStreamOperatorTestHarness<>(doFnOperator);
if (keyed) {
// we use a dummy key for the second input since it is considered to be broadcast
@@ -527,19 +538,19 @@ public class DoFnOperatorTest {
});
}
- private Iterable<RawUnionValue> stripStreamRecordFromRawUnion(Iterable<Object> input) {
+ private Iterable<WindowedValue<String>> stripStreamRecord(Iterable<?> input) {
return FluentIterable.from(input).filter(new Predicate<Object>() {
@Override
public boolean apply(@Nullable Object o) {
- return o instanceof StreamRecord && ((StreamRecord) o).getValue() instanceof RawUnionValue;
+ return o instanceof StreamRecord;
}
- }).transform(new Function<Object, RawUnionValue>() {
+ }).transform(new Function<Object, WindowedValue<String>>() {
@Nullable
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
- public RawUnionValue apply(@Nullable Object o) {
- if (o instanceof StreamRecord && ((StreamRecord) o).getValue() instanceof RawUnionValue) {
- return (RawUnionValue) ((StreamRecord) o).getValue();
+ public WindowedValue<String> apply(@Nullable Object o) {
+ if (o instanceof StreamRecord) {
+ return (WindowedValue<String>) ((StreamRecord) o).getValue();
}
throw new RuntimeException("unreachable");
}