You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/19 19:15:04 UTC
[30/50] [abbrv] beam git commit: [BEAM-1994] Remove Flink examples
package
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
new file mode 100644
index 0000000..123d5e7
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -0,0 +1,1044 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.beam.runners.core.ElementAndRestriction;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.SplittableParDo;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.BoundedSourceWrapper;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.AppliedCombineFn;
+import org.apache.beam.sdk.util.Reshuffle;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.SplitStream;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class contains all the mappings between Beam and Flink
+ * <b>streaming</b> transformations. The {@link FlinkStreamingPipelineTranslator}
+ * traverses the Beam job and comes here to translate the encountered Beam transformations
+ * into Flink ones, based on the mapping available in this class.
+ */
+class FlinkStreamingTransformTranslators {
+
+ // --------------------------------------------------------------------------------------------
+ // Transform Translator Registry
+ // --------------------------------------------------------------------------------------------
+
+ // Registry mapping each supported Beam PTransform class to the translator
+ // that converts it into the corresponding Flink streaming operation(s).
+ // Raw types are unavoidable here because the map mixes translators of
+ // different generic arity.
+ @SuppressWarnings("rawtypes")
+ private static final Map<
+ Class<? extends PTransform>,
+ FlinkStreamingPipelineTranslator.StreamTransformTranslator> TRANSLATORS = new HashMap<>();
+
+ // here you can find all the available translators.
+ static {
+ TRANSLATORS.put(Read.Bounded.class, new BoundedReadSourceTranslator());
+ TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
+ TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
+
+ TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoStreamingTranslator());
+ TRANSLATORS.put(
+ SplittableParDo.ProcessElements.class, new SplittableProcessElementsStreamingTranslator());
+ TRANSLATORS.put(
+ SplittableParDo.GBKIntoKeyedWorkItems.class, new GBKIntoKeyedWorkItemsTranslator());
+
+
+ TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator());
+ TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslator());
+ TRANSLATORS.put(
+ FlinkStreamingViewOverrides.CreateFlinkPCollectionView.class,
+ new CreateViewStreamingTranslator());
+
+ TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorStreaming());
+ TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
+ TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator());
+ }
+
+ /**
+  * Looks up the registered translator for the given transform.
+  *
+  * @return the translator registered for the transform's concrete class,
+  *     or {@code null} if the transform is not supported in streaming mode
+  */
+ public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator(
+     PTransform<?, ?> transform) {
+   Class<?> transformClass = transform.getClass();
+   return TRANSLATORS.get(transformClass);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Transformation Implementations
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Translates {@link TextIO.Write.Bound} into a Flink {@code writeAsText} sink.
+ * Several TextIO options (validation, suffix, shard name template) are not
+ * supported by the Flink sink yet and are only logged as warnings.
+ */
+ private static class TextIOWriteBoundStreamingTranslator
+ extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<TextIO.Write.Bound> {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class);
+
+ @Override
+ public void translateNode(
+ TextIO.Write.Bound transform,
+ FlinkStreamingTranslationContext context) {
+ PValue input = context.getInput(transform);
+ DataStream<WindowedValue<String>> inputDataStream = context.getInputDataStream(input);
+
+ String filenamePrefix = transform.getFilenamePrefix();
+ String filenameSuffix = transform.getFilenameSuffix();
+ boolean needsValidation = transform.needsValidation();
+ int numShards = transform.getNumShards();
+ String shardNameTemplate = transform.getShardNameTemplate();
+
+ // TODO: Implement these. We need Flink support for this.
+ LOG.warn(
+ "Translation of TextIO.Write.needsValidation not yet supported. Is: {}.",
+ needsValidation);
+ LOG.warn(
+ "Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.",
+ filenameSuffix);
+ LOG.warn(
+ "Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.",
+ shardNameTemplate);
+
+ // Strip the windowing metadata and emit only the raw String values.
+ DataStream<String> dataSink = inputDataStream
+ .flatMap(new FlatMapFunction<WindowedValue<String>, String>() {
+ @Override
+ public void flatMap(
+ WindowedValue<String> value,
+ Collector<String> out)
+ throws Exception {
+ out.collect(value.getValue());
+ }
+ });
+ DataStreamSink<String> output =
+ dataSink.writeAsText(filenamePrefix, FileSystem.WriteMode.OVERWRITE);
+
+ // A positive shard count caps the sink parallelism; 0 keeps the default.
+ if (numShards > 0) {
+ output.setParallelism(numShards);
+ }
+ }
+ }
+
+ /**
+ * Translates {@link Read.Unbounded} by wrapping the Beam source in an
+ * {@link UnboundedSourceWrapper} and adding it to the Flink execution
+ * environment as a streaming source.
+ */
+ private static class UnboundedReadSourceTranslator<T>
+ extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> {
+
+ @Override
+ public void translateNode(
+ Read.Unbounded<T> transform,
+ FlinkStreamingTranslationContext context) {
+ PCollection<T> output = context.getOutput(transform);
+
+ TypeInformation<WindowedValue<T>> outputTypeInfo =
+ context.getTypeInfo(context.getOutput(transform));
+
+ DataStream<WindowedValue<T>> source;
+ try {
+ // The wrapper is sized to the environment's parallelism so the Beam
+ // source can be split across that many readers.
+ UnboundedSourceWrapper<T, ?> sourceWrapper =
+ new UnboundedSourceWrapper<>(
+ context.getPipelineOptions(),
+ transform.getSource(),
+ context.getExecutionEnvironment().getParallelism());
+ source = context
+ .getExecutionEnvironment()
+ .addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Error while translating UnboundedSource: " + transform.getSource(), e);
+ }
+
+ context.setOutputDataStream(output, source);
+ }
+ }
+
+ /**
+  * Translates {@link Read.Bounded} by wrapping the Beam source in a
+  * {@link BoundedSourceWrapper} and adding it as a Flink source.
+  */
+ private static class BoundedReadSourceTranslator<T>
+     extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Bounded<T>> {
+
+   @Override
+   public void translateNode(
+       Read.Bounded<T> transform,
+       FlinkStreamingTranslationContext context) {
+     PCollection<T> output = context.getOutput(transform);
+     TypeInformation<WindowedValue<T>> outputTypeInfo = context.getTypeInfo(output);
+
+     DataStream<WindowedValue<T>> source;
+     try {
+       // Size the wrapper to the environment's parallelism so the source
+       // can be split across that many readers.
+       int parallelism = context.getExecutionEnvironment().getParallelism();
+       BoundedSourceWrapper<T> sourceWrapper =
+           new BoundedSourceWrapper<>(
+               context.getPipelineOptions(), transform.getSource(), parallelism);
+       source =
+           context
+               .getExecutionEnvironment()
+               .addSource(sourceWrapper)
+               .name(transform.getName())
+               .returns(outputTypeInfo);
+     } catch (Exception e) {
+       throw new RuntimeException(
+           "Error while translating BoundedSource: " + transform.getSource(), e);
+     }
+
+     context.setOutputDataStream(output, source);
+   }
+ }
+
+ /**
+  * Wraps each element in a {@link RawUnionValue} with the given tag id.
+  */
+ private static class ToRawUnion<T> implements MapFunction<T, RawUnionValue> {
+
+   // Union tag stamped onto every element passing through this mapper.
+   private final int unionTag;
+
+   public ToRawUnion(int intTag) {
+     this.unionTag = intTag;
+   }
+
+   @Override
+   public RawUnionValue map(T o) throws Exception {
+     return new RawUnionValue(unionTag, o);
+   }
+ }
+
+ /**
+  * Prepares the side inputs of a transform for broadcasting: assigns a dense
+  * integer tag to every {@link PCollectionView}, maps each side-input stream
+  * to {@link RawUnionValue}s carrying that tag, and unions them into a single
+  * stream.
+  *
+  * <p>Fix over the original: the first loop computed an unused local
+  * ({@code sideInput.getCoderInternal()}) on every iteration; that dead code
+  * is removed.
+  *
+  * @param sideInputs the side-input views; must be non-empty
+  * @param context the translation context used to resolve input streams
+  * @return a tuple of (tag -> view mapping, unioned tagged stream)
+  * @throws IllegalStateException if a side-input stream's type information is
+  *     not a {@link CoderTypeInformation}, or if {@code sideInputs} is empty
+  */
+ private static Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>>
+     transformSideInputs(
+         Collection<PCollectionView<?>> sideInputs,
+         FlinkStreamingTranslationContext context) {
+
+   // Assign a dense integer tag to each side input, remembering both
+   // directions: int -> view for the operator, TupleTag -> int for tagging.
+   Map<TupleTag<?>, Integer> tagToIntMapping = new HashMap<>();
+   Map<Integer, PCollectionView<?>> intToViewMapping = new HashMap<>();
+   int count = 0;
+   for (PCollectionView<?> sideInput: sideInputs) {
+     TupleTag<?> tag = sideInput.getTagInternal();
+     intToViewMapping.put(count, sideInput);
+     tagToIntMapping.put(tag, count);
+     count++;
+   }
+
+   // Collect each side-input stream's element coder; the streams must carry
+   // CoderTypeInformation so we can build a UnionCoder over them.
+   List<Coder<?>> inputCoders = new ArrayList<>();
+   for (PCollectionView<?> sideInput: sideInputs) {
+     DataStream<Object> sideInputStream = context.getInputDataStream(sideInput);
+     TypeInformation<Object> tpe = sideInputStream.getType();
+     if (!(tpe instanceof CoderTypeInformation)) {
+       throw new IllegalStateException(
+           "Input Stream TypeInformation is no CoderTypeInformation.");
+     }
+     inputCoders.add(((CoderTypeInformation) tpe).getCoder());
+   }
+
+   UnionCoder unionCoder = UnionCoder.of(inputCoders);
+
+   CoderTypeInformation<RawUnionValue> unionTypeInformation =
+       new CoderTypeInformation<>(unionCoder);
+
+   // transform each side input to RawUnionValue and union them
+   DataStream<RawUnionValue> sideInputUnion = null;
+   for (PCollectionView<?> sideInput: sideInputs) {
+     TupleTag<?> tag = sideInput.getTagInternal();
+     final int intTag = tagToIntMapping.get(tag);
+     DataStream<Object> sideInputStream = context.getInputDataStream(sideInput);
+     DataStream<RawUnionValue> unionValueStream =
+         sideInputStream.map(new ToRawUnion<>(intTag)).returns(unionTypeInformation);
+     sideInputUnion =
+         (sideInputUnion == null) ? unionValueStream : sideInputUnion.union(unionValueStream);
+   }
+
+   if (sideInputUnion == null) {
+     throw new IllegalStateException("No unioned side inputs, this indicates a bug.");
+   }
+
+   return new Tuple2<>(intToViewMapping, sideInputUnion);
+ }
+
+ /**
+ * Helper for translating {@link ParDo.MultiOutput} and {@link SplittableParDo.ProcessElements}.
+ */
+ static class ParDoTranslationHelper {
+
+ // Callback that lets each caller decide which concrete DoFnOperator
+ // subclass to instantiate (plain DoFnOperator for ParDo.MultiOutput,
+ // SplittableDoFnOperator for SplittableParDo.ProcessElements).
+ interface DoFnOperatorFactory<InputT, OutputT> {
+ DoFnOperator<InputT, OutputT, RawUnionValue> createDoFnOperator(
+ DoFn<InputT, OutputT> doFn,
+ List<PCollectionView<?>> sideInputs,
+ TupleTag<OutputT> mainOutputTag,
+ List<TupleTag<?>> additionalOutputTags,
+ FlinkStreamingTranslationContext context,
+ WindowingStrategy<?, ?> windowingStrategy,
+ Map<TupleTag<?>, Integer> tagsToLabels,
+ Coder<WindowedValue<InputT>> inputCoder,
+ Coder keyCoder,
+ Map<Integer, PCollectionView<?>> transformedSideInputs);
+ }
+
+ // Shared ParDo translation: keys the input stream when the DoFn is
+ // stateful, optionally wires broadcast side inputs, runs the operator
+ // producing a tagged union stream, then splits that stream back into the
+ // individual tagged outputs.
+ static <InputT, OutputT> void translateParDo(
+ String transformName,
+ DoFn<InputT, OutputT> doFn,
+ PCollection<InputT> input,
+ List<PCollectionView<?>> sideInputs,
+ Map<TupleTag<?>, PValue> outputs,
+ TupleTag<OutputT> mainOutputTag,
+ List<TupleTag<?>> additionalOutputTags,
+ FlinkStreamingTranslationContext context,
+ DoFnOperatorFactory<InputT, OutputT> doFnOperatorFactory) {
+
+ // we assume that the transformation does not change the windowing strategy.
+ WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
+
+ Map<TupleTag<?>, Integer> tagsToLabels =
+ transformTupleTagsToLabels(mainOutputTag, outputs);
+
+ SingleOutputStreamOperator<RawUnionValue> unionOutputStream;
+
+ Coder<WindowedValue<InputT>> inputCoder = context.getCoder(input);
+
+ DataStream<WindowedValue<InputT>> inputDataStream = context.getInputDataStream(input);
+
+ Coder keyCoder = null;
+ boolean stateful = false;
+ DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+ if (signature.stateDeclarations().size() > 0
+ || signature.timerDeclarations().size() > 0) {
+ // Based on the fact that the signature is stateful, DoFnSignatures ensures
+ // that it is also keyed
+ keyCoder = ((KvCoder) input.getCoder()).getKeyCoder();
+ inputDataStream = inputDataStream.keyBy(new KvToByteBufferKeySelector(keyCoder));
+ stateful = true;
+ } else if (doFn instanceof SplittableParDo.ProcessFn) {
+ // we know that it is keyed on String
+ keyCoder = StringUtf8Coder.of();
+ stateful = true;
+ }
+
+ if (sideInputs.isEmpty()) {
+ // No side inputs: a simple one-input transform suffices.
+ DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
+ doFnOperatorFactory.createDoFnOperator(
+ doFn,
+ sideInputs,
+ mainOutputTag,
+ additionalOutputTags,
+ context,
+ windowingStrategy,
+ tagsToLabels,
+ inputCoder,
+ keyCoder,
+ new HashMap<Integer, PCollectionView<?>>() /* side-input mapping */);
+
+ UnionCoder outputUnionCoder = createUnionCoder(outputs);
+
+ CoderTypeInformation<RawUnionValue> outputUnionTypeInformation =
+ new CoderTypeInformation<>(outputUnionCoder);
+
+ unionOutputStream = inputDataStream
+ .transform(transformName, outputUnionTypeInformation, doFnOperator);
+
+ } else {
+ Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformedSideInputs =
+ transformSideInputs(sideInputs, context);
+
+ DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
+ doFnOperatorFactory.createDoFnOperator(
+ doFn,
+ sideInputs,
+ mainOutputTag,
+ additionalOutputTags,
+ context,
+ windowingStrategy,
+ tagsToLabels,
+ inputCoder,
+ keyCoder,
+ transformedSideInputs.f0);
+
+ UnionCoder outputUnionCoder = createUnionCoder(outputs);
+
+ CoderTypeInformation<RawUnionValue> outputUnionTypeInformation =
+ new CoderTypeInformation<>(outputUnionCoder);
+
+ if (stateful) {
+ // we have to manually construct the two-input transform because we're not
+ // allowed to have only one input keyed, normally.
+ KeyedStream keyedStream = (KeyedStream<?, InputT>) inputDataStream;
+ TwoInputTransformation<
+ WindowedValue<KV<?, InputT>>,
+ RawUnionValue,
+ WindowedValue<OutputT>> rawFlinkTransform = new TwoInputTransformation(
+ keyedStream.getTransformation(),
+ transformedSideInputs.f1.broadcast().getTransformation(),
+ transformName,
+ (TwoInputStreamOperator) doFnOperator,
+ outputUnionTypeInformation,
+ keyedStream.getParallelism());
+
+ rawFlinkTransform.setStateKeyType(keyedStream.getKeyType());
+ rawFlinkTransform.setStateKeySelectors(keyedStream.getKeySelector(), null);
+
+ unionOutputStream = new SingleOutputStreamOperator(
+ keyedStream.getExecutionEnvironment(),
+ rawFlinkTransform) {}; // we have to cheat around the ctor being protected
+
+ keyedStream.getExecutionEnvironment().addOperator(rawFlinkTransform);
+
+ } else {
+ unionOutputStream = inputDataStream
+ .connect(transformedSideInputs.f1.broadcast())
+ .transform(transformName, outputUnionTypeInformation, doFnOperator);
+ }
+ }
+
+ // Split the tagged union stream and register one output stream per tag.
+ SplitStream<RawUnionValue> splitStream = unionOutputStream
+ .split(new OutputSelector<RawUnionValue>() {
+ @Override
+ public Iterable<String> select(RawUnionValue value) {
+ return Collections.singletonList(Integer.toString(value.getUnionTag()));
+ }
+ });
+
+ for (Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
+ final int outputTag = tagsToLabels.get(output.getKey());
+
+ TypeInformation outputTypeInfo = context.getTypeInfo((PCollection<?>) output.getValue());
+
+ @SuppressWarnings("unchecked")
+ DataStream unwrapped = splitStream.select(String.valueOf(outputTag))
+ .flatMap(new FlatMapFunction<RawUnionValue, Object>() {
+ @Override
+ public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception {
+ out.collect(value.getValue());
+ }
+ }).returns(outputTypeInfo);
+
+ context.setOutputDataStream(output.getValue(), unwrapped);
+ }
+ }
+
+ // Assigns a dense integer label to the main output tag (always 0) and then
+ // to every additional output tag, skipping duplicates.
+ private static Map<TupleTag<?>, Integer> transformTupleTagsToLabels(
+ TupleTag<?> mainTag,
+ Map<TupleTag<?>, PValue> allTaggedValues) {
+
+ Map<TupleTag<?>, Integer> tagToLabelMap = Maps.newHashMap();
+ int count = 0;
+ tagToLabelMap.put(mainTag, count++);
+ for (TupleTag<?> key : allTaggedValues.keySet()) {
+ if (!tagToLabelMap.containsKey(key)) {
+ tagToLabelMap.put(key, count++);
+ }
+ }
+ return tagToLabelMap;
+ }
+
+ // Builds a UnionCoder over the full windowed-value coders of all tagged
+ // output collections; rejects tagged values that are not PCollections.
+ private static UnionCoder createUnionCoder(Map<TupleTag<?>, PValue> taggedCollections) {
+ List<Coder<?>> outputCoders = Lists.newArrayList();
+ for (PValue taggedColl : taggedCollections.values()) {
+ checkArgument(
+ taggedColl instanceof PCollection,
+ "A Union Coder can only be created for a Collection of Tagged %s. Got %s",
+ PCollection.class.getSimpleName(),
+ taggedColl.getClass().getSimpleName());
+ PCollection<?> coll = (PCollection<?>) taggedColl;
+ WindowedValue.FullWindowedValueCoder<?> windowedValueCoder =
+ WindowedValue.getFullCoder(
+ coll.getCoder(),
+ coll.getWindowingStrategy().getWindowFn().windowCoder());
+ outputCoders.add(windowedValueCoder);
+ }
+ return UnionCoder.of(outputCoders);
+ }
+ }
+
+ /**
+ * Translates {@link ParDo.MultiOutput} by delegating to
+ * {@link ParDoTranslationHelper#translateParDo} with a factory that builds a
+ * plain {@link DoFnOperator}.
+ */
+ private static class ParDoStreamingTranslator<InputT, OutputT>
+ extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+ ParDo.MultiOutput<InputT, OutputT>> {
+
+ @Override
+ public void translateNode(
+ ParDo.MultiOutput<InputT, OutputT> transform,
+ FlinkStreamingTranslationContext context) {
+
+ ParDoTranslationHelper.translateParDo(
+ transform.getName(),
+ transform.getFn(),
+ (PCollection<InputT>) context.getInput(transform),
+ transform.getSideInputs(),
+ context.getOutputs(transform),
+ transform.getMainOutputTag(),
+ transform.getAdditionalOutputTags().getAll(),
+ context,
+ new ParDoTranslationHelper.DoFnOperatorFactory<InputT, OutputT>() {
+ @Override
+ public DoFnOperator<InputT, OutputT, RawUnionValue> createDoFnOperator(
+ DoFn<InputT, OutputT> doFn,
+ List<PCollectionView<?>> sideInputs,
+ TupleTag<OutputT> mainOutputTag,
+ List<TupleTag<?>> additionalOutputTags,
+ FlinkStreamingTranslationContext context,
+ WindowingStrategy<?, ?> windowingStrategy,
+ Map<TupleTag<?>, Integer> tagsToLabels,
+ Coder<WindowedValue<InputT>> inputCoder,
+ Coder keyCoder,
+ Map<Integer, PCollectionView<?>> transformedSideInputs) {
+ return new DoFnOperator<>(
+ doFn,
+ inputCoder,
+ mainOutputTag,
+ additionalOutputTags,
+ new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
+ windowingStrategy,
+ transformedSideInputs,
+ sideInputs,
+ context.getPipelineOptions(),
+ keyCoder);
+ }
+ });
+ }
+ }
+
+ /**
+ * Translates {@link SplittableParDo.ProcessElements} by delegating to
+ * {@link ParDoTranslationHelper#translateParDo} with a factory that builds a
+ * {@link SplittableDoFnOperator} over the String-keyed work-item stream.
+ */
+ private static class SplittableProcessElementsStreamingTranslator<
+ InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
+ extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+ SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT, TrackerT>> {
+
+ @Override
+ public void translateNode(
+ SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform,
+ FlinkStreamingTranslationContext context) {
+
+ ParDoTranslationHelper.translateParDo(
+ transform.getName(),
+ transform.newProcessFn(transform.getFn()),
+ (PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>)
+ context.getInput(transform),
+ transform.getSideInputs(),
+ context.getOutputs(transform),
+ transform.getMainOutputTag(),
+ transform.getAdditionalOutputTags().getAll(),
+ context,
+ new ParDoTranslationHelper.DoFnOperatorFactory<
+ KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>() {
+ @Override
+ public DoFnOperator<
+ KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
+ OutputT,
+ RawUnionValue> createDoFnOperator(
+ DoFn<
+ KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
+ OutputT> doFn,
+ List<PCollectionView<?>> sideInputs,
+ TupleTag<OutputT> mainOutputTag,
+ List<TupleTag<?>> additionalOutputTags,
+ FlinkStreamingTranslationContext context,
+ WindowingStrategy<?, ?> windowingStrategy,
+ Map<TupleTag<?>, Integer> tagsToLabels,
+ Coder<
+ WindowedValue<
+ KeyedWorkItem<
+ String,
+ ElementAndRestriction<InputT, RestrictionT>>>> inputCoder,
+ Coder keyCoder,
+ Map<Integer, PCollectionView<?>> transformedSideInputs) {
+ return new SplittableDoFnOperator<>(
+ doFn,
+ inputCoder,
+ mainOutputTag,
+ additionalOutputTags,
+ new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
+ windowingStrategy,
+ transformedSideInputs,
+ sideInputs,
+ context.getPipelineOptions(),
+ keyCoder);
+ }
+ });
+ }
+ }
+
+ /**
+  * Translates {@link FlinkStreamingViewOverrides.CreateFlinkPCollectionView}
+  * by re-registering the input stream under the output view — no new Flink
+  * operator is created.
+  */
+ private static class CreateViewStreamingTranslator<ElemT, ViewT>
+     extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+     FlinkStreamingViewOverrides.CreateFlinkPCollectionView<ElemT, ViewT>> {
+
+   @Override
+   public void translateNode(
+       FlinkStreamingViewOverrides.CreateFlinkPCollectionView<ElemT, ViewT> transform,
+       FlinkStreamingTranslationContext context) {
+     // just forward
+     PCollectionView<ViewT> view = context.getOutput(transform);
+     DataStream<WindowedValue<List<ElemT>>> inputDataSet =
+         context.getInputDataStream(context.getInput(transform));
+     context.setOutputDataStream(view, inputDataSet);
+   }
+ }
+
+ /**
+  * Translates {@link Window.Assign} into a flatMap that applies the output
+  * windowing strategy's {@link WindowFn} to every element.
+  */
+ private static class WindowAssignTranslator<T>
+     extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Assign<T>> {
+
+   @Override
+   public void translateNode(
+       Window.Assign<T> transform,
+       FlinkStreamingTranslationContext context) {
+
+     PCollection<T> output = context.getOutput(transform);
+
+     @SuppressWarnings("unchecked")
+     WindowingStrategy<T, BoundedWindow> windowingStrategy =
+         (WindowingStrategy<T, BoundedWindow>) output.getWindowingStrategy();
+
+     TypeInformation<WindowedValue<T>> typeInfo = context.getTypeInfo(output);
+
+     DataStream<WindowedValue<T>> inputDataStream =
+         context.getInputDataStream(context.getInput(transform));
+
+     // The assign function re-windows each element with the strategy's WindowFn.
+     WindowFn<T, ? extends BoundedWindow> windowFn = windowingStrategy.getWindowFn();
+     FlinkAssignWindows<T, ? extends BoundedWindow> assignWindowsFunction =
+         new FlinkAssignWindows<>(windowFn);
+
+     SingleOutputStreamOperator<WindowedValue<T>> outputDataStream =
+         inputDataStream
+             .flatMap(assignWindowsFunction)
+             .name(output.getName())
+             .returns(typeInfo);
+
+     context.setOutputDataStream(output, outputDataStream);
+   }
+ }
+
+ /**
+  * Translates {@link Reshuffle} as a plain Flink {@code rebalance()}, which
+  * redistributes elements round-robin across downstream parallel instances.
+  */
+ private static class ReshuffleTranslatorStreaming<K, InputT>
+     extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Reshuffle<K, InputT>> {
+
+   @Override
+   public void translateNode(
+       Reshuffle<K, InputT> transform,
+       FlinkStreamingTranslationContext context) {
+     DataStream<WindowedValue<KV<K, InputT>>> inputDataSet =
+         context.getInputDataStream(context.getInput(transform));
+     context.setOutputDataStream(context.getOutput(transform), inputDataSet.rebalance());
+   }
+ }
+
+
+ /**
+ * Translates {@link GroupByKey}: wraps each {@code KV} into a
+ * {@link SingletonKeyedWorkItem}, keys the stream by the byte-encoded key,
+ * and runs a buffering {@link SystemReduceFn} inside a
+ * {@link WindowDoFnOperator} to produce {@code KV<K, Iterable<InputT>>}.
+ */
+ private static class GroupByKeyTranslator<K, InputT>
+ extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<GroupByKey<K, InputT>> {
+
+ @Override
+ public void translateNode(
+ GroupByKey<K, InputT> transform,
+ FlinkStreamingTranslationContext context) {
+
+ PCollection<KV<K, InputT>> input = context.getInput(transform);
+
+ @SuppressWarnings("unchecked")
+ WindowingStrategy<?, BoundedWindow> windowingStrategy =
+ (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();
+
+ KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder();
+
+ // Coder for the single-element work items we wrap each KV into.
+ SingletonKeyedWorkItemCoder<K, InputT> workItemCoder = SingletonKeyedWorkItemCoder.of(
+ inputKvCoder.getKeyCoder(),
+ inputKvCoder.getValueCoder(),
+ input.getWindowingStrategy().getWindowFn().windowCoder());
+
+ DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input);
+
+ WindowedValue.
+ FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder =
+ WindowedValue.getFullCoder(
+ workItemCoder,
+ input.getWindowingStrategy().getWindowFn().windowCoder());
+
+ CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo =
+ new CoderTypeInformation<>(windowedWorkItemCoder);
+
+ // Wrap each KV into a single-element KeyedWorkItem.
+ DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream =
+ inputDataStream
+ .flatMap(new ToKeyedWorkItem<K, InputT>())
+ .returns(workItemTypeInfo).name("ToKeyedWorkItem");
+
+ // Key by the byte-encoded key so state and timers are per Beam key.
+ KeyedStream<
+ WindowedValue<
+ SingletonKeyedWorkItem<K, InputT>>, ByteBuffer> keyedWorkItemStream = workItemStream
+ .keyBy(new WorkItemKeySelector<K, InputT>(inputKvCoder.getKeyCoder()));
+
+ // Buffering reduce: collects all values per key and window.
+ SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, BoundedWindow> reduceFn =
+ SystemReduceFn.buffering(inputKvCoder.getValueCoder());
+
+ TypeInformation<WindowedValue<KV<K, Iterable<InputT>>>> outputTypeInfo =
+ context.getTypeInfo(context.getOutput(transform));
+
+ DoFnOperator.DefaultOutputManagerFactory<
+ WindowedValue<KV<K, Iterable<InputT>>>> outputManagerFactory =
+ new DoFnOperator.DefaultOutputManagerFactory<>();
+
+ WindowDoFnOperator<K, InputT, Iterable<InputT>> doFnOperator =
+ new WindowDoFnOperator<>(
+ reduceFn,
+ (Coder) windowedWorkItemCoder,
+ new TupleTag<KV<K, Iterable<InputT>>>("main output"),
+ Collections.<TupleTag<?>>emptyList(),
+ outputManagerFactory,
+ windowingStrategy,
+ new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+ Collections.<PCollectionView<?>>emptyList(), /* side inputs */
+ context.getPipelineOptions(),
+ inputKvCoder.getKeyCoder());
+
+ // our operator expects WindowedValue<KeyedWorkItem> while our input stream
+ // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ...
+ @SuppressWarnings("unchecked")
+ SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<InputT>>>> outDataStream =
+ keyedWorkItemStream
+ .transform(
+ transform.getName(),
+ outputTypeInfo,
+ (OneInputStreamOperator) doFnOperator);
+
+ context.setOutputDataStream(context.getOutput(transform), outDataStream);
+
+ }
+ }
+
+ private static class CombinePerKeyTranslator<K, InputT, OutputT>
+ extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+ Combine.PerKey<K, InputT, OutputT>> {
+
+ @Override
+ boolean canTranslate(
+ Combine.PerKey<K, InputT, OutputT> transform,
+ FlinkStreamingTranslationContext context) {
+
+ // if we have a merging window strategy and side inputs we cannot
+ // translate as a proper combine. We have to group and then run the combine
+ // over the final grouped values.
+ PCollection<KV<K, InputT>> input = context.getInput(transform);
+
+ @SuppressWarnings("unchecked")
+ WindowingStrategy<?, BoundedWindow> windowingStrategy =
+ (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();
+
+ // Translatable iff the windows are non-merging OR there are no side inputs.
+ return windowingStrategy.getWindowFn().isNonMerging() || transform.getSideInputs().isEmpty();
+ }
+
+ @Override
+ public void translateNode(
+ Combine.PerKey<K, InputT, OutputT> transform,
+ FlinkStreamingTranslationContext context) {
+
+ PCollection<KV<K, InputT>> input = context.getInput(transform);
+
+ @SuppressWarnings("unchecked")
+ WindowingStrategy<?, BoundedWindow> windowingStrategy =
+ (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();
+
+ KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder();
+
+ // coder for the single-element work items that feed the WindowDoFnOperator below
+ SingletonKeyedWorkItemCoder<K, InputT> workItemCoder = SingletonKeyedWorkItemCoder.of(
+ inputKvCoder.getKeyCoder(),
+ inputKvCoder.getValueCoder(),
+ input.getWindowingStrategy().getWindowFn().windowCoder());
+
+ DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input);
+
+ WindowedValue.
+ FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder =
+ WindowedValue.getFullCoder(
+ workItemCoder,
+ input.getWindowingStrategy().getWindowFn().windowCoder());
+
+ CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo =
+ new CoderTypeInformation<>(windowedWorkItemCoder);
+
+ // wrap each element as a SingletonKeyedWorkItem (one per window, see ToKeyedWorkItem)
+ DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream =
+ inputDataStream
+ .flatMap(new ToKeyedWorkItem<K, InputT>())
+ .returns(workItemTypeInfo).name("ToKeyedWorkItem");
+
+ // key the stream by the coder-encoded key bytes (hence ByteBuffer key type)
+ KeyedStream<
+ WindowedValue<
+ SingletonKeyedWorkItem<K, InputT>>, ByteBuffer> keyedWorkItemStream = workItemStream
+ .keyBy(new WorkItemKeySelector<K, InputT>(inputKvCoder.getKeyCoder()));
+
+ // reduce function that applies the user's CombineFn per key and window
+ SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> reduceFn = SystemReduceFn.combining(
+ inputKvCoder.getKeyCoder(),
+ AppliedCombineFn.withInputCoder(
+ transform.getFn(), input.getPipeline().getCoderRegistry(), inputKvCoder));
+
+ TypeInformation<WindowedValue<KV<K, OutputT>>> outputTypeInfo =
+ context.getTypeInfo(context.getOutput(transform));
+
+ List<PCollectionView<?>> sideInputs = transform.getSideInputs();
+
+ if (sideInputs.isEmpty()) {
+
+ // no side inputs: a plain one-input keyed operator suffices
+ WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
+ new WindowDoFnOperator<>(
+ reduceFn,
+ (Coder) windowedWorkItemCoder,
+ new TupleTag<KV<K, OutputT>>("main output"),
+ Collections.<TupleTag<?>>emptyList(),
+ new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(),
+ windowingStrategy,
+ new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+ Collections.<PCollectionView<?>>emptyList(), /* side inputs */
+ context.getPipelineOptions(),
+ inputKvCoder.getKeyCoder());
+
+ // our operator expects WindowedValue<KeyedWorkItem> while our input stream
+ // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ...
+ @SuppressWarnings("unchecked")
+ SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream =
+ keyedWorkItemStream.transform(
+ transform.getName(), outputTypeInfo, (OneInputStreamOperator) doFnOperator);
+
+ context.setOutputDataStream(context.getOutput(transform), outDataStream);
+ } else {
+ // side inputs arrive on a second (broadcast) input as a tagged union stream
+ Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformSideInputs =
+ transformSideInputs(sideInputs, context);
+
+ WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
+ new WindowDoFnOperator<>(
+ reduceFn,
+ (Coder) windowedWorkItemCoder,
+ new TupleTag<KV<K, OutputT>>("main output"),
+ Collections.<TupleTag<?>>emptyList(),
+ new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(),
+ windowingStrategy,
+ transformSideInputs.f0,
+ sideInputs,
+ context.getPipelineOptions(),
+ inputKvCoder.getKeyCoder());
+
+ // we have to manually construct the two-input transform because we're not
+ // allowed to have only one input keyed, normally.
+
+ TwoInputTransformation<
+ WindowedValue<SingletonKeyedWorkItem<K, InputT>>,
+ RawUnionValue,
+ WindowedValue<KV<K, OutputT>>> rawFlinkTransform = new TwoInputTransformation<>(
+ keyedWorkItemStream.getTransformation(),
+ transformSideInputs.f1.broadcast().getTransformation(),
+ transform.getName(),
+ (TwoInputStreamOperator) doFnOperator,
+ outputTypeInfo,
+ keyedWorkItemStream.getParallelism());
+
+ // propagate key type/selector from the keyed first input so state is keyed correctly
+ rawFlinkTransform.setStateKeyType(keyedWorkItemStream.getKeyType());
+ rawFlinkTransform.setStateKeySelectors(keyedWorkItemStream.getKeySelector(), null);
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream =
+ new SingleOutputStreamOperator(
+ keyedWorkItemStream.getExecutionEnvironment(),
+ rawFlinkTransform) {}; // we have to cheat around the ctor being protected
+
+ keyedWorkItemStream.getExecutionEnvironment().addOperator(rawFlinkTransform);
+
+ context.setOutputDataStream(context.getOutput(transform), outDataStream);
+ }
+ }
+ }
+
+ /**
+ * Translates {@link SplittableParDo.GBKIntoKeyedWorkItems} by wrapping every
+ * element as a {@link SingletonKeyedWorkItem} and keying the stream by the
+ * coder-encoded key; the keyed stream itself is registered as the output.
+ */
+ private static class GBKIntoKeyedWorkItemsTranslator<K, InputT>
+ extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+ SplittableParDo.GBKIntoKeyedWorkItems<K, InputT>> {
+
+ @Override
+ boolean canTranslate(
+ SplittableParDo.GBKIntoKeyedWorkItems<K, InputT> transform,
+ FlinkStreamingTranslationContext context) {
+ // this translation has no windowing or side-input restrictions
+ return true;
+ }
+
+ @Override
+ public void translateNode(
+ SplittableParDo.GBKIntoKeyedWorkItems<K, InputT> transform,
+ FlinkStreamingTranslationContext context) {
+
+ PCollection<KV<K, InputT>> input = context.getInput(transform);
+
+ KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder();
+
+ // coder for single-element work items, using the input's window coder
+ SingletonKeyedWorkItemCoder<K, InputT> workItemCoder = SingletonKeyedWorkItemCoder.of(
+ inputKvCoder.getKeyCoder(),
+ inputKvCoder.getValueCoder(),
+ input.getWindowingStrategy().getWindowFn().windowCoder());
+
+
+ WindowedValue.
+ FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder =
+ WindowedValue.getFullCoder(
+ workItemCoder,
+ input.getWindowingStrategy().getWindowFn().windowCoder());
+
+ CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo =
+ new CoderTypeInformation<>(windowedWorkItemCoder);
+
+ DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input);
+
+ DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream =
+ inputDataStream
+ .flatMap(new ToKeyedWorkItem<K, InputT>())
+ .returns(workItemTypeInfo).name("ToKeyedWorkItem");
+
+ // key by the encoded key bytes; no further operator is applied here
+ KeyedStream<
+ WindowedValue<
+ SingletonKeyedWorkItem<K, InputT>>, ByteBuffer> keyedWorkItemStream = workItemStream
+ .keyBy(new WorkItemKeySelector<K, InputT>(inputKvCoder.getKeyCoder()));
+
+ context.setOutputDataStream(context.getOutput(transform), keyedWorkItemStream);
+ }
+ }
+
+ /**
+ * Translates {@link Flatten.PCollections} into a union of its input streams;
+ * with zero inputs, an empty stream is synthesized from a one-element dummy
+ * source whose element is never forwarded.
+ */
+ private static class FlattenPCollectionTranslator<T>
+ extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+ Flatten.PCollections<T>> {
+
+ @Override
+ public void translateNode(
+ Flatten.PCollections<T> transform,
+ FlinkStreamingTranslationContext context) {
+ Map<TupleTag<?>, PValue> allInputs = context.getInputs(transform);
+
+ if (allInputs.isEmpty()) {
+
+ // create an empty dummy source to satisfy downstream operations
+ // we cannot create an empty source in Flink, therefore we have to
+ // add the flatMap that simply never forwards the single element
+ DataStreamSource<String> dummySource =
+ context.getExecutionEnvironment().fromElements("dummy");
+
+ DataStream<WindowedValue<T>> result = dummySource.flatMap(
+ new FlatMapFunction<String, WindowedValue<T>>() {
+ @Override
+ public void flatMap(
+ String s,
+ Collector<WindowedValue<T>> collector) throws Exception {
+ // never return anything
+ }
+ }).returns(
+ new CoderTypeInformation<>(
+ WindowedValue.getFullCoder(
+ (Coder<T>) VoidCoder.of(),
+ GlobalWindow.Coder.INSTANCE)));
+ context.setOutputDataStream(context.getOutput(transform), result);
+
+ } else {
+ // fold all input streams into a single union
+ DataStream<T> result = null;
+ for (PValue input : allInputs.values()) {
+ DataStream<T> current = context.getInputDataStream(input);
+ result = (result == null) ? current : result.union(current);
+ }
+ context.setOutputDataStream(context.getOutput(transform), result);
+ }
+ }
+ }
+
+ /**
+ * Wraps each input {@code KV} element as a {@link SingletonKeyedWorkItem},
+ * emitting one work item per window of the input element.
+ */
+ private static class ToKeyedWorkItem<K, InputT>
+ extends RichFlatMapFunction<
+ WindowedValue<KV<K, InputT>>,
+ WindowedValue<SingletonKeyedWorkItem<K, InputT>>> {
+
+ @Override
+ public void flatMap(
+ WindowedValue<KV<K, InputT>> inWithMultipleWindows,
+ Collector<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> out) throws Exception {
+
+ // we need to wrap each one work item per window for now
+ // since otherwise the PushbackSideInputRunner will not correctly
+ // determine whether side inputs are ready
+ //
+ // this is tracked as https://issues.apache.org/jira/browse/BEAM-1850
+ for (WindowedValue<KV<K, InputT>> in : inWithMultipleWindows.explodeWindows()) {
+ SingletonKeyedWorkItem<K, InputT> workItem =
+ new SingletonKeyedWorkItem<>(
+ in.getValue().getKey(),
+ in.withValue(in.getValue().getValue()));
+
+ out.collect(in.withValue(workItem));
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
new file mode 100644
index 0000000..1a943a3
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.Iterables;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Helper for keeping track of which {@link DataStream DataStreams} map
+ * to which {@link PTransform PTransforms}.
+ */
+class FlinkStreamingTranslationContext {
+
+ private final StreamExecutionEnvironment env;
+ private final PipelineOptions options;
+
+ /**
+ * Keeps a mapping between the output value of the PTransform and the
+ * Flink Operator that produced it, after the translation of the corresponding PTransform
+ * to its Flink equivalent.
+ * */
+ private final Map<PValue, DataStream<?>> dataStreams;
+
+ // the transform currently being translated; must be set via
+ // setCurrentTransform() before getInput()/getOutput() are used
+ private AppliedPTransform<?, ?, ?> currentTransform;
+
+ public FlinkStreamingTranslationContext(StreamExecutionEnvironment env, PipelineOptions options) {
+ this.env = checkNotNull(env);
+ this.options = checkNotNull(options);
+ this.dataStreams = new HashMap<>();
+ }
+
+ public StreamExecutionEnvironment getExecutionEnvironment() {
+ return env;
+ }
+
+ public PipelineOptions getPipelineOptions() {
+ return options;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> DataStream<T> getInputDataStream(PValue value) {
+ return (DataStream<T>) dataStreams.get(value);
+ }
+
+ public void setOutputDataStream(PValue value, DataStream<?> set) {
+ // only the first producer of a value is recorded; later calls are no-ops
+ if (!dataStreams.containsKey(value)) {
+ dataStreams.put(value, set);
+ }
+ }
+
+ /**
+ * Sets the AppliedPTransform which carries input/output.
+ * @param currentTransform the transform currently being translated
+ */
+ public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
+ this.currentTransform = currentTransform;
+ }
+
+ /** Returns the full {@link WindowedValue} coder for the given collection. */
+ public <T> Coder<WindowedValue<T>> getCoder(PCollection<T> collection) {
+ Coder<T> valueCoder = collection.getCoder();
+
+ return WindowedValue.getFullCoder(
+ valueCoder,
+ collection.getWindowingStrategy().getWindowFn().windowCoder());
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> TypeInformation<WindowedValue<T>> getTypeInfo(PCollection<T> collection) {
+ Coder<T> valueCoder = collection.getCoder();
+ WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
+ WindowedValue.getFullCoder(
+ valueCoder,
+ collection.getWindowingStrategy().getWindowFn().windowCoder());
+
+ return new CoderTypeInformation<>(windowedValueCoder);
+ }
+
+
+ @SuppressWarnings("unchecked")
+ public <T extends PValue> T getInput(PTransform<T, ?> transform) {
+ // assumes the current transform has exactly one input;
+ // Iterables.getOnlyElement throws otherwise
+ return (T) Iterables.getOnlyElement(currentTransform.getInputs().values());
+ }
+
+ public <T extends PInput> Map<TupleTag<?>, PValue> getInputs(PTransform<T, ?> transform) {
+ return currentTransform.getInputs();
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T extends PValue> T getOutput(PTransform<?, T> transform) {
+ // assumes the current transform has exactly one output
+ return (T) Iterables.getOnlyElement(currentTransform.getOutputs().values());
+ }
+
+ public <OutputT extends POutput> Map<TupleTag<?>, PValue> getOutputs(
+ PTransform<?, OutputT> transform) {
+ return currentTransform.getOutputs();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
new file mode 100644
index 0000000..f955f2a
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.PCollectionViews;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Flink streaming overrides for various view (side input) transforms.
+ */
+class FlinkStreamingViewOverrides {
+
+ /**
+ * Specialized implementation for
+ * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
+ * for the Flink runner in streaming mode.
+ */
+ static class StreamingViewAsMap<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
+
+ private final transient FlinkRunner runner;
+
+ /**
+ * Builds an instance of this class from the overridden transform.
+ */
+ @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+ public StreamingViewAsMap(FlinkRunner runner, View.AsMap<K, V> transform) {
+ this.runner = runner;
+ }
+
+ @Override
+ public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
+ PCollectionView<Map<K, V>> view =
+ PCollectionViews.mapView(
+ input,
+ input.getWindowingStrategy(),
+ input.getCoder());
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+ try {
+ inputCoder.getKeyCoder().verifyDeterministic();
+ } catch (Coder.NonDeterministicException e) {
+ // record the non-deterministic key coder so the runner can report it
+ runner.recordViewUsesNonDeterministicKeyCoder(this);
+ }
+
+ // gather all KVs into a single List (via Concatenate) and wrap it as the view
+ return input
+ .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+ .apply(CreateFlinkPCollectionView.<KV<K, V>, Map<K, V>>of(view));
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsMap";
+ }
+ }
+
+ /**
+ * Specialized expansion for {@link
+ * View.AsMultimap View.AsMultimap} for the
+ * Flink runner in streaming mode.
+ */
+ static class StreamingViewAsMultimap<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
+
+ private final transient FlinkRunner runner;
+
+ /**
+ * Builds an instance of this class from the overridden transform.
+ */
+ @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+ public StreamingViewAsMultimap(FlinkRunner runner, View.AsMultimap<K, V> transform) {
+ this.runner = runner;
+ }
+
+ @Override
+ public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+ PCollectionView<Map<K, Iterable<V>>> view =
+ PCollectionViews.multimapView(
+ input,
+ input.getWindowingStrategy(),
+ input.getCoder());
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+ try {
+ inputCoder.getKeyCoder().verifyDeterministic();
+ } catch (Coder.NonDeterministicException e) {
+ // record the non-deterministic key coder so the runner can report it
+ runner.recordViewUsesNonDeterministicKeyCoder(this);
+ }
+
+ // gather all KVs into a single List (via Concatenate) and wrap it as the view
+ return input
+ .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+ .apply(CreateFlinkPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsMultimap";
+ }
+ }
+
+ /**
+ * Specialized implementation for
+ * {@link View.AsList View.AsList} for the
+ * Flink runner in streaming mode.
+ */
+ static class StreamingViewAsList<T>
+ extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
+ /**
+ * Builds an instance of this class from the overridden transform.
+ */
+ @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+ public StreamingViewAsList(FlinkRunner runner, View.AsList<T> transform) {}
+
+ @Override
+ public PCollectionView<List<T>> expand(PCollection<T> input) {
+ PCollectionView<List<T>> view =
+ PCollectionViews.listView(
+ input,
+ input.getWindowingStrategy(),
+ input.getCoder());
+
+ // gather all elements into a single List (via Concatenate) and wrap it as the view
+ return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+ .apply(CreateFlinkPCollectionView.<T, List<T>>of(view));
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsList";
+ }
+ }
+
+ /**
+ * Specialized implementation for
+ * {@link View.AsIterable View.AsIterable} for the
+ * Flink runner in streaming mode.
+ */
+ static class StreamingViewAsIterable<T>
+ extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
+ /**
+ * Builds an instance of this class from the overridden transform.
+ */
+ @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+ public StreamingViewAsIterable(FlinkRunner runner, View.AsIterable<T> transform) { }
+
+ @Override
+ public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
+ PCollectionView<Iterable<T>> view =
+ PCollectionViews.iterableView(
+ input,
+ input.getWindowingStrategy(),
+ input.getCoder());
+
+ // gather all elements into a single List (via Concatenate) and wrap it as the view
+ return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+ .apply(CreateFlinkPCollectionView.<T, Iterable<T>>of(view));
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsIterable";
+ }
+ }
+
+ /**
+ * Specialized expansion for
+ * {@link View.AsSingleton View.AsSingleton} for the
+ * Flink runner in streaming mode.
+ */
+ static class StreamingViewAsSingleton<T>
+ extends PTransform<PCollection<T>, PCollectionView<T>> {
+ private View.AsSingleton<T> transform;
+
+ /**
+ * Builds an instance of this class from the overridden transform.
+ */
+ @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+ public StreamingViewAsSingleton(FlinkRunner runner, View.AsSingleton<T> transform) {
+ this.transform = transform;
+ }
+
+ @Override
+ public PCollectionView<T> expand(PCollection<T> input) {
+ Combine.Globally<T, T> combine = Combine.globally(
+ new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
+ if (!transform.hasDefaultValue()) {
+ // without a default, an empty input must fail rather than produce a value
+ combine = combine.withoutDefaults();
+ }
+ return input.apply(combine.asSingletonView());
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsSingleton";
+ }
+
+ /**
+ * Binary combine that only supports the empty/singleton cases: {@code apply}
+ * rejects a second element, and {@code identity} supplies the default value
+ * (or fails when there is none).
+ */
+ private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
+ private boolean hasDefaultValue;
+ private T defaultValue;
+
+ SingletonCombine(boolean hasDefaultValue, T defaultValue) {
+ this.hasDefaultValue = hasDefaultValue;
+ this.defaultValue = defaultValue;
+ }
+
+ @Override
+ public T apply(T left, T right) {
+ throw new IllegalArgumentException("PCollection with more than one element "
+ + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
+ + "combine the PCollection into a single value");
+ }
+
+ @Override
+ public T identity() {
+ if (hasDefaultValue) {
+ return defaultValue;
+ } else {
+ throw new IllegalArgumentException(
+ "Empty PCollection accessed as a singleton view. "
+ + "Consider setting withDefault to provide a default value");
+ }
+ }
+ }
+ }
+
+ /**
+ * Specialized expansion for
+ * {@link Combine.GloballyAsSingletonView} for the
+ * Flink runner in streaming mode.
+ */
+ static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
+ extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
+ Combine.GloballyAsSingletonView<InputT, OutputT> transform;
+
+ /**
+ * Builds an instance of this class from the overridden transform.
+ */
+ @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+ public StreamingCombineGloballyAsSingletonView(
+ FlinkRunner runner,
+ Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
+ this.transform = transform;
+ }
+
+ @Override
+ public PCollectionView<OutputT> expand(PCollection<InputT> input) {
+ // first combine globally, then materialize the single result as a view
+ PCollection<OutputT> combined =
+ input.apply(Combine.globally(transform.getCombineFn())
+ .withoutDefaults()
+ .withFanout(transform.getFanout()));
+
+ PCollectionView<OutputT> view = PCollectionViews.singletonView(
+ combined,
+ combined.getWindowingStrategy(),
+ transform.getInsertDefault(),
+ transform.getInsertDefault()
+ ? transform.getCombineFn().defaultValue() : null,
+ combined.getCoder());
+ return combined
+ .apply(ParDo.of(new WrapAsList<OutputT>()))
+ .apply(CreateFlinkPCollectionView.<OutputT, OutputT>of(view));
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingCombineGloballyAsSingletonView";
+ }
+ }
+
+ /**
+ * Wraps each element in a singleton {@link List}, matching the
+ * {@code PCollection<List<ElemT>>} input expected by
+ * {@link CreateFlinkPCollectionView}.
+ */
+ private static class WrapAsList<T> extends DoFn<T, List<T>> {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(Collections.singletonList(c.element()));
+ }
+ }
+
+ /**
+ * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
+ *
+ * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap},
+ * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}.
+ * They require the input {@link PCollection} fits in memory.
+ * For a large {@link PCollection} this is expected to crash!
+ *
+ * @param <T> the type of elements to concatenate.
+ */
+ private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
+ @Override
+ public List<T> createAccumulator() {
+ return new ArrayList<T>();
+ }
+
+ @Override
+ public List<T> addInput(List<T> accumulator, T input) {
+ // appends in place and returns the same accumulator instance
+ accumulator.add(input);
+ return accumulator;
+ }
+
+ @Override
+ public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+ List<T> result = createAccumulator();
+ for (List<T> accumulator : accumulators) {
+ result.addAll(accumulator);
+ }
+ return result;
+ }
+
+ @Override
+ public List<T> extractOutput(List<T> accumulator) {
+ // the accumulator already is the output list
+ return accumulator;
+ }
+
+ // both the accumulator and the output are plain lists of the input elements
+ @Override
+ public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
+ return ListCoder.of(inputCoder);
+ }
+
+ @Override
+ public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
+ return ListCoder.of(inputCoder);
+ }
+ }
+
+ /**
+ * Creates a primitive {@link PCollectionView}.
+ *
+ * <p>For internal use only by runner implementors.
+ *
+ * @param <ElemT> The type of the elements of the input PCollection
+ * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
+ */
+ public static class CreateFlinkPCollectionView<ElemT, ViewT>
+ extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
+ private PCollectionView<ViewT> view;
+
+ private CreateFlinkPCollectionView(PCollectionView<ViewT> view) {
+ this.view = view;
+ }
+
+ /** Creates the primitive transform for the given (pre-constructed) view. */
+ public static <ElemT, ViewT> CreateFlinkPCollectionView<ElemT, ViewT> of(
+ PCollectionView<ViewT> view) {
+ return new CreateFlinkPCollectionView<>(view);
+ }
+
+ @Override
+ public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
+ // the view was constructed up-front; expand only returns it unchanged
+ return view;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java
new file mode 100644
index 0000000..3acc3ea
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Traverses the Pipeline to determine the {@link TranslationMode} for this pipeline.
+ */
+class PipelineTranslationOptimizer extends FlinkPipelineTranslator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PipelineTranslationOptimizer.class);
+
+ // starts at the caller-supplied default and may be upgraded to STREAMING
+ // while traversing the pipeline
+ private TranslationMode translationMode;
+
+ private final FlinkPipelineOptions options;
+
+ public PipelineTranslationOptimizer(TranslationMode defaultMode, FlinkPipelineOptions options) {
+ this.translationMode = defaultMode;
+ this.options = options;
+ }
+
+ /**
+ * Returns the detected mode; an explicit streaming option always forces
+ * {@link TranslationMode#STREAMING}.
+ */
+ public TranslationMode getTranslationMode() {
+
+ // override user-specified translation mode
+ if (options.isStreaming()) {
+ return TranslationMode.STREAMING;
+ }
+
+ return translationMode;
+ }
+
+ @Override
+ public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+ return CompositeBehavior.ENTER_TRANSFORM;
+ }
+
+ @Override
+ public void leaveCompositeTransform(TransformHierarchy.Node node) {}
+
+ @Override
+ public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+ // any unbounded read forces streaming execution for the whole pipeline
+ Class<? extends PTransform> transformClass = node.getTransform().getClass();
+ if (transformClass == Read.Unbounded.class) {
+ LOG.info("Found {}. Switching to streaming execution.", transformClass);
+ translationMode = TranslationMode.STREAMING;
+ }
+ }
+
+ @Override
+ public void visitValue(PValue value, TransformHierarchy.Node producer) {}
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
new file mode 100644
index 0000000..8f50105
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.util.UserCodeException;
+
+/**
+ * Test Flink runner.
+ */
+public class TestFlinkRunner extends PipelineRunner<PipelineResult> {
+
+ // all work is delegated to a regular FlinkRunner configured for testing
+ private FlinkRunner delegate;
+
+ private TestFlinkRunner(FlinkPipelineOptions options) {
+ // We use [auto] for testing since this will make it pick up the Testing ExecutionEnvironment
+ options.setFlinkMaster("[auto]");
+ this.delegate = FlinkRunner.fromOptions(options);
+ }
+
+ /** Creates a runner from (validated) {@link FlinkPipelineOptions}. */
+ public static TestFlinkRunner fromOptions(PipelineOptions options) {
+ FlinkPipelineOptions flinkOptions =
+ PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options);
+ return new TestFlinkRunner(flinkOptions);
+ }
+
+ /** Creates a runner with default options in either batch or streaming mode. */
+ public static TestFlinkRunner create(boolean streaming) {
+ FlinkPipelineOptions flinkOptions = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ flinkOptions.setRunner(TestFlinkRunner.class);
+ flinkOptions.setStreaming(streaming);
+ return TestFlinkRunner.fromOptions(flinkOptions);
+ }
+
+ @Override
+ public PipelineResult run(Pipeline pipeline) {
+ try {
+ return delegate.run(pipeline);
+ } catch (Throwable t) {
+ // Special case hack to pull out assertion errors from PAssert; instead there should
+ // probably be a better story along the lines of UserCodeException.
+ UserCodeException innermostUserCodeException = null;
+ Throwable current = t;
+ // walk the cause chain, remembering the innermost UserCodeException seen
+ for (; current.getCause() != null; current = current.getCause()) {
+ if (current instanceof UserCodeException) {
+ innermostUserCodeException = ((UserCodeException) current);
+ }
+ }
+ if (innermostUserCodeException != null) {
+ // NOTE(review): if that UserCodeException has no cause, current becomes
+ // null here and PipelineExecutionException(null) is thrown below — confirm
+ current = innermostUserCodeException.getCause();
+ }
+ if (current instanceof AssertionError) {
+ throw (AssertionError) current;
+ }
+ throw new PipelineExecutionException(current);
+ }
+ }
+
+ public PipelineOptions getPipelineOptions() {
+ return delegate.getPipelineOptions();
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/TranslationMode.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/TranslationMode.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/TranslationMode.java
new file mode 100644
index 0000000..ad54750
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/TranslationMode.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+/**
+ * The translation mode of the Beam Pipeline: whether the pipeline is translated
+ * to Flink's batch or streaming API.
+ */
+enum TranslationMode {
+
+ /** Uses the batch mode of Flink. */
+ BATCH,
+
+ /** Uses the streaming mode of Flink. */
+ STREAMING
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/package-info.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/package-info.java
new file mode 100644
index 0000000..57f1e59
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink;
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java
new file mode 100644
index 0000000..fb2493b
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import org.apache.beam.runners.core.AggregatorFactory;
+import org.apache.beam.runners.core.ExecutionContext;
+import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+/**
+ * A {@link AggregatorFactory} for the Flink Batch Runner. Aggregators are backed by
+ * Flink accumulators registered on the {@link RuntimeContext}.
+ */
+public class FlinkAggregatorFactory implements AggregatorFactory{
+
+ private final RuntimeContext runtimeContext;
+
+ /** @param runtimeContext the Flink runtime context used to look up and register accumulators. */
+ public FlinkAggregatorFactory(RuntimeContext runtimeContext) {
+ this.runtimeContext = runtimeContext;
+ }
+
+ /**
+  * Returns the accumulator-backed aggregator registered under {@code aggregatorName},
+  * creating and registering a new one on first use.
+  */
+ @Override
+ public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
+ Class<?> fnClass, ExecutionContext.StepContext stepContext, String aggregatorName,
+ Combine.CombineFn<InputT, AccumT, OutputT> combine) {
+ // Flink's accumulator registry is untyped, hence the unchecked cast.
+ @SuppressWarnings("unchecked")
+ SerializableFnAggregatorWrapper<InputT, OutputT> result =
+ (SerializableFnAggregatorWrapper<InputT, OutputT>)
+ runtimeContext.getAccumulator(aggregatorName);
+
+ if (result == null) {
+ result = new SerializableFnAggregatorWrapper<>(combine);
+ runtimeContext.addAccumulator(aggregatorName, result);
+ }
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
new file mode 100644
index 0000000..447b1e5
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.Iterables;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+
+/**
+ * {@link org.apache.beam.sdk.transforms.windowing.WindowFn.AssignContext} for
+ * Flink functions. Exposes a single {@link WindowedValue} to a {@link WindowFn}
+ * during window assignment.
+ */
+class FlinkAssignContext<InputT, W extends BoundedWindow>
+ extends WindowFn<InputT, W>.AssignContext {
+ // The element (value, timestamp, window, pane) whose windows are being assigned.
+ private final WindowedValue<InputT> value;
+
+ /**
+  * Wraps {@code value} for window assignment by {@code fn}. The value must already
+  * be in exactly one window, otherwise an {@code IllegalArgumentException} is thrown.
+  */
+ FlinkAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) {
+ fn.super();
+ checkArgument(
+ Iterables.size(value.getWindows()) == 1,
+ String.format(
+ "%s passed to window assignment must be in a single window, but it was in %s: %s",
+ WindowedValue.class.getSimpleName(),
+ Iterables.size(value.getWindows()),
+ value.getWindows()));
+ this.value = value;
+ }
+
+ /** Returns the element being assigned to windows. */
+ @Override
+ public InputT element() {
+ return value.getValue();
+ }
+
+ /** Returns the timestamp of the element being assigned. */
+ @Override
+ public Instant timestamp() {
+ return value.getTimestamp();
+ }
+
+ /** Returns the single window the element is currently in. */
+ @Override
+ public BoundedWindow window() {
+ return Iterables.getOnlyElement(value.getWindows());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java
new file mode 100644
index 0000000..c3a5095
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import java.util.Collection;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * Flink {@link FlatMapFunction} for implementing
+ * {@link org.apache.beam.sdk.transforms.windowing.Window.Assign}.
+ */
+public class FlinkAssignWindows<T, W extends BoundedWindow>
+ implements FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {
+
+ private final WindowFn<T, W> windowFn;
+
+ /** @param windowFn the {@link WindowFn} used to assign each element to windows. */
+ public FlinkAssignWindows(WindowFn<T, W> windowFn) {
+ this.windowFn = windowFn;
+ }
+
+ /**
+  * Emits one copy of {@code input} per window assigned by the {@link WindowFn},
+  * preserving the element's value, timestamp and pane info.
+  */
+ @Override
+ public void flatMap(
+ WindowedValue<T> input, Collector<WindowedValue<T>> collector) throws Exception {
+ Collection<W> windows = windowFn.assignWindows(new FlinkAssignContext<>(windowFn, input));
+ for (W window: windows) {
+ collector.collect(
+ WindowedValue.of(input.getValue(), input.getTimestamp(), window, input.getPane()));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
new file mode 100644
index 0000000..51582af
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+
+/**
+ * Encapsulates a {@link DoFn}
+ * inside a Flink {@link org.apache.flink.api.common.functions.RichMapPartitionFunction}.
+ *
+ * <p>We get a mapping from {@link org.apache.beam.sdk.values.TupleTag} to output index
+ * and must tag all outputs with the output number. Afterwards a filter will filter out
+ * those elements that are not to be in a specific output.
+ */
+public class FlinkDoFnFunction<InputT, OutputT>
+ extends RichMapPartitionFunction<WindowedValue<InputT>, WindowedValue<OutputT>> {
+
+ // PipelineOptions are carried via this serializable wrapper for distribution to workers.
+ private final SerializedPipelineOptions serializedOptions;
+
+ private final DoFn<InputT, OutputT> doFn;
+ // For each side-input view, the windowing strategy of its backing PCollection.
+ private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
+
+ private final WindowingStrategy<?, ?> windowingStrategy;
+
+ // Maps output tags to union-output indices; null means only the main output exists.
+ private final Map<TupleTag<?>, Integer> outputMap;
+ private final TupleTag<OutputT> mainOutputTag;
+
+ // Created in open() and torn down in close(); never serialized with the function.
+ private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
+
+ /**
+  * @param doFn the user function to execute
+  * @param windowingStrategy windowing strategy of the main input
+  * @param sideInputs side-input views and their windowing strategies
+  * @param options pipeline options, serialized for shipping to workers
+  * @param outputMap tag-to-index mapping for multi-output DoFns, or null for single output
+  * @param mainOutputTag tag of the main output
+  */
+ public FlinkDoFnFunction(
+ DoFn<InputT, OutputT> doFn,
+ WindowingStrategy<?, ?> windowingStrategy,
+ Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
+ PipelineOptions options,
+ Map<TupleTag<?>, Integer> outputMap,
+ TupleTag<OutputT> mainOutputTag) {
+
+ this.doFn = doFn;
+ this.sideInputs = sideInputs;
+ this.serializedOptions = new SerializedPipelineOptions(options);
+ this.windowingStrategy = windowingStrategy;
+ this.outputMap = outputMap;
+ this.mainOutputTag = mainOutputTag;
+
+ }
+
+ /**
+  * Processes one partition of elements as a single bundle: {@code startBundle},
+  * one {@code processElement} call per input, then {@code finishBundle}.
+  */
+ @Override
+ public void mapPartition(
+ Iterable<WindowedValue<InputT>> values,
+ Collector<WindowedValue<OutputT>> out) throws Exception {
+
+ RuntimeContext runtimeContext = getRuntimeContext();
+
+ DoFnRunners.OutputManager outputManager;
+ if (outputMap == null) {
+ outputManager = new FlinkDoFnFunction.DoFnOutputManager(out);
+ } else {
+ // it has some additional outputs
+ outputManager =
+ new FlinkDoFnFunction.MultiDoFnOutputManager((Collector) out, outputMap);
+ }
+
+ DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.simpleRunner(
+ serializedOptions.getPipelineOptions(), doFn,
+ new FlinkSideInputReader(sideInputs, runtimeContext),
+ outputManager,
+ mainOutputTag,
+ // see SimpleDoFnRunner, just use it to limit number of additional outputs
+ Collections.<TupleTag<?>>emptyList(),
+ new FlinkNoOpStepContext(),
+ new FlinkAggregatorFactory(runtimeContext),
+ windowingStrategy);
+
+ doFnRunner.startBundle();
+
+ for (WindowedValue<InputT> value : values) {
+ doFnRunner.processElement(value);
+ }
+
+ doFnRunner.finishBundle();
+ }
+
+ /** Invokes the DoFn's setup method once per function instance. */
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ doFnInvoker = DoFnInvokers.invokerFor(doFn);
+ doFnInvoker.invokeSetup();
+ }
+
+ /** Invokes the DoFn's teardown method when the function is disposed. */
+ @Override
+ public void close() throws Exception {
+ doFnInvoker.invokeTeardown();
+ }
+
+ /** Single-output case: forwards every element straight to the Flink collector. */
+ static class DoFnOutputManager
+ implements DoFnRunners.OutputManager {
+
+ private Collector collector;
+
+ DoFnOutputManager(Collector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+ collector.collect(output);
+ }
+ }
+
+ /**
+  * Multi-output case: wraps each element in a {@link RawUnionValue} tagged with its
+  * output index so a downstream filter can demultiplex the additional outputs.
+  */
+ static class MultiDoFnOutputManager
+ implements DoFnRunners.OutputManager {
+
+ private Collector<WindowedValue<RawUnionValue>> collector;
+ private Map<TupleTag<?>, Integer> outputMap;
+
+ MultiDoFnOutputManager(Collector<WindowedValue<RawUnionValue>> collector,
+ Map<TupleTag<?>, Integer> outputMap) {
+ this.collector = collector;
+ this.outputMap = outputMap;
+ }
+
+ @Override
+ public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+ collector.collect(WindowedValue.of(new RawUnionValue(outputMap.get(tag), output.getValue()),
+ output.getTimestamp(), output.getWindows(), output.getPane()));
+ }
+ }
+
+}