You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:11:02 UTC
[07/50] [abbrv] incubator-beam git commit: [runner] add streaming
support with checkpointing
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java
new file mode 100644
index 0000000..8f64730
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java
@@ -0,0 +1,593 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation;
+
+import com.dataartisans.flink.dataflow.io.ConsoleIO;
+import com.dataartisans.flink.dataflow.translation.functions.FlinkCoGroupKeyedListAggregator;
+import com.dataartisans.flink.dataflow.translation.functions.FlinkCreateFunction;
+import com.dataartisans.flink.dataflow.translation.functions.FlinkDoFnFunction;
+import com.dataartisans.flink.dataflow.translation.functions.FlinkKeyedListAggregationFunction;
+import com.dataartisans.flink.dataflow.translation.functions.FlinkMultiOutputDoFnFunction;
+import com.dataartisans.flink.dataflow.translation.functions.FlinkMultiOutputPruningFunction;
+import com.dataartisans.flink.dataflow.translation.functions.FlinkPartialReduceFunction;
+import com.dataartisans.flink.dataflow.translation.functions.FlinkReduceFunction;
+import com.dataartisans.flink.dataflow.translation.functions.UnionCoder;
+import com.dataartisans.flink.dataflow.translation.types.CoderTypeInformation;
+import com.dataartisans.flink.dataflow.translation.types.KvCoderTypeInformation;
+import com.dataartisans.flink.dataflow.translation.wrappers.SinkOutputFormat;
+import com.dataartisans.flink.dataflow.translation.wrappers.SourceInputFormat;
+import com.google.api.client.util.Maps;
+import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.io.AvroIO;
+import com.google.cloud.dataflow.sdk.io.BoundedSource;
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.Flatten;
+import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.View;
+import com.google.cloud.dataflow.sdk.transforms.Write;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResultSchema;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
+import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionView;
+import com.google.cloud.dataflow.sdk.values.PValue;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.io.AvroInputFormat;
+import org.apache.flink.api.java.io.AvroOutputFormat;
+import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.api.java.operators.CoGroupOperator;
+import org.apache.flink.api.java.operators.DataSink;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.operators.FlatMapOperator;
+import org.apache.flink.api.java.operators.GroupCombineOperator;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
+import org.apache.flink.api.java.operators.Grouping;
+import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.java.operators.MapPartitionOperator;
+import org.apache.flink.api.java.operators.UnsortedGrouping;
+import org.apache.flink.core.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Translators for transforming
+ * Dataflow {@link com.google.cloud.dataflow.sdk.transforms.PTransform}s to
+ * Flink {@link org.apache.flink.api.java.DataSet}s
+ */
+public class FlinkBatchTransformTranslators {
+
+ // --------------------------------------------------------------------------------------------
+ // Transform Translator Registry
+ // --------------------------------------------------------------------------------------------
+
+ @SuppressWarnings("rawtypes")
+ private static final Map<Class<? extends PTransform>, FlinkBatchPipelineTranslator.BatchTransformTranslator> TRANSLATORS = new HashMap<>();
+
+ // register the known translators
+ static {
+ TRANSLATORS.put(View.CreatePCollectionView.class, new CreatePCollectionViewTranslatorBatch());
+
+ TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslatorBatch());
+ // we don't need this because we translate the Combine.PerKey directly
+ //TRANSLATORS.put(Combine.GroupedValues.class, new CombineGroupedValuesTranslator());
+
+ TRANSLATORS.put(Create.Values.class, new CreateTranslatorBatch());
+
+ TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslatorBatch());
+
+ TRANSLATORS.put(GroupByKey.GroupByKeyOnly.class, new GroupByKeyOnlyTranslatorBatch());
+ // TODO we're currently ignoring windows here but that has to change in the future
+ TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
+
+ TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiTranslatorBatch());
+ TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundTranslatorBatch());
+
+ TRANSLATORS.put(CoGroupByKey.class, new CoGroupByKeyTranslatorBatch());
+
+ TRANSLATORS.put(AvroIO.Read.Bound.class, new AvroIOReadTranslatorBatch());
+ TRANSLATORS.put(AvroIO.Write.Bound.class, new AvroIOWriteTranslatorBatch());
+
+ TRANSLATORS.put(Read.Bounded.class, new ReadSourceTranslatorBatch());
+ TRANSLATORS.put(Write.Bound.class, new WriteSinkTranslatorBatch());
+
+ TRANSLATORS.put(TextIO.Read.Bound.class, new TextIOReadTranslatorBatch());
+ TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteTranslatorBatch());
+
+ // Flink-specific
+ TRANSLATORS.put(ConsoleIO.Write.Bound.class, new ConsoleIOWriteTranslatorBatch());
+
+ }
+
+
+ public static FlinkBatchPipelineTranslator.BatchTransformTranslator<?> getTranslator(PTransform<?, ?> transform) {
+ return TRANSLATORS.get(transform.getClass());
+ }
+
+ private static class ReadSourceTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Read.Bounded<T>> {
+
+ @Override
+ public void translateNode(Read.Bounded<T> transform, FlinkBatchTranslationContext context) {
+ String name = transform.getName();
+ BoundedSource<T> source = transform.getSource();
+ PCollection<T> output = context.getOutput(transform);
+ Coder<T> coder = output.getCoder();
+
+ TypeInformation<T> typeInformation = context.getTypeInfo(output);
+
+ DataSource<T> dataSource = new DataSource<>(context.getExecutionEnvironment(), new SourceInputFormat<>(source, context.getPipelineOptions(), coder), typeInformation, name);
+
+ context.setOutputDataSet(output, dataSource);
+ }
+ }
+
+ private static class AvroIOReadTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<AvroIO.Read.Bound<T>> {
+ private static final Logger LOG = LoggerFactory.getLogger(AvroIOReadTranslatorBatch.class);
+
+ @Override
+ public void translateNode(AvroIO.Read.Bound<T> transform, FlinkBatchTranslationContext context) {
+ String path = transform.getFilepattern();
+ String name = transform.getName();
+// Schema schema = transform.getSchema();
+ PValue output = context.getOutput(transform);
+
+ TypeInformation<T> typeInformation = context.getTypeInfo(output);
+
+ // This is super hacky, but unfortunately we cannot get the type otherwise
+ Class<T> extractedAvroType;
+ try {
+ Field typeField = transform.getClass().getDeclaredField("type");
+ typeField.setAccessible(true);
+ @SuppressWarnings("unchecked")
+ Class<T> avroType = (Class<T>) typeField.get(transform);
+ extractedAvroType = avroType;
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ // we know that the field is there and it is accessible
+ throw new RuntimeException("Could not access type from AvroIO.Bound", e);
+ }
+
+ DataSource<T> source = new DataSource<>(context.getExecutionEnvironment(),
+ new AvroInputFormat<>(new Path(path), extractedAvroType),
+ typeInformation, name);
+
+ context.setOutputDataSet(output, source);
+ }
+ }
+
+ private static class AvroIOWriteTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<AvroIO.Write.Bound<T>> {
+ private static final Logger LOG = LoggerFactory.getLogger(AvroIOWriteTranslatorBatch.class);
+
+ @Override
+ public void translateNode(AvroIO.Write.Bound<T> transform, FlinkBatchTranslationContext context) {
+ DataSet<T> inputDataSet = context.getInputDataSet(context.getInput(transform));
+ String filenamePrefix = transform.getFilenamePrefix();
+ String filenameSuffix = transform.getFilenameSuffix();
+ int numShards = transform.getNumShards();
+ String shardNameTemplate = transform.getShardNameTemplate();
+
+ // TODO: Implement these. We need Flink support for this.
+ LOG.warn("Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.",
+ filenameSuffix);
+ LOG.warn("Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.", shardNameTemplate);
+
+ // This is super hacky, but unfortunately we cannot get the type otherwise
+ Class<T> extractedAvroType;
+ try {
+ Field typeField = transform.getClass().getDeclaredField("type");
+ typeField.setAccessible(true);
+ @SuppressWarnings("unchecked")
+ Class<T> avroType = (Class<T>) typeField.get(transform);
+ extractedAvroType = avroType;
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ // we know that the field is there and it is accessible
+ throw new RuntimeException("Could not access type from AvroIO.Bound", e);
+ }
+
+ DataSink<T> dataSink = inputDataSet.output(new AvroOutputFormat<>(new Path
+ (filenamePrefix), extractedAvroType));
+
+ if (numShards > 0) {
+ dataSink.setParallelism(numShards);
+ }
+ }
+ }
+
+ private static class TextIOReadTranslatorBatch implements FlinkBatchPipelineTranslator.BatchTransformTranslator<TextIO.Read.Bound<String>> {
+ private static final Logger LOG = LoggerFactory.getLogger(TextIOReadTranslatorBatch.class);
+
+ @Override
+ public void translateNode(TextIO.Read.Bound<String> transform, FlinkBatchTranslationContext context) {
+ String path = transform.getFilepattern();
+ String name = transform.getName();
+
+ TextIO.CompressionType compressionType = transform.getCompressionType();
+ boolean needsValidation = transform.needsValidation();
+
+ // TODO: Implement these. We need Flink support for this.
+ LOG.warn("Translation of TextIO.CompressionType not yet supported. Is: {}.", compressionType);
+ LOG.warn("Translation of TextIO.Read.needsValidation not yet supported. Is: {}.", needsValidation);
+
+ PValue output = context.getOutput(transform);
+
+ TypeInformation<String> typeInformation = context.getTypeInfo(output);
+ DataSource<String> source = new DataSource<>(context.getExecutionEnvironment(), new TextInputFormat(new Path(path)), typeInformation, name);
+
+ context.setOutputDataSet(output, source);
+ }
+ }
+
+ private static class TextIOWriteTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<TextIO.Write.Bound<T>> {
+ private static final Logger LOG = LoggerFactory.getLogger(TextIOWriteTranslatorBatch.class);
+
+ @Override
+ public void translateNode(TextIO.Write.Bound<T> transform, FlinkBatchTranslationContext context) {
+ PValue input = context.getInput(transform);
+ DataSet<T> inputDataSet = context.getInputDataSet(input);
+
+ String filenamePrefix = transform.getFilenamePrefix();
+ String filenameSuffix = transform.getFilenameSuffix();
+ boolean needsValidation = transform.needsValidation();
+ int numShards = transform.getNumShards();
+ String shardNameTemplate = transform.getShardNameTemplate();
+
+ // TODO: Implement these. We need Flink support for this.
+ LOG.warn("Translation of TextIO.Write.needsValidation not yet supported. Is: {}.", needsValidation);
+ LOG.warn("Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.", filenameSuffix);
+ LOG.warn("Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.", shardNameTemplate);
+
+ //inputDataSet.print();
+ DataSink<T> dataSink = inputDataSet.writeAsText(filenamePrefix);
+
+ if (numShards > 0) {
+ dataSink.setParallelism(numShards);
+ }
+ }
+ }
+
+ private static class ConsoleIOWriteTranslatorBatch implements FlinkBatchPipelineTranslator.BatchTransformTranslator<ConsoleIO.Write.Bound> {
+ @Override
+ public void translateNode(ConsoleIO.Write.Bound transform, FlinkBatchTranslationContext context) {
+ PValue input = (PValue) context.getInput(transform);
+ DataSet<?> inputDataSet = context.getInputDataSet(input);
+ inputDataSet.printOnTaskManager(transform.getName());
+ }
+ }
+
+ private static class WriteSinkTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Write.Bound<T>> {
+
+ @Override
+ public void translateNode(Write.Bound<T> transform, FlinkBatchTranslationContext context) {
+ String name = transform.getName();
+ PValue input = context.getInput(transform);
+ DataSet<T> inputDataSet = context.getInputDataSet(input);
+
+ inputDataSet.output(new SinkOutputFormat<>(transform, context.getPipelineOptions())).name(name);
+ }
+ }
+
+ private static class GroupByKeyOnlyTranslatorBatch<K, V> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey.GroupByKeyOnly<K, V>> {
+
+ @Override
+ public void translateNode(GroupByKey.GroupByKeyOnly<K, V> transform, FlinkBatchTranslationContext context) {
+ DataSet<KV<K, V>> inputDataSet = context.getInputDataSet(context.getInput(transform));
+ GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> groupReduceFunction = new FlinkKeyedListAggregationFunction<>();
+
+ TypeInformation<KV<K, Iterable<V>>> typeInformation = context.getTypeInfo(context.getOutput(transform));
+
+ Grouping<KV<K, V>> grouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet.getType()));
+
+ GroupReduceOperator<KV<K, V>, KV<K, Iterable<V>>> outputDataSet =
+ new GroupReduceOperator<>(grouping, typeInformation, groupReduceFunction, transform.getName());
+ context.setOutputDataSet(context.getOutput(transform), outputDataSet);
+ }
+ }
+
+ /**
+ * Translates a GroupByKey while ignoring window assignments. This is identical to the {@link GroupByKeyOnlyTranslatorBatch}
+ */
+ private static class GroupByKeyTranslatorBatch<K, V> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey<K, V>> {
+
+ @Override
+ public void translateNode(GroupByKey<K, V> transform, FlinkBatchTranslationContext context) {
+ DataSet<KV<K, V>> inputDataSet = context.getInputDataSet(context.getInput(transform));
+ GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> groupReduceFunction = new FlinkKeyedListAggregationFunction<>();
+
+ TypeInformation<KV<K, Iterable<V>>> typeInformation = context.getTypeInfo(context.getOutput(transform));
+
+ Grouping<KV<K, V>> grouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet.getType()));
+
+ GroupReduceOperator<KV<K, V>, KV<K, Iterable<V>>> outputDataSet =
+ new GroupReduceOperator<>(grouping, typeInformation, groupReduceFunction, transform.getName());
+
+ context.setOutputDataSet(context.getOutput(transform), outputDataSet);
+ }
+ }
+
+ private static class CombinePerKeyTranslatorBatch<K, VI, VA, VO> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Combine.PerKey<K, VI, VO>> {
+
+ @Override
+ public void translateNode(Combine.PerKey<K, VI, VO> transform, FlinkBatchTranslationContext context) {
+ DataSet<KV<K, VI>> inputDataSet = context.getInputDataSet(context.getInput(transform));
+
+ @SuppressWarnings("unchecked")
+ Combine.KeyedCombineFn<K, VI, VA, VO> keyedCombineFn = (Combine.KeyedCombineFn<K, VI, VA, VO>) transform.getFn();
+
+ KvCoder<K, VI> inputCoder = (KvCoder<K, VI>) context.getInput(transform).getCoder();
+
+ Coder<VA> accumulatorCoder =
+ null;
+ try {
+ accumulatorCoder = keyedCombineFn.getAccumulatorCoder(context.getInput(transform).getPipeline().getCoderRegistry(), inputCoder.getKeyCoder(), inputCoder.getValueCoder());
+ } catch (CannotProvideCoderException e) {
+ e.printStackTrace();
+ // TODO
+ }
+
+ TypeInformation<KV<K, VI>> kvCoderTypeInformation = new KvCoderTypeInformation<>(inputCoder);
+ TypeInformation<KV<K, VA>> partialReduceTypeInfo = new KvCoderTypeInformation<>(KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder));
+
+ Grouping<KV<K, VI>> inputGrouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{"key"}, kvCoderTypeInformation));
+
+ FlinkPartialReduceFunction<K, VI, VA> partialReduceFunction = new FlinkPartialReduceFunction<>(keyedCombineFn);
+
+ // Partially GroupReduce the values into the intermediate format VA (combine)
+ GroupCombineOperator<KV<K, VI>, KV<K, VA>> groupCombine =
+ new GroupCombineOperator<>(inputGrouping, partialReduceTypeInfo, partialReduceFunction,
+ "GroupCombine: " + transform.getName());
+
+ // Reduce fully to VO
+ GroupReduceFunction<KV<K, VA>, KV<K, VO>> reduceFunction = new FlinkReduceFunction<>(keyedCombineFn);
+
+ TypeInformation<KV<K, VO>> reduceTypeInfo = context.getTypeInfo(context.getOutput(transform));
+
+ Grouping<KV<K, VA>> intermediateGrouping = new UnsortedGrouping<>(groupCombine, new Keys.ExpressionKeys<>(new String[]{"key"}, groupCombine.getType()));
+
+ // Fully reduce the values and create output format VO
+ GroupReduceOperator<KV<K, VA>, KV<K, VO>> outputDataSet =
+ new GroupReduceOperator<>(intermediateGrouping, reduceTypeInfo, reduceFunction, transform.getName());
+
+ context.setOutputDataSet(context.getOutput(transform), outputDataSet);
+ }
+ }
+
+// private static class CombineGroupedValuesTranslator<K, VI, VO> implements FlinkPipelineTranslator.TransformTranslator<Combine.GroupedValues<K, VI, VO>> {
+//
+// @Override
+// public void translateNode(Combine.GroupedValues<K, VI, VO> transform, TranslationContext context) {
+// DataSet<KV<K, VI>> inputDataSet = context.getInputDataSet(transform.getInput());
+//
+// Combine.KeyedCombineFn<? super K, ? super VI, ?, VO> keyedCombineFn = transform.getFn();
+//
+// GroupReduceFunction<KV<K, VI>, KV<K, VO>> groupReduceFunction = new FlinkCombineFunction<>(keyedCombineFn);
+//
+// TypeInformation<KV<K, VO>> typeInformation = context.getTypeInfo(transform.getOutput());
+//
+// Grouping<KV<K, VI>> grouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{""}, inputDataSet.getType()));
+//
+// GroupReduceOperator<KV<K, VI>, KV<K, VO>> outputDataSet =
+// new GroupReduceOperator<>(grouping, typeInformation, groupReduceFunction, transform.getName());
+// context.setOutputDataSet(transform.getOutput(), outputDataSet);
+// }
+// }
+
+ private static class ParDoBoundTranslatorBatch<IN, OUT> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<ParDo.Bound<IN, OUT>> {
+ private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslatorBatch.class);
+
+ @Override
+ public void translateNode(ParDo.Bound<IN, OUT> transform, FlinkBatchTranslationContext context) {
+ DataSet<IN> inputDataSet = context.getInputDataSet(context.getInput(transform));
+
+ final DoFn<IN, OUT> doFn = transform.getFn();
+
+ TypeInformation<OUT> typeInformation = context.getTypeInfo(context.getOutput(transform));
+
+ FlinkDoFnFunction<IN, OUT> doFnWrapper = new FlinkDoFnFunction<>(doFn, context.getPipelineOptions());
+ MapPartitionOperator<IN, OUT> outputDataSet = new MapPartitionOperator<>(inputDataSet, typeInformation, doFnWrapper, transform.getName());
+
+ transformSideInputs(transform.getSideInputs(), outputDataSet, context);
+
+ context.setOutputDataSet(context.getOutput(transform), outputDataSet);
+ }
+ }
+
+ private static class ParDoBoundMultiTranslatorBatch<IN, OUT> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<ParDo.BoundMulti<IN, OUT>> {
+ private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundMultiTranslatorBatch.class);
+
+ @Override
+ public void translateNode(ParDo.BoundMulti<IN, OUT> transform, FlinkBatchTranslationContext context) {
+ DataSet<IN> inputDataSet = context.getInputDataSet(context.getInput(transform));
+
+ final DoFn<IN, OUT> doFn = transform.getFn();
+
+ Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
+
+ Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap();
+ // put the main output at index 0, FlinkMultiOutputDoFnFunction also expects this
+ outputMap.put(transform.getMainOutputTag(), 0);
+ int count = 1;
+ for (TupleTag<?> tag: outputs.keySet()) {
+ if (!outputMap.containsKey(tag)) {
+ outputMap.put(tag, count++);
+ }
+ }
+
+ // collect all output Coders and create a UnionCoder for our tagged outputs
+ List<Coder<?>> outputCoders = Lists.newArrayList();
+ for (PCollection<?> coll: outputs.values()) {
+ outputCoders.add(coll.getCoder());
+ }
+
+ UnionCoder unionCoder = UnionCoder.of(outputCoders);
+
+ @SuppressWarnings("unchecked")
+ TypeInformation<RawUnionValue> typeInformation = new CoderTypeInformation<>(unionCoder);
+
+ @SuppressWarnings("unchecked")
+ FlinkMultiOutputDoFnFunction<IN, OUT> doFnWrapper = new FlinkMultiOutputDoFnFunction(doFn, context.getPipelineOptions(), outputMap);
+ MapPartitionOperator<IN, RawUnionValue> outputDataSet = new MapPartitionOperator<>(inputDataSet, typeInformation, doFnWrapper, transform.getName());
+
+ transformSideInputs(transform.getSideInputs(), outputDataSet, context);
+
+ for (Map.Entry<TupleTag<?>, PCollection<?>> output: outputs.entrySet()) {
+ TypeInformation<Object> outputType = context.getTypeInfo(output.getValue());
+ int outputTag = outputMap.get(output.getKey());
+ FlinkMultiOutputPruningFunction<Object> pruningFunction = new FlinkMultiOutputPruningFunction<>(outputTag);
+ FlatMapOperator<RawUnionValue, Object> pruningOperator = new
+ FlatMapOperator<>(outputDataSet, outputType,
+ pruningFunction, output.getValue().getName());
+ context.setOutputDataSet(output.getValue(), pruningOperator);
+
+ }
+ }
+ }
+
+ private static class FlattenPCollectionTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Flatten.FlattenPCollectionList<T>> {
+
+ @Override
+ public void translateNode(Flatten.FlattenPCollectionList<T> transform, FlinkBatchTranslationContext context) {
+ List<PCollection<T>> allInputs = context.getInput(transform).getAll();
+ DataSet<T> result = null;
+ for(PCollection<T> collection : allInputs) {
+ DataSet<T> current = context.getInputDataSet(collection);
+ if (result == null) {
+ result = current;
+ } else {
+ result = result.union(current);
+ }
+ }
+ context.setOutputDataSet(context.getOutput(transform), result);
+ }
+ }
+
+ private static class CreatePCollectionViewTranslatorBatch<R, T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<View.CreatePCollectionView<R, T>> {
+ @Override
+ public void translateNode(View.CreatePCollectionView<R, T> transform, FlinkBatchTranslationContext context) {
+ DataSet<T> inputDataSet = context.getInputDataSet(context.getInput(transform));
+ PCollectionView<T> input = transform.apply(null);
+ context.setSideInputDataSet(input, inputDataSet);
+ }
+ }
+
+ private static class CreateTranslatorBatch<OUT> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Create.Values<OUT>> {
+
+ @Override
+ public void translateNode(Create.Values<OUT> transform, FlinkBatchTranslationContext context) {
+ TypeInformation<OUT> typeInformation = context.getOutputTypeInfo();
+ Iterable<OUT> elements = transform.getElements();
+
+ // we need to serialize the elements to byte arrays, since they might contain
+ // elements that are not serializable by Java serialization. We deserialize them
+ // in the FlatMap function using the Coder.
+
+ List<byte[]> serializedElements = Lists.newArrayList();
+ Coder<OUT> coder = context.getOutput(transform).getCoder();
+ for (OUT element: elements) {
+ ByteArrayOutputStream bao = new ByteArrayOutputStream();
+ try {
+ coder.encode(element, bao, Coder.Context.OUTER);
+ serializedElements.add(bao.toByteArray());
+ } catch (IOException e) {
+ throw new RuntimeException("Could not serialize Create elements using Coder: " + e);
+ }
+ }
+
+ DataSet<Integer> initDataSet = context.getExecutionEnvironment().fromElements(1);
+ FlinkCreateFunction<Integer, OUT> flatMapFunction = new FlinkCreateFunction<>(serializedElements, coder);
+ FlatMapOperator<Integer, OUT> outputDataSet = new FlatMapOperator<>(initDataSet, typeInformation, flatMapFunction, transform.getName());
+
+ context.setOutputDataSet(context.getOutput(transform), outputDataSet);
+ }
+ }
+
+ private static void transformSideInputs(List<PCollectionView<?>> sideInputs,
+ MapPartitionOperator<?, ?> outputDataSet,
+ FlinkBatchTranslationContext context) {
+ // get corresponding Flink broadcast DataSets
+ for(PCollectionView<?> input : sideInputs) {
+ DataSet<?> broadcastSet = context.getSideInputDataSet(input);
+ outputDataSet.withBroadcastSet(broadcastSet, input.getTagInternal().getId());
+ }
+ }
+
+// Disabled because it depends on a pending pull request to the DataFlowSDK
+ /**
+ * Special composite transform translator. Only called if the CoGroup is two dimensional.
+ * @param <K>
+ */
+ private static class CoGroupByKeyTranslatorBatch<K, V1, V2> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<CoGroupByKey<K>> {
+
+ @Override
+ public void translateNode(CoGroupByKey<K> transform, FlinkBatchTranslationContext context) {
+ KeyedPCollectionTuple<K> input = context.getInput(transform);
+
+ CoGbkResultSchema schema = input.getCoGbkResultSchema();
+ List<KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?>> keyedCollections = input.getKeyedCollections();
+
+ KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?> taggedCollection1 = keyedCollections.get(0);
+ KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?> taggedCollection2 = keyedCollections.get(1);
+
+ TupleTag<?> tupleTag1 = taggedCollection1.getTupleTag();
+ TupleTag<?> tupleTag2 = taggedCollection2.getTupleTag();
+
+ PCollection<? extends KV<K, ?>> collection1 = taggedCollection1.getCollection();
+ PCollection<? extends KV<K, ?>> collection2 = taggedCollection2.getCollection();
+
+ DataSet<KV<K,V1>> inputDataSet1 = context.getInputDataSet(collection1);
+ DataSet<KV<K,V2>> inputDataSet2 = context.getInputDataSet(collection2);
+
+ TypeInformation<KV<K,CoGbkResult>> typeInfo = context.getOutputTypeInfo();
+
+ FlinkCoGroupKeyedListAggregator<K,V1,V2> aggregator = new FlinkCoGroupKeyedListAggregator<>(schema, tupleTag1, tupleTag2);
+
+ Keys.ExpressionKeys<KV<K,V1>> keySelector1 = new Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet1.getType());
+ Keys.ExpressionKeys<KV<K,V2>> keySelector2 = new Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet2.getType());
+
+ DataSet<KV<K, CoGbkResult>> out = new CoGroupOperator<>(inputDataSet1, inputDataSet2,
+ keySelector1, keySelector2,
+ aggregator, typeInfo, null, transform.getName());
+ context.setOutputDataSet(context.getOutput(transform), out);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Miscellaneous
+ // --------------------------------------------------------------------------------------------
+
+ private FlinkBatchTransformTranslators() {}
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTranslationContext.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTranslationContext.java
new file mode 100644
index 0000000..1072fa3
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTranslationContext.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation;
+
+import com.dataartisans.flink.dataflow.translation.types.CoderTypeInformation;
+import com.dataartisans.flink.dataflow.translation.types.KvCoderTypeInformation;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.values.PCollectionView;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
+import com.google.cloud.dataflow.sdk.values.PValue;
+import com.google.cloud.dataflow.sdk.values.TypedPValue;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class FlinkBatchTranslationContext {
+
+ private final Map<PValue, DataSet<?>> dataSets;
+ private final Map<PCollectionView<?>, DataSet<?>> broadcastDataSets;
+
+ private final ExecutionEnvironment env;
+ private final PipelineOptions options;
+
+ private AppliedPTransform<?, ?, ?> currentTransform;
+
+ // ------------------------------------------------------------------------
+
+ public FlinkBatchTranslationContext(ExecutionEnvironment env, PipelineOptions options) {
+ this.env = env;
+ this.options = options;
+ this.dataSets = new HashMap<>();
+ this.broadcastDataSets = new HashMap<>();
+ }
+
+ // ------------------------------------------------------------------------
+
+ public ExecutionEnvironment getExecutionEnvironment() {
+ return env;
+ }
+
+ public PipelineOptions getPipelineOptions() {
+ return options;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> DataSet<T> getInputDataSet(PValue value) {
+ return (DataSet<T>) dataSets.get(value);
+ }
+
+ public void setOutputDataSet(PValue value, DataSet<?> set) {
+ if (!dataSets.containsKey(value)) {
+ dataSets.put(value, set);
+ }
+ }
+
+ /**
+ * Sets the AppliedPTransform which carries input/output.
+ * @param currentTransform
+ */
+ public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
+ this.currentTransform = currentTransform;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> DataSet<T> getSideInputDataSet(PCollectionView<?> value) {
+ return (DataSet<T>) broadcastDataSets.get(value);
+ }
+
+ public void setSideInputDataSet(PCollectionView<?> value, DataSet<?> set) {
+ if (!broadcastDataSets.containsKey(value)) {
+ broadcastDataSets.put(value, set);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> TypeInformation<T> getTypeInfo(PInput output) {
+ if (output instanceof TypedPValue) {
+ Coder<?> outputCoder = ((TypedPValue) output).getCoder();
+ if (outputCoder instanceof KvCoder) {
+ return new KvCoderTypeInformation((KvCoder) outputCoder);
+ } else {
+ return new CoderTypeInformation(outputCoder);
+ }
+ }
+ return new GenericTypeInfo<>((Class<T>)Object.class);
+ }
+
+ public <T> TypeInformation<T> getInputTypeInfo() {
+ return getTypeInfo(currentTransform.getInput());
+ }
+
+ public <T> TypeInformation<T> getOutputTypeInfo() {
+ return getTypeInfo((PValue) currentTransform.getOutput());
+ }
+
+ @SuppressWarnings("unchecked")
+ <I extends PInput> I getInput(PTransform<I, ?> transform) {
+ return (I) currentTransform.getInput();
+ }
+
+ @SuppressWarnings("unchecked")
+ <O extends POutput> O getOutput(PTransform<?, O> transform) {
+ return (O) currentTransform.getOutput();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkPipelineTranslator.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkPipelineTranslator.java
index 92b9135..e5c8545 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkPipelineTranslator.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkPipelineTranslator.java
@@ -7,8 +7,6 @@
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- *
- *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,151 +16,10 @@
package com.dataartisans.flink.dataflow.translation;
import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.Pipeline.PipelineVisitor;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * FlinkPipelineTranslator knows how to translate Pipeline objects into Flink Jobs.
- *
- * This is based on {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator}
- */
-public class FlinkPipelineTranslator implements PipelineVisitor {
-
- private final TranslationContext context;
-
- private int depth = 0;
-
- /**
- * Composite transform that we want to translate before proceeding with other transforms
- */
- private PTransform<?, ?> currentCompositeTransform;
-
- public FlinkPipelineTranslator(ExecutionEnvironment env, PipelineOptions options) {
- this.context = new TranslationContext(env, options);
- }
+public abstract class FlinkPipelineTranslator implements Pipeline.PipelineVisitor {
public void translate(Pipeline pipeline) {
pipeline.traverseTopologically(this);
}
-
-
- // --------------------------------------------------------------------------------------------
- // Pipeline Visitor Methods
- // --------------------------------------------------------------------------------------------
-
- private static String genSpaces(int n) {
- String s = "";
- for(int i = 0; i < n; i++) {
- s += "| ";
- }
- return s;
- }
-
- private static String formatNodeName(TransformTreeNode node) {
- return node.toString().split("@")[1] + node.getTransform();
- }
-
- @Override
- public void enterCompositeTransform(TransformTreeNode node) {
- System.out.println(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node));
- PTransform<?, ?> transform = node.getTransform();
-
- if (transform != null && currentCompositeTransform == null) {
- TransformTranslator<?> translator = FlinkTransformTranslators.getTranslator(transform);
-
- if (translator != null) {
- currentCompositeTransform = transform;
-
- if (transform instanceof CoGroupByKey && node.getInput().expand().size() != 2) {
- // we can only optimize CoGroupByKey for input size 2
- currentCompositeTransform = null;
- }
- }
- }
-
- this.depth++;
- }
-
- @Override
- public void leaveCompositeTransform(TransformTreeNode node) {
- PTransform<?, ?> transform = node.getTransform();
-
- if (transform != null) {
- TransformTranslator<?> translator = FlinkTransformTranslators.getTranslator(transform);
-
- if (currentCompositeTransform == transform) {
- if (translator != null) {
- System.out.println(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
- applyTransform(transform, node, translator);
- currentCompositeTransform = null;
- } else {
- throw new IllegalStateException("Attempted to translate composite transform " +
- "but no translator was found: " + currentCompositeTransform);
- }
- }
- }
-
- this.depth--;
- System.out.println(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node));
- }
-
- @Override
- public void visitTransform(TransformTreeNode node) {
- System.out.println(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node));
- if (currentCompositeTransform != null) {
- // ignore it
- return;
- }
-
- // the transformation applied in this node
- PTransform<?, ?> transform = node.getTransform();
-
- // the translator to the Flink operation(s)
- TransformTranslator<?> translator = FlinkTransformTranslators.getTranslator(transform);
-
- if (translator == null) {
- System.out.println(node.getTransform().getClass());
- throw new UnsupportedOperationException("The transform " + transform + " is currently not supported.");
- }
-
- applyTransform(transform, node, translator);
- }
-
- @Override
- public void visitValue(PValue value, TransformTreeNode producer) {
- // do nothing here
- }
-
- /**
- * Utility method to define a generic variable to cast the translator and the transform to.
- */
- private <T extends PTransform<?, ?>> void applyTransform(PTransform<?, ?> transform, TransformTreeNode node, TransformTranslator<?> translator) {
-
- @SuppressWarnings("unchecked")
- T typedTransform = (T) transform;
-
- @SuppressWarnings("unchecked")
- TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator;
-
- // create the applied PTransform on the context
- context.setCurrentTransform(AppliedPTransform.of(
- node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform));
-
- typedTranslator.translateNode(typedTransform, context);
- }
-
- /**
- * A translator of a {@link PTransform}.
- */
- public interface TransformTranslator<Type extends PTransform> {
-
- void translateNode(Type transform, TranslationContext context);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingPipelineTranslator.java
new file mode 100644
index 0000000..c8760c7
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingPipelineTranslator.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation;
+
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
+import com.google.cloud.dataflow.sdk.values.PValue;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
+
+ /** The necessary context in the case of a straming job. */
+ private final FlinkStreamingTranslationContext streamingContext;
+
+ private int depth = 0;
+
+ /** Composite transform that we want to translate before proceeding with other transforms. */
+ private PTransform<?, ?> currentCompositeTransform;
+
+ public FlinkStreamingPipelineTranslator(StreamExecutionEnvironment env, PipelineOptions options) {
+ this.streamingContext = new FlinkStreamingTranslationContext(env, options);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Pipeline Visitor Methods
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void enterCompositeTransform(TransformTreeNode node) {
+ System.out.println(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node));
+
+ PTransform<?, ?> transform = node.getTransform();
+ if (transform != null && currentCompositeTransform == null) {
+
+ StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform);
+ if (translator != null) {
+ currentCompositeTransform = transform;
+ }
+ }
+ this.depth++;
+ }
+
+ @Override
+ public void leaveCompositeTransform(TransformTreeNode node) {
+ PTransform<?, ?> transform = node.getTransform();
+ if (transform != null && currentCompositeTransform == transform) {
+
+ StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform);
+ if (translator != null) {
+ System.out.println(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
+ applyStreamingTransform(transform, node, translator);
+ currentCompositeTransform = null;
+ } else {
+ throw new IllegalStateException("Attempted to translate composite transform " +
+ "but no translator was found: " + currentCompositeTransform);
+ }
+ }
+ this.depth--;
+ System.out.println(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node));
+ }
+
+ @Override
+ public void visitTransform(TransformTreeNode node) {
+ System.out.println(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node));
+ if (currentCompositeTransform != null) {
+ // ignore it
+ return;
+ }
+
+ // get the transformation corresponding to hte node we are
+ // currently visiting and translate it into its Flink alternative.
+
+ PTransform<?, ?> transform = node.getTransform();
+ StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform);
+ if (translator == null) {
+ System.out.println(node.getTransform().getClass());
+ throw new UnsupportedOperationException("The transform " + transform + " is currently not supported.");
+ }
+ applyStreamingTransform(transform, node, translator);
+ }
+
+ @Override
+ public void visitValue(PValue value, TransformTreeNode producer) {
+ // do nothing here
+ }
+
+ private <T extends PTransform<?, ?>> void applyStreamingTransform(PTransform<?, ?> transform, TransformTreeNode node, StreamTransformTranslator<?> translator) {
+ if (this.streamingContext == null) {
+ throw new IllegalStateException("The FlinkPipelineTranslator is not yet initialized.");
+ }
+
+ @SuppressWarnings("unchecked")
+ T typedTransform = (T) transform;
+
+ @SuppressWarnings("unchecked")
+ StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>) translator;
+
+ // create the applied PTransform on the batchContext
+ streamingContext.setCurrentTransform(AppliedPTransform.of(
+ node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform));
+ typedTranslator.translateNode(typedTransform, streamingContext);
+ }
+
+ /**
+ * A translator of a {@link PTransform}.
+ */
+ public interface StreamTransformTranslator<Type extends PTransform> {
+ void translateNode(Type transform, FlinkStreamingTranslationContext context);
+ }
+
+ private static String genSpaces(int n) {
+ String s = "";
+ for (int i = 0; i < n; i++) {
+ s += "| ";
+ }
+ return s;
+ }
+
+ private static String formatNodeName(TransformTreeNode node) {
+ return node.toString().split("@")[1] + node.getTransform();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java
new file mode 100644
index 0000000..4c8cd4b
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java
@@ -0,0 +1,356 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation;
+
+import com.dataartisans.flink.dataflow.translation.functions.UnionCoder;
+import com.dataartisans.flink.dataflow.translation.types.CoderTypeInformation;
+import com.dataartisans.flink.dataflow.translation.wrappers.streaming.*;
+import com.dataartisans.flink.dataflow.translation.wrappers.streaming.io.UnboundedFlinkSource;
+import com.dataartisans.flink.dataflow.translation.wrappers.streaming.io.UnboundedSourceWrapper;
+import com.google.api.client.util.Maps;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.transforms.*;
+import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue;
+import com.google.cloud.dataflow.sdk.transforms.windowing.*;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PValue;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import com.google.common.collect.Lists;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.datastream.*;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * <p>
+ * Coder<?> entryCoder = pCollection.getCoder();
+ * if (!(entryCoder instanceof KvCoder<?, ?>)) {
+ * throw new IllegalArgumentException("PCollection does not use a KvCoder");
+ * }
+ */
+public class FlinkStreamingTransformTranslators {
+
+ // --------------------------------------------------------------------------------------------
+ // Transform Translator Registry
+ // --------------------------------------------------------------------------------------------
+
+ @SuppressWarnings("rawtypes")
+ private static final Map<Class<? extends PTransform>, FlinkStreamingPipelineTranslator.StreamTransformTranslator> TRANSLATORS = new HashMap<>();
+
+ // here you can find all the available translators.
+ static {
+ TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
+ TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator());
+ TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
+ TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator());
+ TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
+ TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator());
+ TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslator());
+ TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiStreamingTranslator());
+
+ }
+
+ public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator(PTransform<?, ?> transform) {
+ FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> translator = TRANSLATORS.get(transform.getClass());
+ return translator;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Transformation Implementations
+ // --------------------------------------------------------------------------------------------
+
+ private static class TextIOWriteBoundStreamingTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<TextIO.Write.Bound<T>> {
+ private static final Logger LOG = LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class);
+
+ @Override
+ public void translateNode(TextIO.Write.Bound<T> transform, FlinkStreamingTranslationContext context) {
+ PValue input = context.getInput(transform);
+ DataStream<WindowedValue<T>> inputDataStream = context.getInputDataStream(input);
+
+ String filenamePrefix = transform.getFilenamePrefix();
+ String filenameSuffix = transform.getFilenameSuffix();
+ boolean needsValidation = transform.needsValidation();
+ int numShards = transform.getNumShards();
+ String shardNameTemplate = transform.getShardNameTemplate();
+
+ // TODO: Implement these. We need Flink support for this.
+ LOG.warn("Translation of TextIO.Write.needsValidation not yet supported. Is: {}.", needsValidation);
+ LOG.warn("Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.", filenameSuffix);
+ LOG.warn("Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.", shardNameTemplate);
+
+ DataStream<String> dataSink = inputDataStream.flatMap(new FlatMapFunction<WindowedValue<T>, String>() {
+ @Override
+ public void flatMap(WindowedValue<T> value, Collector<String> out) throws Exception {
+ out.collect(value.getValue().toString());
+ }
+ });
+ DataStreamSink<String> output = dataSink.writeAsText(filenamePrefix, FileSystem.WriteMode.OVERWRITE);
+
+ if (numShards > 0) {
+ output.setParallelism(numShards);
+ }
+ }
+ }
+
+ private static class UnboundedReadSourceTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> {
+
+ @Override
+ public void translateNode(Read.Unbounded<T> transform, FlinkStreamingTranslationContext context) {
+ PCollection<T> output = context.getOutput(transform);
+
+ DataStream<WindowedValue<T>> source = null;
+ if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) {
+ UnboundedFlinkSource flinkSource = (UnboundedFlinkSource) transform.getSource();
+ source = context.getExecutionEnvironment()
+ .addSource(flinkSource.getFlinkSource())
+ .flatMap(new FlatMapFunction<String, WindowedValue<String>>() {
+ @Override
+ public void flatMap(String s, Collector<WindowedValue<String>> collector) throws Exception {
+ collector.collect(WindowedValue.<String>of(s, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
+ }
+ });
+ } else {
+ source = context.getExecutionEnvironment()
+ .addSource(new UnboundedSourceWrapper<>(context.getPipelineOptions(), transform));
+ }
+ context.setOutputDataStream(output, source);
+ }
+ }
+
+ private static class ParDoBoundStreamingTranslator<IN, OUT> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<ParDo.Bound<IN, OUT>> {
+
+ @Override
+ public void translateNode(ParDo.Bound<IN, OUT> transform, FlinkStreamingTranslationContext context) {
+ PCollection<OUT> output = context.getOutput(transform);
+
+ final WindowingStrategy<OUT, ? extends BoundedWindow> windowingStrategy =
+ (WindowingStrategy<OUT, ? extends BoundedWindow>)
+ context.getOutput(transform).getWindowingStrategy();
+
+ WindowedValue.WindowedValueCoder<OUT> outputStreamCoder = WindowedValue.getFullCoder(output.getCoder(), windowingStrategy.getWindowFn().windowCoder());
+ CoderTypeInformation<WindowedValue<OUT>> outputWindowedValueCoder = new CoderTypeInformation<>(outputStreamCoder);
+
+ FlinkParDoBoundWrapper<IN, OUT> doFnWrapper = new FlinkParDoBoundWrapper<>(context.getPipelineOptions(), windowingStrategy, transform.getFn());
+ DataStream<WindowedValue<IN>> inputDataStream = context.getInputDataStream(context.getInput(transform));
+ SingleOutputStreamOperator<WindowedValue<OUT>, ?> outDataStream = inputDataStream.flatMap(doFnWrapper).returns(outputWindowedValueCoder);
+
+ context.setOutputDataStream(context.getOutput(transform), outDataStream);
+ }
+ }
+
+ public static class WindowBoundTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Bound<T>> {
+
+ @Override
+ public void translateNode(Window.Bound<T> transform, FlinkStreamingTranslationContext context) {
+ PValue input = context.getInput(transform);
+ DataStream<WindowedValue<T>> inputDataStream = context.getInputDataStream(input);
+
+ final WindowingStrategy<T, ? extends BoundedWindow> windowingStrategy =
+ (WindowingStrategy<T, ? extends BoundedWindow>)
+ context.getOutput(transform).getWindowingStrategy();
+
+ final WindowFn<T, ? extends BoundedWindow> windowFn = windowingStrategy.getWindowFn();
+
+ WindowedValue.WindowedValueCoder<T> outputStreamCoder = WindowedValue.getFullCoder(
+ context.getInput(transform).getCoder(), windowingStrategy.getWindowFn().windowCoder());
+ CoderTypeInformation<WindowedValue<T>> outputWindowedValueCoder =
+ new CoderTypeInformation<>(outputStreamCoder);
+
+ final FlinkParDoBoundWrapper<T, T> windowDoFnAssigner = new FlinkParDoBoundWrapper<>(
+ context.getPipelineOptions(), windowingStrategy, createWindowAssigner(windowFn));
+
+ SingleOutputStreamOperator<WindowedValue<T>, ?> windowedStream =
+ inputDataStream.flatMap(windowDoFnAssigner).returns(outputWindowedValueCoder);
+ context.setOutputDataStream(context.getOutput(transform), windowedStream);
+ }
+
+ private static <T, W extends BoundedWindow> DoFn<T, T> createWindowAssigner(final WindowFn<T, W> windowFn) {
+ return new DoFn<T, T>() {
+
+ @Override
+ public void processElement(final ProcessContext c) throws Exception {
+ Collection<W> windows = windowFn.assignWindows(
+ windowFn.new AssignContext() {
+ @Override
+ public T element() {
+ return c.element();
+ }
+
+ @Override
+ public Instant timestamp() {
+ return c.timestamp();
+ }
+
+ @Override
+ public Collection<? extends BoundedWindow> windows() {
+ return c.windowingInternals().windows();
+ }
+ });
+
+ c.windowingInternals().outputWindowedValue(
+ c.element(), c.timestamp(), windows, c.pane());
+ }
+ };
+ }
+ }
+
+ public static class GroupByKeyTranslator<K, V> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<GroupByKey<K, V>> {
+
+ @Override
+ public void translateNode(GroupByKey<K, V> transform, FlinkStreamingTranslationContext context) {
+ PValue input = context.getInput(transform);
+
+ DataStream<WindowedValue<KV<K, V>>> inputDataStream = context.getInputDataStream(input);
+ KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) context.getInput(transform).getCoder();
+
+ KeyedStream<WindowedValue<KV<K, V>>, K> groupByKStream = FlinkGroupByKeyWrapper
+ .groupStreamByKey(inputDataStream, inputKvCoder);
+
+ DataStream<WindowedValue<KV<K, Iterable<V>>>> groupedByKNWstream =
+ FlinkGroupAlsoByWindowWrapper.createForIterable(context.getPipelineOptions(),
+ context.getInput(transform), groupByKStream);
+
+ context.setOutputDataStream(context.getOutput(transform), groupedByKNWstream);
+ }
+ }
+
+ public static class CombinePerKeyTranslator<K, VIN, VACC, VOUT> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Combine.PerKey<K, VIN, VOUT>> {
+
+ @Override
+ public void translateNode(Combine.PerKey<K, VIN, VOUT> transform, FlinkStreamingTranslationContext context) {
+ PValue input = context.getInput(transform);
+
+ DataStream<WindowedValue<KV<K, VIN>>> inputDataStream = context.getInputDataStream(input);
+ KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) context.getInput(transform).getCoder();
+ KvCoder<K, VOUT> outputKvCoder = (KvCoder<K, VOUT>) context.getOutput(transform).getCoder();
+
+ KeyedStream<WindowedValue<KV<K, VIN>>, K> groupByKStream = FlinkGroupByKeyWrapper
+ .groupStreamByKey(inputDataStream, inputKvCoder);
+
+ Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combineFn = (Combine.KeyedCombineFn<K, VIN, VACC, VOUT>) transform.getFn();
+ DataStream<WindowedValue<KV<K, VOUT>>> groupedByKNWstream =
+ FlinkGroupAlsoByWindowWrapper.create(context.getPipelineOptions(),
+ context.getInput(transform), groupByKStream, combineFn, outputKvCoder);
+
+ context.setOutputDataStream(context.getOutput(transform), groupedByKNWstream);
+ }
+ }
+
+ public static class FlattenPCollectionTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Flatten.FlattenPCollectionList<T>> {
+
+ @Override
+ public void translateNode(Flatten.FlattenPCollectionList<T> transform, FlinkStreamingTranslationContext context) {
+ List<PCollection<T>> allInputs = context.getInput(transform).getAll();
+ DataStream<T> result = null;
+ for (PCollection<T> collection : allInputs) {
+ DataStream<T> current = context.getInputDataStream(collection);
+ result = (result == null) ? current : result.union(current);
+ }
+ context.setOutputDataStream(context.getOutput(transform), result);
+ }
+ }
+
+ public static class ParDoBoundMultiStreamingTranslator<IN, OUT> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<ParDo.BoundMulti<IN, OUT>> {
+
+ private final int MAIN_TAG_INDEX = 0;
+
+ @Override
+ public void translateNode(ParDo.BoundMulti<IN, OUT> transform, FlinkStreamingTranslationContext context) {
+
+ // we assume that the transformation does not change the windowing strategy.
+ WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy = context.getInput(transform).getWindowingStrategy();
+
+ Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
+ Map<TupleTag<?>, Integer> tagsToLabels = transformTupleTagsToLabels(
+ transform.getMainOutputTag(), outputs.keySet());
+
+ UnionCoder intermUnionCoder = getIntermUnionCoder(outputs.values());
+ WindowedValue.WindowedValueCoder<RawUnionValue> outputStreamCoder = WindowedValue.getFullCoder(
+ intermUnionCoder, windowingStrategy.getWindowFn().windowCoder());
+
+ CoderTypeInformation<WindowedValue<RawUnionValue>> intermWindowedValueCoder =
+ new CoderTypeInformation<>(outputStreamCoder);
+
+ FlinkParDoBoundMultiWrapper<IN, OUT> doFnWrapper = new FlinkParDoBoundMultiWrapper<>(
+ context.getPipelineOptions(), windowingStrategy, transform.getFn(),
+ transform.getMainOutputTag(), tagsToLabels);
+
+ DataStream<WindowedValue<IN>> inputDataStream = context.getInputDataStream(context.getInput(transform));
+ SingleOutputStreamOperator<WindowedValue<RawUnionValue>, ?> intermDataStream =
+ inputDataStream.flatMap(doFnWrapper).returns(intermWindowedValueCoder);
+
+ for (Map.Entry<TupleTag<?>, PCollection<?>> output : outputs.entrySet()) {
+ final int outputTag = tagsToLabels.get(output.getKey());
+
+ WindowedValue.WindowedValueCoder<?> coderForTag = WindowedValue.getFullCoder(
+ output.getValue().getCoder(),
+ windowingStrategy.getWindowFn().windowCoder());
+
+ CoderTypeInformation<WindowedValue<?>> windowedValueCoder =
+ new CoderTypeInformation(coderForTag);
+
+ context.setOutputDataStream(output.getValue(),
+ intermDataStream.filter(new FilterFunction<WindowedValue<RawUnionValue>>() {
+ @Override
+ public boolean filter(WindowedValue<RawUnionValue> value) throws Exception {
+ return value.getValue().getUnionTag() == outputTag;
+ }
+ }).flatMap(new FlatMapFunction<WindowedValue<RawUnionValue>, WindowedValue<?>>() {
+ @Override
+ public void flatMap(WindowedValue<RawUnionValue> value, Collector<WindowedValue<?>> collector) throws Exception {
+ collector.collect(WindowedValue.of(
+ value.getValue().getValue(),
+ value.getTimestamp(),
+ value.getWindows(),
+ value.getPane()));
+ }
+ }).returns(windowedValueCoder));
+ }
+ }
+
+ private Map<TupleTag<?>, Integer> transformTupleTagsToLabels(TupleTag<?> mainTag, Set<TupleTag<?>> secondaryTags) {
+ Map<TupleTag<?>, Integer> tagToLabelMap = Maps.newHashMap();
+ tagToLabelMap.put(mainTag, MAIN_TAG_INDEX);
+ int count = MAIN_TAG_INDEX + 1;
+ for (TupleTag<?> tag : secondaryTags) {
+ if (!tagToLabelMap.containsKey(tag)) {
+ tagToLabelMap.put(tag, count++);
+ }
+ }
+ return tagToLabelMap;
+ }
+
+ private UnionCoder getIntermUnionCoder(Collection<PCollection<?>> taggedCollections) {
+ List<Coder<?>> outputCoders = Lists.newArrayList();
+ for (PCollection<?> coll : taggedCollections) {
+ outputCoders.add(coll.getCoder());
+ }
+ return UnionCoder.of(outputCoders);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java
new file mode 100644
index 0000000..83ea575
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation;
+
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.values.*;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class FlinkStreamingTranslationContext {
+
+ private final StreamExecutionEnvironment env;
+ private final PipelineOptions options;
+
+ /**
+ * Keeps a mapping between the output value of the PTransform (in Dataflow) and the
+ * Flink Operator that produced it, after the translation of the correspondinf PTransform
+ * to its Flink equivalent.
+ * */
+ private final Map<PValue, DataStream<?>> dataStreams;
+
+ private AppliedPTransform<?, ?, ?> currentTransform;
+
+ public FlinkStreamingTranslationContext(StreamExecutionEnvironment env, PipelineOptions options) {
+ this.env = env;
+ this.options = options;
+ this.dataStreams = new HashMap<>();
+ }
+
+ public StreamExecutionEnvironment getExecutionEnvironment() {
+ return env;
+ }
+
+ public PipelineOptions getPipelineOptions() {
+ return options;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> DataStream<T> getInputDataStream(PValue value) {
+ return (DataStream<T>) dataStreams.get(value);
+ }
+
+ public void setOutputDataStream(PValue value, DataStream<?> set) {
+ if (!dataStreams.containsKey(value)) {
+ dataStreams.put(value, set);
+ }
+ }
+
+ /**
+ * Sets the AppliedPTransform which carries input/output.
+ * @param currentTransform
+ */
+ public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
+ this.currentTransform = currentTransform;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <I extends PInput> I getInput(PTransform<I, ?> transform) {
+ I input = (I) currentTransform.getInput();
+ return input;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <O extends POutput> O getOutput(PTransform<?, O> transform) {
+ O output = (O) currentTransform.getOutput();
+ return output;
+ }
+}