Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:11:25 UTC
[30/50] [abbrv] incubator-beam git commit: [flink] adjust directories according to package name
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java
deleted file mode 100644
index fa0c8e9..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.examples.streaming;
-
-import org.apache.beam.runners.flink.FlinkPipelineRunner;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.Read;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.io.UnboundedSource;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.*;
-import com.google.cloud.dataflow.sdk.transforms.windowing.*;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.joda.time.Duration;
-
-import java.util.Properties;
-
-public class KafkaWindowedWordCountExample {
-
- static final String KAFKA_TOPIC = "test"; // Default Kafka topic to read from
- static final String KAFKA_BROKER = "localhost:9092"; // Default Kafka broker to contact
- static final String GROUP_ID = "myGroup"; // Default groupId
- static final String ZOOKEEPER = "localhost:2181"; // Default ZooKeeper to connect to for Kafka
-
- public static class ExtractWordsFn extends DoFn<String, String> {
- private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", new Sum.SumLongFn());
-
- @Override
- public void processElement(ProcessContext c) {
- if (c.element().trim().isEmpty()) {
- emptyLines.addValue(1L);
- }
-
- // Split the line into words.
- String[] words = c.element().split("[^a-zA-Z']+");
-
- // Output each word encountered into the output PCollection.
- for (String word : words) {
- if (!word.isEmpty()) {
- c.output(word);
- }
- }
- }
- }
-
- public static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
- @Override
- public void processElement(ProcessContext c) {
- String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString();
- System.out.println(row);
- c.output(row);
- }
- }
-
- public interface KafkaStreamingWordCountOptions extends WindowedWordCount.StreamingWordCountOptions {
- @Description("The Kafka topic to read from")
- @Default.String(KAFKA_TOPIC)
- String getKafkaTopic();
-
- void setKafkaTopic(String value);
-
- @Description("The Kafka Broker to read from")
- @Default.String(KAFKA_BROKER)
- String getBroker();
-
- void setBroker(String value);
-
- @Description("The Zookeeper server to connect to")
- @Default.String(ZOOKEEPER)
- String getZookeeper();
-
- void setZookeeper(String value);
-
- @Description("The groupId")
- @Default.String(GROUP_ID)
- String getGroup();
-
- void setGroup(String value);
-
- }
-
- public static void main(String[] args) {
- PipelineOptionsFactory.register(KafkaStreamingWordCountOptions.class);
- KafkaStreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).as(KafkaStreamingWordCountOptions.class);
- options.setJobName("KafkaExample");
- options.setStreaming(true);
- options.setCheckpointingInterval(1000L);
- options.setNumberOfExecutionRetries(5);
- options.setExecutionRetryDelay(3000L);
- options.setRunner(FlinkPipelineRunner.class);
-
- System.out.println(options.getKafkaTopic() + " " + options.getZookeeper() + " " + options.getBroker() + " " + options.getGroup());
- Pipeline pipeline = Pipeline.create(options);
-
- Properties p = new Properties();
- p.setProperty("zookeeper.connect", options.getZookeeper());
- p.setProperty("bootstrap.servers", options.getBroker());
- p.setProperty("group.id", options.getGroup());
-
- // This is the Flink consumer that reads the input to
- // the program from a Kafka topic.
- FlinkKafkaConsumer082<String> kafkaConsumer = new FlinkKafkaConsumer082<>(
- options.getKafkaTopic(),
- new SimpleStringSchema(), p);
-
- PCollection<String> words = pipeline
- .apply(Read.from(new UnboundedFlinkSource<String, UnboundedSource.CheckpointMark>(options, kafkaConsumer)).named("StreamingWordCount"))
- .apply(ParDo.of(new ExtractWordsFn()))
- .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize())))
- .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
- .discardingFiredPanes());
-
- PCollection<KV<String, Long>> wordCounts =
- words.apply(Count.<String>perElement());
-
- wordCounts.apply(ParDo.of(new FormatAsStringFn()))
- .apply(TextIO.Write.to("./outputKafka.txt"));
-
- pipeline.run();
- }
-}
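
The ExtractWordsFn in the file above can be exercised without a runner via the SDK's DoFnTester. A minimal sketch, assuming the Dataflow SDK 1.x test utilities are on the classpath (the wrapper class name is hypothetical):

    import com.google.cloud.dataflow.sdk.transforms.DoFnTester;
    import java.util.List;

    public class ExtractWordsFnSketch {
      public static void main(String[] args) throws Exception {
        // Wrap the DoFn in a tester; no Pipeline or runner is required.
        DoFnTester<String, String> tester =
            DoFnTester.of(new KafkaWindowedWordCountExample.ExtractWordsFn());

        // Lines are split on runs of non-letter characters; empty tokens are dropped.
        List<String> words = tester.processBatch("hello flink runner", "");
        System.out.println(words); // [hello, flink, runner]
      }
    }
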
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java
deleted file mode 100644
index 6af044d..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.examples.streaming;
-
-import org.apache.beam.runners.flink.FlinkPipelineRunner;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.*;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.*;
-import com.google.cloud.dataflow.sdk.transforms.windowing.*;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * To run the example, first open a socket on a terminal by executing the command:
- * <pre>
- * nc -lk 9999
- * </pre>
- * and then launch the example. Whatever you then type in the terminal becomes
- * the input to the program.
- */
-public class WindowedWordCount {
-
- private static final Logger LOG = LoggerFactory.getLogger(WindowedWordCount.class);
-
- static final long WINDOW_SIZE = 10; // Default window duration in seconds
- static final long SLIDE_SIZE = 5; // Default window slide in seconds
-
- static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
- @Override
- public void processElement(ProcessContext c) {
- String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString();
- c.output(row);
- }
- }
-
- static class ExtractWordsFn extends DoFn<String, String> {
- private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", new Sum.SumLongFn());
-
- @Override
- public void processElement(ProcessContext c) {
- if (c.element().trim().isEmpty()) {
- emptyLines.addValue(1L);
- }
-
- // Split the line into words.
- String[] words = c.element().split("[^a-zA-Z']+");
-
- // Output each word encountered into the output PCollection.
- for (String word : words) {
- if (!word.isEmpty()) {
- c.output(word);
- }
- }
- }
- }
-
- public interface StreamingWordCountOptions extends org.apache.beam.runners.flink.examples.WordCount.Options {
- @Description("Sliding window duration, in seconds")
- @Default.Long(WINDOW_SIZE)
- Long getWindowSize();
-
- void setWindowSize(Long value);
-
- @Description("Window slide, in seconds")
- @Default.Long(SLIDE_SIZE)
- Long getSlide();
-
- void setSlide(Long value);
- }
-
- public static void main(String[] args) throws IOException {
- StreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(StreamingWordCountOptions.class);
- options.setStreaming(true);
- options.setWindowSize(10L);
- options.setSlide(5L);
- options.setCheckpointingInterval(1000L);
- options.setNumberOfExecutionRetries(5);
- options.setExecutionRetryDelay(3000L);
- options.setRunner(FlinkPipelineRunner.class);
-
- LOG.info("Windpwed WordCount with Sliding Windows of " + options.getWindowSize() +
- " sec. and a slide of " + options.getSlide());
-
- Pipeline pipeline = Pipeline.create(options);
-
- PCollection<String> words = pipeline
- .apply(Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("StreamingWordCount"))
- .apply(ParDo.of(new ExtractWordsFn()))
- .apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(options.getWindowSize()))
- .every(Duration.standardSeconds(options.getSlide())))
- .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
- .discardingFiredPanes());
-
- PCollection<KV<String, Long>> wordCounts =
- words.apply(Count.<String>perElement());
-
- wordCounts.apply(ParDo.of(new FormatAsStringFn()))
- .apply(TextIO.Write.to("./outputWordCount.txt"));
-
- pipeline.run();
- }
-}
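
The sliding window configured above overlaps: each element is assigned to windowSize / slide windows, so every word contributes to two counts here. A self-contained sketch of the assignment arithmetic (plain Java, no SDK types; the values mirror the defaults above):

    public class SlidingWindowSketch {
      public static void main(String[] args) {
        long t = 12;               // element timestamp, in seconds
        long size = 10, slide = 5; // the WINDOW_SIZE and SLIDE_SIZE defaults
        // Windows are [start, start + size) for start = k * slide with
        // t - size < start <= t; here that yields [10s, 20s) and [5s, 15s).
        for (long start = (t / slide) * slide; start > t - size; start -= slide) {
          System.out.println("[" + start + "s, " + (start + size) + "s)");
        }
      }
    }
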
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/io/ConsoleIO.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/io/ConsoleIO.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/io/ConsoleIO.java
deleted file mode 100644
index cd25ba3..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/io/ConsoleIO.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.io;
-
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PDone;
-
-/**
- * Transform for printing the contents of a {@link com.google.cloud.dataflow.sdk.values.PCollection}
- * to standard output.
- *
- * This is Flink-specific and will only work when executed using the
- * {@link org.apache.beam.runners.flink.FlinkPipelineRunner}.
- */
-public class ConsoleIO {
-
- /**
- * A PTransform that writes a PCollection to standard output.
- */
- public static class Write {
-
- /**
- * Returns a ConsoleIO.Write PTransform with a default step name.
- */
- public static Bound create() {
- return new Bound();
- }
-
- /**
- * Returns a ConsoleIO.Write PTransform with the given step name.
- */
- public static Bound named(String name) {
- return new Bound().named(name);
- }
-
- /**
- * A PTransform that writes a bounded PCollection to standard output.
- */
- public static class Bound extends PTransform<PCollection<?>, PDone> {
- private static final long serialVersionUID = 0;
-
- Bound() {
- super("ConsoleIO.Write");
- }
-
- Bound(String name) {
- super(name);
- }
-
- /**
- * Returns a new ConsoleIO.Write PTransform that's like this one but with the given
- * step
- * name. Does not modify this object.
- */
- public Bound named(String name) {
- return new Bound(name);
- }
-
- @Override
- public PDone apply(PCollection<?> input) {
- return PDone.in(input.getPipeline());
- }
- }
- }
-}
-
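Applying the transform follows the usual apply chain. A hedged sketch of a call site (the input path and wrapper class are hypothetical, and the options must select the FlinkPipelineRunner for the transform to print anything):

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.io.TextIO;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.runners.flink.io.ConsoleIO;

    public class ConsoleIOSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        p.apply(TextIO.Read.from("/tmp/input.txt")) // hypothetical input
         .apply(ConsoleIO.Write.named("PrintToStdout"));
        p.run();
      }
    }
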
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java
deleted file mode 100644
index 5201423..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * FlinkBatchPipelineTranslator knows how to translate Pipeline objects into Flink jobs.
- * This is based on {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator}.
- */
-public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
-
- /**
- * The necessary context in the case of a batch job.
- */
- private final FlinkBatchTranslationContext batchContext;
-
- private int depth = 0;
-
- /**
- * Composite transform that we want to translate before proceeding with other transforms.
- */
- private PTransform<?, ?> currentCompositeTransform;
-
- public FlinkBatchPipelineTranslator(ExecutionEnvironment env, PipelineOptions options) {
- this.batchContext = new FlinkBatchTranslationContext(env, options);
- }
-
- // --------------------------------------------------------------------------------------------
- // Pipeline Visitor Methods
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void enterCompositeTransform(TransformTreeNode node) {
- System.out.println(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node));
-
- PTransform<?, ?> transform = node.getTransform();
- if (transform != null && currentCompositeTransform == null) {
-
- BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
- if (translator != null) {
- currentCompositeTransform = transform;
- if (transform instanceof CoGroupByKey && node.getInput().expand().size() != 2) {
- // we can only optimize CoGroupByKey for input size 2
- currentCompositeTransform = null;
- }
- }
- }
- this.depth++;
- }
-
- @Override
- public void leaveCompositeTransform(TransformTreeNode node) {
- PTransform<?, ?> transform = node.getTransform();
- if (transform != null && currentCompositeTransform == transform) {
-
- BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
- if (translator != null) {
- System.out.println(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
- applyBatchTransform(transform, node, translator);
- currentCompositeTransform = null;
- } else {
- throw new IllegalStateException("Attempted to translate composite transform " +
- "but no translator was found: " + currentCompositeTransform);
- }
- }
- this.depth--;
- System.out.println(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node));
- }
-
- @Override
- public void visitTransform(TransformTreeNode node) {
- System.out.println(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node));
- if (currentCompositeTransform != null) {
- // ignore it
- return;
- }
-
- // get the transformation corresponding to the node we are
- // currently visiting and translate it into its Flink alternative.
-
- PTransform<?, ?> transform = node.getTransform();
- BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
- if (translator == null) {
- System.out.println(node.getTransform().getClass());
- throw new UnsupportedOperationException("The transform " + transform + " is currently not supported.");
- }
- applyBatchTransform(transform, node, translator);
- }
-
- @Override
- public void visitValue(PValue value, TransformTreeNode producer) {
- // do nothing here
- }
-
- private <T extends PTransform<?, ?>> void applyBatchTransform(PTransform<?, ?> transform, TransformTreeNode node, BatchTransformTranslator<?> translator) {
-
- @SuppressWarnings("unchecked")
- T typedTransform = (T) transform;
-
- @SuppressWarnings("unchecked")
- BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>) translator;
-
- // create the applied PTransform on the batchContext
- batchContext.setCurrentTransform(AppliedPTransform.of(
- node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform));
- typedTranslator.translateNode(typedTransform, batchContext);
- }
-
- /**
- * A translator of a {@link PTransform}.
- */
- public interface BatchTransformTranslator<Type extends PTransform> {
- void translateNode(Type transform, FlinkBatchTranslationContext context);
- }
-
- private static String genSpaces(int n) {
- StringBuilder s = new StringBuilder();
- for (int i = 0; i < n; i++) {
- s.append("| ");
- }
- return s.toString();
- }
-
- private static String formatNodeName(TransformTreeNode node) {
- return node.toString().split("@")[1] + node.getTransform();
- }
-}
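
The visitor above dispatches on the transform's class via FlinkBatchTransformTranslators.getTranslator(...). A minimal sketch of what a custom translator would look like (MyTransform is a hypothetical PTransform; the body is an identity translation, and the class would live in the same package as the context):

    package org.apache.beam.runners.flink.translation;

    import org.apache.flink.api.java.DataSet;

    // Hypothetical: MyTransform stands in for a real PTransform subclass; the
    // translator would be registered alongside the others, e.g.
    //   TRANSLATORS.put(MyTransform.class, new MyTransformTranslatorBatch());
    class MyTransformTranslatorBatch<T>
        implements FlinkBatchPipelineTranslator.BatchTransformTranslator<MyTransform<T>> {

      @Override
      public void translateNode(MyTransform<T> transform, FlinkBatchTranslationContext context) {
        // Look up the Flink DataSet registered for the transform's input...
        DataSet<T> input = context.getInputDataSet(context.getInput(transform));
        // ...and register a DataSet for its output (an identity translation here).
        context.setOutputDataSet(context.getOutput(transform), input);
      }
    }
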
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java
deleted file mode 100644
index f33e4f5..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java
+++ /dev/null
@@ -1,594 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation;
-
-import org.apache.beam.runners.flink.io.ConsoleIO;
-import org.apache.beam.runners.flink.translation.functions.FlinkCoGroupKeyedListAggregator;
-import org.apache.beam.runners.flink.translation.functions.FlinkCreateFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkKeyedListAggregationFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputDoFnFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
-import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
-import org.apache.beam.runners.flink.translation.functions.UnionCoder;
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.runners.flink.translation.types.KvCoderTypeInformation;
-import org.apache.beam.runners.flink.translation.wrappers.SinkOutputFormat;
-import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat;
-import com.google.api.client.util.Maps;
-import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.io.AvroIO;
-import com.google.cloud.dataflow.sdk.io.BoundedSource;
-import com.google.cloud.dataflow.sdk.io.Read;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.View;
-import com.google.cloud.dataflow.sdk.transforms.Write;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResultSchema;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
-import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.collect.Lists;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.io.AvroInputFormat;
-import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.api.java.io.TextInputFormat;
-import org.apache.flink.api.java.operators.CoGroupOperator;
-import org.apache.flink.api.java.operators.DataSink;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.java.operators.FlatMapOperator;
-import org.apache.flink.api.java.operators.GroupCombineOperator;
-import org.apache.flink.api.java.operators.GroupReduceOperator;
-import org.apache.flink.api.java.operators.Grouping;
-import org.apache.flink.api.java.operators.MapPartitionOperator;
-import org.apache.flink.api.java.operators.UnsortedGrouping;
-import org.apache.flink.core.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Translators for transforming
- * Dataflow {@link com.google.cloud.dataflow.sdk.transforms.PTransform}s to
- * Flink {@link org.apache.flink.api.java.DataSet}s
- */
-public class FlinkBatchTransformTranslators {
-
- // --------------------------------------------------------------------------------------------
- // Transform Translator Registry
- // --------------------------------------------------------------------------------------------
-
- @SuppressWarnings("rawtypes")
- private static final Map<Class<? extends PTransform>, FlinkBatchPipelineTranslator.BatchTransformTranslator> TRANSLATORS = new HashMap<>();
-
- // register the known translators
- static {
- TRANSLATORS.put(View.CreatePCollectionView.class, new CreatePCollectionViewTranslatorBatch());
-
- TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslatorBatch());
- // we don't need this because we translate the Combine.PerKey directly
- //TRANSLATORS.put(Combine.GroupedValues.class, new CombineGroupedValuesTranslator());
-
- TRANSLATORS.put(Create.Values.class, new CreateTranslatorBatch());
-
- TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslatorBatch());
-
- TRANSLATORS.put(GroupByKey.GroupByKeyOnly.class, new GroupByKeyOnlyTranslatorBatch());
- // TODO we're currently ignoring windows here but that has to change in the future
- TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
-
- TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiTranslatorBatch());
- TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundTranslatorBatch());
-
- TRANSLATORS.put(CoGroupByKey.class, new CoGroupByKeyTranslatorBatch());
-
- TRANSLATORS.put(AvroIO.Read.Bound.class, new AvroIOReadTranslatorBatch());
- TRANSLATORS.put(AvroIO.Write.Bound.class, new AvroIOWriteTranslatorBatch());
-
- TRANSLATORS.put(Read.Bounded.class, new ReadSourceTranslatorBatch());
- TRANSLATORS.put(Write.Bound.class, new WriteSinkTranslatorBatch());
-
- TRANSLATORS.put(TextIO.Read.Bound.class, new TextIOReadTranslatorBatch());
- TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteTranslatorBatch());
-
- // Flink-specific
- TRANSLATORS.put(ConsoleIO.Write.Bound.class, new ConsoleIOWriteTranslatorBatch());
-
- }
-
-
- public static FlinkBatchPipelineTranslator.BatchTransformTranslator<?> getTranslator(PTransform<?, ?> transform) {
- return TRANSLATORS.get(transform.getClass());
- }
-
- private static class ReadSourceTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Read.Bounded<T>> {
-
- @Override
- public void translateNode(Read.Bounded<T> transform, FlinkBatchTranslationContext context) {
- String name = transform.getName();
- BoundedSource<T> source = transform.getSource();
- PCollection<T> output = context.getOutput(transform);
- Coder<T> coder = output.getCoder();
-
- TypeInformation<T> typeInformation = context.getTypeInfo(output);
-
- DataSource<T> dataSource = new DataSource<>(context.getExecutionEnvironment(),
- new SourceInputFormat<>(source, context.getPipelineOptions()), typeInformation, name);
-
- context.setOutputDataSet(output, dataSource);
- }
- }
-
- private static class AvroIOReadTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<AvroIO.Read.Bound<T>> {
- private static final Logger LOG = LoggerFactory.getLogger(AvroIOReadTranslatorBatch.class);
-
- @Override
- public void translateNode(AvroIO.Read.Bound<T> transform, FlinkBatchTranslationContext context) {
- String path = transform.getFilepattern();
- String name = transform.getName();
-// Schema schema = transform.getSchema();
- PValue output = context.getOutput(transform);
-
- TypeInformation<T> typeInformation = context.getTypeInfo(output);
-
- // This is super hacky, but unfortunately we cannot get the type otherwise
- Class<T> extractedAvroType;
- try {
- Field typeField = transform.getClass().getDeclaredField("type");
- typeField.setAccessible(true);
- @SuppressWarnings("unchecked")
- Class<T> avroType = (Class<T>) typeField.get(transform);
- extractedAvroType = avroType;
- } catch (NoSuchFieldException | IllegalAccessException e) {
- // we know that the field is there and it is accessible
- throw new RuntimeException("Could not access type from AvroIO.Bound", e);
- }
-
- DataSource<T> source = new DataSource<>(context.getExecutionEnvironment(),
- new AvroInputFormat<>(new Path(path), extractedAvroType),
- typeInformation, name);
-
- context.setOutputDataSet(output, source);
- }
- }
-
- private static class AvroIOWriteTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<AvroIO.Write.Bound<T>> {
- private static final Logger LOG = LoggerFactory.getLogger(AvroIOWriteTranslatorBatch.class);
-
- @Override
- public void translateNode(AvroIO.Write.Bound<T> transform, FlinkBatchTranslationContext context) {
- DataSet<T> inputDataSet = context.getInputDataSet(context.getInput(transform));
- String filenamePrefix = transform.getFilenamePrefix();
- String filenameSuffix = transform.getFilenameSuffix();
- int numShards = transform.getNumShards();
- String shardNameTemplate = transform.getShardNameTemplate();
-
- // TODO: Implement these. We need Flink support for this.
- LOG.warn("Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.",
- filenameSuffix);
- LOG.warn("Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.", shardNameTemplate);
-
- // This is super hacky, but unfortunately we cannot get the type otherwise
- Class<T> extractedAvroType;
- try {
- Field typeField = transform.getClass().getDeclaredField("type");
- typeField.setAccessible(true);
- @SuppressWarnings("unchecked")
- Class<T> avroType = (Class<T>) typeField.get(transform);
- extractedAvroType = avroType;
- } catch (NoSuchFieldException | IllegalAccessException e) {
- // we know that the field is there and it is accessible
- throw new RuntimeException("Could not access type from AvroIO.Bound", e);
- }
-
- DataSink<T> dataSink = inputDataSet.output(new AvroOutputFormat<>(new Path
- (filenamePrefix), extractedAvroType));
-
- if (numShards > 0) {
- dataSink.setParallelism(numShards);
- }
- }
- }
-
- private static class TextIOReadTranslatorBatch implements FlinkBatchPipelineTranslator.BatchTransformTranslator<TextIO.Read.Bound<String>> {
- private static final Logger LOG = LoggerFactory.getLogger(TextIOReadTranslatorBatch.class);
-
- @Override
- public void translateNode(TextIO.Read.Bound<String> transform, FlinkBatchTranslationContext context) {
- String path = transform.getFilepattern();
- String name = transform.getName();
-
- TextIO.CompressionType compressionType = transform.getCompressionType();
- boolean needsValidation = transform.needsValidation();
-
- // TODO: Implement these. We need Flink support for this.
- LOG.warn("Translation of TextIO.CompressionType not yet supported. Is: {}.", compressionType);
- LOG.warn("Translation of TextIO.Read.needsValidation not yet supported. Is: {}.", needsValidation);
-
- PValue output = context.getOutput(transform);
-
- TypeInformation<String> typeInformation = context.getTypeInfo(output);
- DataSource<String> source = new DataSource<>(context.getExecutionEnvironment(), new TextInputFormat(new Path(path)), typeInformation, name);
-
- context.setOutputDataSet(output, source);
- }
- }
-
- private static class TextIOWriteTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<TextIO.Write.Bound<T>> {
- private static final Logger LOG = LoggerFactory.getLogger(TextIOWriteTranslatorBatch.class);
-
- @Override
- public void translateNode(TextIO.Write.Bound<T> transform, FlinkBatchTranslationContext context) {
- PValue input = context.getInput(transform);
- DataSet<T> inputDataSet = context.getInputDataSet(input);
-
- String filenamePrefix = transform.getFilenamePrefix();
- String filenameSuffix = transform.getFilenameSuffix();
- boolean needsValidation = transform.needsValidation();
- int numShards = transform.getNumShards();
- String shardNameTemplate = transform.getShardNameTemplate();
-
- // TODO: Implement these. We need Flink support for this.
- LOG.warn("Translation of TextIO.Write.needsValidation not yet supported. Is: {}.", needsValidation);
- LOG.warn("Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.", filenameSuffix);
- LOG.warn("Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.", shardNameTemplate);
-
- //inputDataSet.print();
- DataSink<T> dataSink = inputDataSet.writeAsText(filenamePrefix);
-
- if (numShards > 0) {
- dataSink.setParallelism(numShards);
- }
- }
- }
-
- private static class ConsoleIOWriteTranslatorBatch implements FlinkBatchPipelineTranslator.BatchTransformTranslator<ConsoleIO.Write.Bound> {
- @Override
- public void translateNode(ConsoleIO.Write.Bound transform, FlinkBatchTranslationContext context) {
- PValue input = context.getInput(transform);
- DataSet<?> inputDataSet = context.getInputDataSet(input);
- inputDataSet.printOnTaskManager(transform.getName());
- }
- }
-
- private static class WriteSinkTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Write.Bound<T>> {
-
- @Override
- public void translateNode(Write.Bound<T> transform, FlinkBatchTranslationContext context) {
- String name = transform.getName();
- PValue input = context.getInput(transform);
- DataSet<T> inputDataSet = context.getInputDataSet(input);
-
- inputDataSet.output(new SinkOutputFormat<>(transform, context.getPipelineOptions())).name(name);
- }
- }
-
- private static class GroupByKeyOnlyTranslatorBatch<K, V> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey.GroupByKeyOnly<K, V>> {
-
- @Override
- public void translateNode(GroupByKey.GroupByKeyOnly<K, V> transform, FlinkBatchTranslationContext context) {
- DataSet<KV<K, V>> inputDataSet = context.getInputDataSet(context.getInput(transform));
- GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> groupReduceFunction = new FlinkKeyedListAggregationFunction<>();
-
- TypeInformation<KV<K, Iterable<V>>> typeInformation = context.getTypeInfo(context.getOutput(transform));
-
- Grouping<KV<K, V>> grouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet.getType()));
-
- GroupReduceOperator<KV<K, V>, KV<K, Iterable<V>>> outputDataSet =
- new GroupReduceOperator<>(grouping, typeInformation, groupReduceFunction, transform.getName());
- context.setOutputDataSet(context.getOutput(transform), outputDataSet);
- }
- }
-
- /**
- * Translates a GroupByKey while ignoring window assignments. This is identical to the {@link GroupByKeyOnlyTranslatorBatch}.
- */
- private static class GroupByKeyTranslatorBatch<K, V> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey<K, V>> {
-
- @Override
- public void translateNode(GroupByKey<K, V> transform, FlinkBatchTranslationContext context) {
- DataSet<KV<K, V>> inputDataSet = context.getInputDataSet(context.getInput(transform));
- GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> groupReduceFunction = new FlinkKeyedListAggregationFunction<>();
-
- TypeInformation<KV<K, Iterable<V>>> typeInformation = context.getTypeInfo(context.getOutput(transform));
-
- Grouping<KV<K, V>> grouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet.getType()));
-
- GroupReduceOperator<KV<K, V>, KV<K, Iterable<V>>> outputDataSet =
- new GroupReduceOperator<>(grouping, typeInformation, groupReduceFunction, transform.getName());
-
- context.setOutputDataSet(context.getOutput(transform), outputDataSet);
- }
- }
-
- private static class CombinePerKeyTranslatorBatch<K, VI, VA, VO> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Combine.PerKey<K, VI, VO>> {
-
- @Override
- public void translateNode(Combine.PerKey<K, VI, VO> transform, FlinkBatchTranslationContext context) {
- DataSet<KV<K, VI>> inputDataSet = context.getInputDataSet(context.getInput(transform));
-
- @SuppressWarnings("unchecked")
- Combine.KeyedCombineFn<K, VI, VA, VO> keyedCombineFn = (Combine.KeyedCombineFn<K, VI, VA, VO>) transform.getFn();
-
- KvCoder<K, VI> inputCoder = (KvCoder<K, VI>) context.getInput(transform).getCoder();
-
- Coder<VA> accumulatorCoder =
- null;
- try {
- accumulatorCoder = keyedCombineFn.getAccumulatorCoder(context.getInput(transform).getPipeline().getCoderRegistry(), inputCoder.getKeyCoder(), inputCoder.getValueCoder());
- } catch (CannotProvideCoderException e) {
- e.printStackTrace();
- // TODO
- }
-
- TypeInformation<KV<K, VI>> kvCoderTypeInformation = new KvCoderTypeInformation<>(inputCoder);
- TypeInformation<KV<K, VA>> partialReduceTypeInfo = new KvCoderTypeInformation<>(KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder));
-
- Grouping<KV<K, VI>> inputGrouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{"key"}, kvCoderTypeInformation));
-
- FlinkPartialReduceFunction<K, VI, VA> partialReduceFunction = new FlinkPartialReduceFunction<>(keyedCombineFn);
-
- // Partially GroupReduce the values into the intermediate format VA (combine)
- GroupCombineOperator<KV<K, VI>, KV<K, VA>> groupCombine =
- new GroupCombineOperator<>(inputGrouping, partialReduceTypeInfo, partialReduceFunction,
- "GroupCombine: " + transform.getName());
-
- // Reduce fully to VO
- GroupReduceFunction<KV<K, VA>, KV<K, VO>> reduceFunction = new FlinkReduceFunction<>(keyedCombineFn);
-
- TypeInformation<KV<K, VO>> reduceTypeInfo = context.getTypeInfo(context.getOutput(transform));
-
- Grouping<KV<K, VA>> intermediateGrouping = new UnsortedGrouping<>(groupCombine, new Keys.ExpressionKeys<>(new String[]{"key"}, groupCombine.getType()));
-
- // Fully reduce the values and create output format VO
- GroupReduceOperator<KV<K, VA>, KV<K, VO>> outputDataSet =
- new GroupReduceOperator<>(intermediateGrouping, reduceTypeInfo, reduceFunction, transform.getName());
-
- context.setOutputDataSet(context.getOutput(transform), outputDataSet);
- }
- }
-
-// private static class CombineGroupedValuesTranslator<K, VI, VO> implements FlinkPipelineTranslator.TransformTranslator<Combine.GroupedValues<K, VI, VO>> {
-//
-// @Override
-// public void translateNode(Combine.GroupedValues<K, VI, VO> transform, TranslationContext context) {
-// DataSet<KV<K, VI>> inputDataSet = context.getInputDataSet(transform.getInput());
-//
-// Combine.KeyedCombineFn<? super K, ? super VI, ?, VO> keyedCombineFn = transform.getFn();
-//
-// GroupReduceFunction<KV<K, VI>, KV<K, VO>> groupReduceFunction = new FlinkCombineFunction<>(keyedCombineFn);
-//
-// TypeInformation<KV<K, VO>> typeInformation = context.getTypeInfo(transform.getOutput());
-//
-// Grouping<KV<K, VI>> grouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{""}, inputDataSet.getType()));
-//
-// GroupReduceOperator<KV<K, VI>, KV<K, VO>> outputDataSet =
-// new GroupReduceOperator<>(grouping, typeInformation, groupReduceFunction, transform.getName());
-// context.setOutputDataSet(transform.getOutput(), outputDataSet);
-// }
-// }
-
- private static class ParDoBoundTranslatorBatch<IN, OUT> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<ParDo.Bound<IN, OUT>> {
- private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslatorBatch.class);
-
- @Override
- public void translateNode(ParDo.Bound<IN, OUT> transform, FlinkBatchTranslationContext context) {
- DataSet<IN> inputDataSet = context.getInputDataSet(context.getInput(transform));
-
- final DoFn<IN, OUT> doFn = transform.getFn();
-
- TypeInformation<OUT> typeInformation = context.getTypeInfo(context.getOutput(transform));
-
- FlinkDoFnFunction<IN, OUT> doFnWrapper = new FlinkDoFnFunction<>(doFn, context.getPipelineOptions());
- MapPartitionOperator<IN, OUT> outputDataSet = new MapPartitionOperator<>(inputDataSet, typeInformation, doFnWrapper, transform.getName());
-
- transformSideInputs(transform.getSideInputs(), outputDataSet, context);
-
- context.setOutputDataSet(context.getOutput(transform), outputDataSet);
- }
- }
-
- private static class ParDoBoundMultiTranslatorBatch<IN, OUT> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<ParDo.BoundMulti<IN, OUT>> {
- private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundMultiTranslatorBatch.class);
-
- @Override
- public void translateNode(ParDo.BoundMulti<IN, OUT> transform, FlinkBatchTranslationContext context) {
- DataSet<IN> inputDataSet = context.getInputDataSet(context.getInput(transform));
-
- final DoFn<IN, OUT> doFn = transform.getFn();
-
- Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
-
- Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap();
- // put the main output at index 0, FlinkMultiOutputDoFnFunction also expects this
- outputMap.put(transform.getMainOutputTag(), 0);
- int count = 1;
- for (TupleTag<?> tag: outputs.keySet()) {
- if (!outputMap.containsKey(tag)) {
- outputMap.put(tag, count++);
- }
- }
-
- // collect all output Coders and create a UnionCoder for our tagged outputs
- List<Coder<?>> outputCoders = Lists.newArrayList();
- for (PCollection<?> coll: outputs.values()) {
- outputCoders.add(coll.getCoder());
- }
-
- UnionCoder unionCoder = UnionCoder.of(outputCoders);
-
- @SuppressWarnings("unchecked")
- TypeInformation<RawUnionValue> typeInformation = new CoderTypeInformation<>(unionCoder);
-
- @SuppressWarnings("unchecked")
- FlinkMultiOutputDoFnFunction<IN, OUT> doFnWrapper = new FlinkMultiOutputDoFnFunction(doFn, context.getPipelineOptions(), outputMap);
- MapPartitionOperator<IN, RawUnionValue> outputDataSet = new MapPartitionOperator<>(inputDataSet, typeInformation, doFnWrapper, transform.getName());
-
- transformSideInputs(transform.getSideInputs(), outputDataSet, context);
-
- for (Map.Entry<TupleTag<?>, PCollection<?>> output: outputs.entrySet()) {
- TypeInformation<Object> outputType = context.getTypeInfo(output.getValue());
- int outputTag = outputMap.get(output.getKey());
- FlinkMultiOutputPruningFunction<Object> pruningFunction = new FlinkMultiOutputPruningFunction<>(outputTag);
- FlatMapOperator<RawUnionValue, Object> pruningOperator = new
- FlatMapOperator<>(outputDataSet, outputType,
- pruningFunction, output.getValue().getName());
- context.setOutputDataSet(output.getValue(), pruningOperator);
-
- }
- }
- }
-
- private static class FlattenPCollectionTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Flatten.FlattenPCollectionList<T>> {
-
- @Override
- public void translateNode(Flatten.FlattenPCollectionList<T> transform, FlinkBatchTranslationContext context) {
- List<PCollection<T>> allInputs = context.getInput(transform).getAll();
- DataSet<T> result = null;
- for(PCollection<T> collection : allInputs) {
- DataSet<T> current = context.getInputDataSet(collection);
- if (result == null) {
- result = current;
- } else {
- result = result.union(current);
- }
- }
- context.setOutputDataSet(context.getOutput(transform), result);
- }
- }
-
- private static class CreatePCollectionViewTranslatorBatch<R, T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<View.CreatePCollectionView<R, T>> {
- @Override
- public void translateNode(View.CreatePCollectionView<R, T> transform, FlinkBatchTranslationContext context) {
- DataSet<T> inputDataSet = context.getInputDataSet(context.getInput(transform));
- PCollectionView<T> input = transform.apply(null);
- context.setSideInputDataSet(input, inputDataSet);
- }
- }
-
- private static class CreateTranslatorBatch<OUT> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Create.Values<OUT>> {
-
- @Override
- public void translateNode(Create.Values<OUT> transform, FlinkBatchTranslationContext context) {
- TypeInformation<OUT> typeInformation = context.getOutputTypeInfo();
- Iterable<OUT> elements = transform.getElements();
-
- // we need to serialize the elements to byte arrays, since they might contain
- // elements that are not serializable by Java serialization. We deserialize them
- // in the FlatMap function using the Coder.
-
- List<byte[]> serializedElements = Lists.newArrayList();
- Coder<OUT> coder = context.getOutput(transform).getCoder();
- for (OUT element: elements) {
- ByteArrayOutputStream bao = new ByteArrayOutputStream();
- try {
- coder.encode(element, bao, Coder.Context.OUTER);
- serializedElements.add(bao.toByteArray());
- } catch (IOException e) {
- throw new RuntimeException("Could not serialize Create elements using Coder: " + e);
- }
- }
-
- DataSet<Integer> initDataSet = context.getExecutionEnvironment().fromElements(1);
- FlinkCreateFunction<Integer, OUT> flatMapFunction = new FlinkCreateFunction<>(serializedElements, coder);
- FlatMapOperator<Integer, OUT> outputDataSet = new FlatMapOperator<>(initDataSet, typeInformation, flatMapFunction, transform.getName());
-
- context.setOutputDataSet(context.getOutput(transform), outputDataSet);
- }
- }
-
- private static void transformSideInputs(List<PCollectionView<?>> sideInputs,
- MapPartitionOperator<?, ?> outputDataSet,
- FlinkBatchTranslationContext context) {
- // get corresponding Flink broadcast DataSets
- for(PCollectionView<?> input : sideInputs) {
- DataSet<?> broadcastSet = context.getSideInputDataSet(input);
- outputDataSet.withBroadcastSet(broadcastSet, input.getTagInternal().getId());
- }
- }
-
-// Disabled because it depends on a pending pull request to the Dataflow SDK
- /**
- * Special composite transform translator. Only called if the CoGroup is two-dimensional.
- * @param <K> the key type
- */
- private static class CoGroupByKeyTranslatorBatch<K, V1, V2> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<CoGroupByKey<K>> {
-
- @Override
- public void translateNode(CoGroupByKey<K> transform, FlinkBatchTranslationContext context) {
- KeyedPCollectionTuple<K> input = context.getInput(transform);
-
- CoGbkResultSchema schema = input.getCoGbkResultSchema();
- List<KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?>> keyedCollections = input.getKeyedCollections();
-
- KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?> taggedCollection1 = keyedCollections.get(0);
- KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?> taggedCollection2 = keyedCollections.get(1);
-
- TupleTag<?> tupleTag1 = taggedCollection1.getTupleTag();
- TupleTag<?> tupleTag2 = taggedCollection2.getTupleTag();
-
- PCollection<? extends KV<K, ?>> collection1 = taggedCollection1.getCollection();
- PCollection<? extends KV<K, ?>> collection2 = taggedCollection2.getCollection();
-
- DataSet<KV<K,V1>> inputDataSet1 = context.getInputDataSet(collection1);
- DataSet<KV<K,V2>> inputDataSet2 = context.getInputDataSet(collection2);
-
- TypeInformation<KV<K,CoGbkResult>> typeInfo = context.getOutputTypeInfo();
-
- FlinkCoGroupKeyedListAggregator<K,V1,V2> aggregator = new FlinkCoGroupKeyedListAggregator<>(schema, tupleTag1, tupleTag2);
-
- Keys.ExpressionKeys<KV<K,V1>> keySelector1 = new Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet1.getType());
- Keys.ExpressionKeys<KV<K,V2>> keySelector2 = new Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet2.getType());
-
- DataSet<KV<K, CoGbkResult>> out = new CoGroupOperator<>(inputDataSet1, inputDataSet2,
- keySelector1, keySelector2,
- aggregator, typeInfo, null, transform.getName());
- context.setOutputDataSet(context.getOutput(transform), out);
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Miscellaneous
- // --------------------------------------------------------------------------------------------
-
- private FlinkBatchTransformTranslators() {}
-}
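
The Create translation above ships elements to the workers as Coder-encoded byte arrays, which FlinkCreateFunction decodes again. The roundtrip it relies on, as a self-contained sketch using the SDK's StringUtf8Coder (the wrapper class name is hypothetical):

    import com.google.cloud.dataflow.sdk.coders.Coder;
    import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;

    public class CoderRoundtripSketch {
      public static void main(String[] args) throws Exception {
        Coder<String> coder = StringUtf8Coder.of();

        // Encode, as in CreateTranslatorBatch: one byte[] per element.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        coder.encode("hello", bos, Coder.Context.OUTER);

        // Decode, as FlinkCreateFunction does on the worker side.
        String back = coder.decode(new ByteArrayInputStream(bos.toByteArray()), Coder.Context.OUTER);
        System.out.println(back); // hello
      }
    }
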
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTranslationContext.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTranslationContext.java
deleted file mode 100644
index fd99833..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTranslationContext.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation;
-
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.runners.flink.translation.types.KvCoderTypeInformation;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import com.google.cloud.dataflow.sdk.values.TypedPValue;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class FlinkBatchTranslationContext {
-
- private final Map<PValue, DataSet<?>> dataSets;
- private final Map<PCollectionView<?>, DataSet<?>> broadcastDataSets;
-
- private final ExecutionEnvironment env;
- private final PipelineOptions options;
-
- private AppliedPTransform<?, ?, ?> currentTransform;
-
- // ------------------------------------------------------------------------
-
- public FlinkBatchTranslationContext(ExecutionEnvironment env, PipelineOptions options) {
- this.env = env;
- this.options = options;
- this.dataSets = new HashMap<>();
- this.broadcastDataSets = new HashMap<>();
- }
-
- // ------------------------------------------------------------------------
-
- public ExecutionEnvironment getExecutionEnvironment() {
- return env;
- }
-
- public PipelineOptions getPipelineOptions() {
- return options;
- }
-
- @SuppressWarnings("unchecked")
- public <T> DataSet<T> getInputDataSet(PValue value) {
- return (DataSet<T>) dataSets.get(value);
- }
-
- public void setOutputDataSet(PValue value, DataSet<?> set) {
- if (!dataSets.containsKey(value)) {
- dataSets.put(value, set);
- }
- }
-
- /**
- * Sets the AppliedPTransform which carries input/output.
- * @param currentTransform the transform currently being translated, carrying its input and output values
- */
- public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
- this.currentTransform = currentTransform;
- }
-
- @SuppressWarnings("unchecked")
- public <T> DataSet<T> getSideInputDataSet(PCollectionView<?> value) {
- return (DataSet<T>) broadcastDataSets.get(value);
- }
-
- public void setSideInputDataSet(PCollectionView<?> value, DataSet<?> set) {
- if (!broadcastDataSets.containsKey(value)) {
- broadcastDataSets.put(value, set);
- }
- }
-
- @SuppressWarnings("unchecked")
- public <T> TypeInformation<T> getTypeInfo(PInput output) {
- if (output instanceof TypedPValue) {
- Coder<?> outputCoder = ((TypedPValue) output).getCoder();
- if (outputCoder instanceof KvCoder) {
- return new KvCoderTypeInformation((KvCoder) outputCoder);
- } else {
- return new CoderTypeInformation(outputCoder);
- }
- }
- return new GenericTypeInfo<>((Class<T>)Object.class);
- }
-
- public <T> TypeInformation<T> getInputTypeInfo() {
- return getTypeInfo(currentTransform.getInput());
- }
-
- public <T> TypeInformation<T> getOutputTypeInfo() {
- return getTypeInfo((PValue) currentTransform.getOutput());
- }
-
- @SuppressWarnings("unchecked")
- <I extends PInput> I getInput(PTransform<I, ?> transform) {
- return (I) currentTransform.getInput();
- }
-
- @SuppressWarnings("unchecked")
- <O extends POutput> O getOutput(PTransform<?, O> transform) {
- return (O) currentTransform.getOutput();
- }
-}
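
The getTypeInfo(...) method above maps Beam Coders onto Flink TypeInformation. A short sketch of that mapping for a KV-typed output, mirroring the KvCoder branch (the wrapper class name is hypothetical):

    import com.google.cloud.dataflow.sdk.coders.KvCoder;
    import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
    import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
    import com.google.cloud.dataflow.sdk.values.KV;
    import org.apache.beam.runners.flink.translation.types.KvCoderTypeInformation;
    import org.apache.flink.api.common.typeinfo.TypeInformation;

    public class TypeInfoSketch {
      public static void main(String[] args) {
        KvCoder<String, Long> coder = KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of());
        // The same construction getTypeInfo(...) performs for KvCoder-typed outputs.
        TypeInformation<KV<String, Long>> info = new KvCoderTypeInformation<>(coder);
        System.out.println(info);
      }
    }
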
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkPipelineTranslator.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkPipelineTranslator.java
deleted file mode 100644
index efe217f..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkPipelineTranslator.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-
-/**
- * The role of this class is to translate the Beam operators to
- * their Flink counterparts. If we have a streaming job, this is instantiated as a
- * {@link FlinkStreamingPipelineTranslator}. Otherwise, i.e. for a batch job,
- * a {@link FlinkBatchPipelineTranslator} is created. Correspondingly, the
- * {@link com.google.cloud.dataflow.sdk.values.PCollection}-based user-provided job is translated into
- * a {@link org.apache.flink.streaming.api.datastream.DataStream} (for streaming) or a
- * {@link org.apache.flink.api.java.DataSet} (for batch).
- */
-public abstract class FlinkPipelineTranslator implements Pipeline.PipelineVisitor {
-
- public void translate(Pipeline pipeline) {
- pipeline.traverseTopologically(this);
- }
-}
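
A runner drives this visitor by instantiating the matching subclass and handing it the pipeline. A hedged sketch of the call site (the wrapper class and method are hypothetical; FlinkPipelineRunner wires this up with more configuration):

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.options.PipelineOptions;
    import org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class TranslateSketch {
      public static void translateBatch(Pipeline pipeline, PipelineOptions options) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        FlinkBatchPipelineTranslator translator = new FlinkBatchPipelineTranslator(env, options);
        translator.translate(pipeline); // traverses the pipeline topologically
      }
    }
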
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingPipelineTranslator.java
deleted file mode 100644
index 21a8133..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingPipelineTranslator.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-/**
- * This is a {@link FlinkPipelineTranslator} for streaming jobs. Its role is to translate the user-provided
- * {@link com.google.cloud.dataflow.sdk.values.PCollection}-based job into a
- * {@link org.apache.flink.streaming.api.datastream.DataStream}-based one.
- *
- * This is based on {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator}.
- */
-public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
-
- /** The necessary context in the case of a streaming job. */
- private final FlinkStreamingTranslationContext streamingContext;
-
- private int depth = 0;
-
- /** Composite transform that we want to translate before proceeding with other transforms. */
- private PTransform<?, ?> currentCompositeTransform;
-
- public FlinkStreamingPipelineTranslator(StreamExecutionEnvironment env, PipelineOptions options) {
- this.streamingContext = new FlinkStreamingTranslationContext(env, options);
- }
-
- // --------------------------------------------------------------------------------------------
- // Pipeline Visitor Methods
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void enterCompositeTransform(TransformTreeNode node) {
- System.out.println(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node));
-
- PTransform<?, ?> transform = node.getTransform();
- if (transform != null && currentCompositeTransform == null) {
-
- StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform);
- if (translator != null) {
- currentCompositeTransform = transform;
- }
- }
- this.depth++;
- }
-
- @Override
- public void leaveCompositeTransform(TransformTreeNode node) {
- PTransform<?, ?> transform = node.getTransform();
- if (transform != null && currentCompositeTransform == transform) {
-
- StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform);
- if (translator != null) {
- System.out.println(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
- applyStreamingTransform(transform, node, translator);
- currentCompositeTransform = null;
- } else {
- throw new IllegalStateException("Attempted to translate composite transform " +
- "but no translator was found: " + currentCompositeTransform);
- }
- }
- this.depth--;
- System.out.println(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node));
- }
-
- @Override
- public void visitTransform(TransformTreeNode node) {
- System.out.println(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node));
- if (currentCompositeTransform != null) {
- // ignore it
- return;
- }
-
- // get the transformation corresponding to the node we are
- // currently visiting and translate it into its Flink alternative.
-
- PTransform<?, ?> transform = node.getTransform();
- StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform);
- if (translator == null) {
- System.out.println(node.getTransform().getClass());
- throw new UnsupportedOperationException("The transform " + transform + " is currently not supported.");
- }
- applyStreamingTransform(transform, node, translator);
- }
-
- @Override
- public void visitValue(PValue value, TransformTreeNode producer) {
- // do nothing here
- }
-
- private <T extends PTransform<?, ?>> void applyStreamingTransform(PTransform<?, ?> transform, TransformTreeNode node, StreamTransformTranslator<?> translator) {
-
- @SuppressWarnings("unchecked")
- T typedTransform = (T) transform;
-
- @SuppressWarnings("unchecked")
- StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>) translator;
-
- // create the applied PTransform on the streamingContext
- streamingContext.setCurrentTransform(AppliedPTransform.of(
- node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform));
- typedTranslator.translateNode(typedTransform, streamingContext);
- }
-
- /**
- * The interface that every Flink translator of a Beam operator should implement.
- * This interface is for <b>streaming</b> jobs. For examples of such translators see
- * {@link FlinkStreamingTransformTranslators}.
- */
- public interface StreamTransformTranslator<Type extends PTransform> {
- void translateNode(Type transform, FlinkStreamingTranslationContext context);
- }
-
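- /** Builds the "| " prefix used to indent the transform-tree log output. */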
- private static String genSpaces(int n) {
- StringBuilder builder = new StringBuilder();
- for (int i = 0; i < n; i++) {
- builder.append("| ");
- }
- return builder.toString();
- }
-
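- /** Formats a node for logging: the node's identity hash followed by its transform. */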
- private static String formatNodeName(TransformTreeNode node) {
- return node.toString().split("@")[1] + node.getTransform();
- }
-}
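For reference, every entry registered in FlinkStreamingTransformTranslators
(next file) implements the StreamTransformTranslator interface defined above.
A minimal sketch of the shape such a translator takes, where MyTransform is a
hypothetical PTransform used purely for illustration:

  private static class MyTransformTranslator<T> implements
      FlinkStreamingPipelineTranslator.StreamTransformTranslator<MyTransform<T>> {
    @Override
    public void translateNode(MyTransform<T> transform, FlinkStreamingTranslationContext context) {
      // Look up the Flink stream already produced for this transform's input ...
      DataStream<WindowedValue<T>> input =
          context.getInputDataStream(context.getInput(transform));
      // ... build the corresponding Flink operator(s) here, then register the
      // result as the output of this transform (identity shown as a placeholder).
      context.setOutputDataStream(context.getOutput(transform), input);
    }
  }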
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java
deleted file mode 100644
index 1083848..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java
+++ /dev/null
@@ -1,404 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.flink.translation;
-
-import org.apache.beam.runners.flink.translation.functions.UnionCoder;
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.*;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.FlinkStreamingCreateFunction;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
-import com.google.api.client.util.Maps;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.io.Read;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.transforms.*;
-import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue;
-import com.google.cloud.dataflow.sdk.transforms.windowing.*;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.collect.Lists;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.streaming.api.datastream.*;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.*;
-
-/**
- * This class contains all the mappings between Beam and Flink
- * <b>streaming</b> transformations. The {@link FlinkStreamingPipelineTranslator}
- * traverses the Beam job and comes here to translate each Beam transformation it encounters
- * into its Flink counterpart, based on the mappings available in this class.
- */
-public class FlinkStreamingTransformTranslators {
-
- // --------------------------------------------------------------------------------------------
- // Transform Translator Registry
- // --------------------------------------------------------------------------------------------
-
- @SuppressWarnings("rawtypes")
- private static final Map<Class<? extends PTransform>, FlinkStreamingPipelineTranslator.StreamTransformTranslator> TRANSLATORS = new HashMap<>();
-
- // here you can find all the available translators.
- static {
- TRANSLATORS.put(Create.Values.class, new CreateStreamingTranslator());
- TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
- TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator());
- TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
- TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator());
- TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
- TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator());
- TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslator());
- TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiStreamingTranslator());
- }
-
- public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator(PTransform<?, ?> transform) {
- return TRANSLATORS.get(transform.getClass());
- }
-
- // --------------------------------------------------------------------------------------------
- // Transformation Implementations
- // --------------------------------------------------------------------------------------------
-
- private static class CreateStreamingTranslator<OUT> implements
- FlinkStreamingPipelineTranslator.StreamTransformTranslator<Create.Values<OUT>> {
-
- @Override
- public void translateNode(Create.Values<OUT> transform, FlinkStreamingTranslationContext context) {
- PCollection<OUT> output = context.getOutput(transform);
- Iterable<OUT> elements = transform.getElements();
-
- // we need to serialize the elements to byte arrays, since they might not be
- // serializable by Java serialization. We deserialize them in the FlatMap
- // function using the Coder.
-
- List<byte[]> serializedElements = Lists.newArrayList();
- Coder<OUT> elementCoder = context.getOutput(transform).getCoder();
- for (OUT element: elements) {
- ByteArrayOutputStream bao = new ByteArrayOutputStream();
- try {
- elementCoder.encode(element, bao, Coder.Context.OUTER);
- serializedElements.add(bao.toByteArray());
- } catch (IOException e) {
- throw new RuntimeException("Could not serialize Create elements using Coder.", e);
- }
- }
-
- DataStream<Integer> initDataSet = context.getExecutionEnvironment().fromElements(1);
-
- FlinkStreamingCreateFunction<Integer, OUT> createFunction =
- new FlinkStreamingCreateFunction<>(serializedElements, elementCoder);
-
- WindowedValue.ValueOnlyWindowedValueCoder<OUT> windowCoder = WindowedValue.getValueOnlyCoder(elementCoder);
- TypeInformation<WindowedValue<OUT>> outputType = new CoderTypeInformation<>(windowCoder);
-
- DataStream<WindowedValue<OUT>> outputDataStream = initDataSet.flatMap(createFunction)
- .returns(outputType);
-
- context.setOutputDataStream(context.getOutput(transform), outputDataStream);
- }
- }
-
-
- private static class TextIOWriteBoundStreamingTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<TextIO.Write.Bound<T>> {
- private static final Logger LOG = LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class);
-
- @Override
- public void translateNode(TextIO.Write.Bound<T> transform, FlinkStreamingTranslationContext context) {
- PValue input = context.getInput(transform);
- DataStream<WindowedValue<T>> inputDataStream = context.getInputDataStream(input);
-
- String filenamePrefix = transform.getFilenamePrefix();
- String filenameSuffix = transform.getFilenameSuffix();
- boolean needsValidation = transform.needsValidation();
- int numShards = transform.getNumShards();
- String shardNameTemplate = transform.getShardNameTemplate();
-
- // TODO: Implement these. We need Flink support for this.
- LOG.warn("Translation of TextIO.Write.needsValidation not yet supported. Is: {}.", needsValidation);
- LOG.warn("Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.", filenameSuffix);
- LOG.warn("Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.", shardNameTemplate);
-
- DataStream<String> dataSink = inputDataStream.flatMap(new FlatMapFunction<WindowedValue<T>, String>() {
- @Override
- public void flatMap(WindowedValue<T> value, Collector<String> out) throws Exception {
- out.collect(value.getValue().toString());
- }
- });
- DataStreamSink<String> output = dataSink.writeAsText(filenamePrefix, FileSystem.WriteMode.OVERWRITE);
-
- if (numShards > 0) {
- output.setParallelism(numShards);
- }
- }
- }
-
- private static class UnboundedReadSourceTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> {
-
- @Override
- public void translateNode(Read.Unbounded<T> transform, FlinkStreamingTranslationContext context) {
- PCollection<T> output = context.getOutput(transform);
-
- DataStream<WindowedValue<T>> source;
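- // Special case: a source wrapping a native Flink source is added to the execution environment directly.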
- if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) {
- UnboundedFlinkSource flinkSource = (UnboundedFlinkSource) transform.getSource();
- source = context.getExecutionEnvironment()
- .addSource(flinkSource.getFlinkSource())
- .flatMap(new FlatMapFunction<String, WindowedValue<String>>() {
- @Override
- public void flatMap(String s, Collector<WindowedValue<String>> collector) throws Exception {
- collector.collect(WindowedValue.<String>of(s, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
- }
- });
- } else {
- source = context.getExecutionEnvironment()
- .addSource(new UnboundedSourceWrapper<>(context.getPipelineOptions(), transform));
- }
- context.setOutputDataStream(output, source);
- }
- }
-
- private static class ParDoBoundStreamingTranslator<IN, OUT> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<ParDo.Bound<IN, OUT>> {
-
- @Override
- public void translateNode(ParDo.Bound<IN, OUT> transform, FlinkStreamingTranslationContext context) {
- PCollection<OUT> output = context.getOutput(transform);
-
- final WindowingStrategy<OUT, ? extends BoundedWindow> windowingStrategy =
- (WindowingStrategy<OUT, ? extends BoundedWindow>)
- context.getOutput(transform).getWindowingStrategy();
-
- WindowedValue.WindowedValueCoder<OUT> outputStreamCoder = WindowedValue.getFullCoder(output.getCoder(),
- windowingStrategy.getWindowFn().windowCoder());
- CoderTypeInformation<WindowedValue<OUT>> outputWindowedValueCoder =
- new CoderTypeInformation<>(outputStreamCoder);
-
- FlinkParDoBoundWrapper<IN, OUT> doFnWrapper = new FlinkParDoBoundWrapper<>(
- context.getPipelineOptions(), windowingStrategy, transform.getFn());
- DataStream<WindowedValue<IN>> inputDataStream = context.getInputDataStream(context.getInput(transform));
- SingleOutputStreamOperator<WindowedValue<OUT>> outDataStream = inputDataStream.flatMap(doFnWrapper)
- .returns(outputWindowedValueCoder);
-
- context.setOutputDataStream(context.getOutput(transform), outDataStream);
- }
- }
-
- public static class WindowBoundTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Bound<T>> {
-
- @Override
- public void translateNode(Window.Bound<T> transform, FlinkStreamingTranslationContext context) {
- PValue input = context.getInput(transform);
- DataStream<WindowedValue<T>> inputDataStream = context.getInputDataStream(input);
-
- final WindowingStrategy<T, ? extends BoundedWindow> windowingStrategy =
- (WindowingStrategy<T, ? extends BoundedWindow>)
- context.getOutput(transform).getWindowingStrategy();
-
- final WindowFn<T, ? extends BoundedWindow> windowFn = windowingStrategy.getWindowFn();
-
- WindowedValue.WindowedValueCoder<T> outputStreamCoder = WindowedValue.getFullCoder(
- context.getInput(transform).getCoder(), windowingStrategy.getWindowFn().windowCoder());
- CoderTypeInformation<WindowedValue<T>> outputWindowedValueCoder =
- new CoderTypeInformation<>(outputStreamCoder);
-
- final FlinkParDoBoundWrapper<T, T> windowDoFnAssigner = new FlinkParDoBoundWrapper<>(
- context.getPipelineOptions(), windowingStrategy, createWindowAssigner(windowFn));
-
- SingleOutputStreamOperator<WindowedValue<T>> windowedStream =
- inputDataStream.flatMap(windowDoFnAssigner).returns(outputWindowedValueCoder);
- context.setOutputDataStream(context.getOutput(transform), windowedStream);
- }
-
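- /** Wraps the given WindowFn in a DoFn that re-assigns each element's windows and re-emits it. */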
- private static <T, W extends BoundedWindow> DoFn<T, T> createWindowAssigner(final WindowFn<T, W> windowFn) {
- return new DoFn<T, T>() {
-
- @Override
- public void processElement(final ProcessContext c) throws Exception {
- Collection<W> windows = windowFn.assignWindows(
- windowFn.new AssignContext() {
- @Override
- public T element() {
- return c.element();
- }
-
- @Override
- public Instant timestamp() {
- return c.timestamp();
- }
-
- @Override
- public Collection<? extends BoundedWindow> windows() {
- return c.windowingInternals().windows();
- }
- });
-
- c.windowingInternals().outputWindowedValue(
- c.element(), c.timestamp(), windows, c.pane());
- }
- };
- }
- }
-
- public static class GroupByKeyTranslator<K, V> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<GroupByKey<K, V>> {
-
- @Override
- public void translateNode(GroupByKey<K, V> transform, FlinkStreamingTranslationContext context) {
- PValue input = context.getInput(transform);
-
- DataStream<WindowedValue<KV<K, V>>> inputDataStream = context.getInputDataStream(input);
- KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) context.getInput(transform).getCoder();
-
- KeyedStream<WindowedValue<KV<K, V>>, K> groupByKStream = FlinkGroupByKeyWrapper
- .groupStreamByKey(inputDataStream, inputKvCoder);
-
- DataStream<WindowedValue<KV<K, Iterable<V>>>> groupedByKNWstream =
- FlinkGroupAlsoByWindowWrapper.createForIterable(context.getPipelineOptions(),
- context.getInput(transform), groupByKStream);
-
- context.setOutputDataStream(context.getOutput(transform), groupedByKNWstream);
- }
- }
-
- public static class CombinePerKeyTranslator<K, VIN, VACC, VOUT> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Combine.PerKey<K, VIN, VOUT>> {
-
- @Override
- public void translateNode(Combine.PerKey<K, VIN, VOUT> transform, FlinkStreamingTranslationContext context) {
- PValue input = context.getInput(transform);
-
- DataStream<WindowedValue<KV<K, VIN>>> inputDataStream = context.getInputDataStream(input);
- KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) context.getInput(transform).getCoder();
- KvCoder<K, VOUT> outputKvCoder = (KvCoder<K, VOUT>) context.getOutput(transform).getCoder();
-
- KeyedStream<WindowedValue<KV<K, VIN>>, K> groupByKStream = FlinkGroupByKeyWrapper
- .groupStreamByKey(inputDataStream, inputKvCoder);
-
- Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combineFn = (Combine.KeyedCombineFn<K, VIN, VACC, VOUT>) transform.getFn();
- DataStream<WindowedValue<KV<K, VOUT>>> groupedByKNWstream =
- FlinkGroupAlsoByWindowWrapper.create(context.getPipelineOptions(),
- context.getInput(transform), groupByKStream, combineFn, outputKvCoder);
-
- context.setOutputDataStream(context.getOutput(transform), groupedByKNWstream);
- }
- }
-
- public static class FlattenPCollectionTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Flatten.FlattenPCollectionList<T>> {
-
- @Override
- public void translateNode(Flatten.FlattenPCollectionList<T> transform, FlinkStreamingTranslationContext context) {
- List<PCollection<T>> allInputs = context.getInput(transform).getAll();
- DataStream<T> result = null;
- for (PCollection<T> collection : allInputs) {
- DataStream<T> current = context.getInputDataStream(collection);
- result = (result == null) ? current : result.union(current);
- }
- context.setOutputDataStream(context.getOutput(transform), result);
- }
- }
-
- public static class ParDoBoundMultiStreamingTranslator<IN, OUT> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<ParDo.BoundMulti<IN, OUT>> {
-
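- // The main output always maps to union tag 0 (MAIN_TAG_INDEX); side outputs get increasing tags.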
- private final int MAIN_TAG_INDEX = 0;
-
- @Override
- public void translateNode(ParDo.BoundMulti<IN, OUT> transform, FlinkStreamingTranslationContext context) {
-
- // we assume that the transformation does not change the windowing strategy.
- WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy = context.getInput(transform).getWindowingStrategy();
-
- Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
- Map<TupleTag<?>, Integer> tagsToLabels = transformTupleTagsToLabels(
- transform.getMainOutputTag(), outputs.keySet());
-
- UnionCoder intermUnionCoder = getIntermUnionCoder(outputs.values());
- WindowedValue.WindowedValueCoder<RawUnionValue> outputStreamCoder = WindowedValue.getFullCoder(
- intermUnionCoder, windowingStrategy.getWindowFn().windowCoder());
-
- CoderTypeInformation<WindowedValue<RawUnionValue>> intermWindowedValueCoder =
- new CoderTypeInformation<>(outputStreamCoder);
-
- FlinkParDoBoundMultiWrapper<IN, OUT> doFnWrapper = new FlinkParDoBoundMultiWrapper<>(
- context.getPipelineOptions(), windowingStrategy, transform.getFn(),
- transform.getMainOutputTag(), tagsToLabels);
-
- DataStream<WindowedValue<IN>> inputDataStream = context.getInputDataStream(context.getInput(transform));
- SingleOutputStreamOperator<WindowedValue<RawUnionValue>> intermDataStream =
- inputDataStream.flatMap(doFnWrapper).returns(intermWindowedValueCoder);
-
- for (Map.Entry<TupleTag<?>, PCollection<?>> output : outputs.entrySet()) {
- final int outputTag = tagsToLabels.get(output.getKey());
-
- WindowedValue.WindowedValueCoder<?> coderForTag = WindowedValue.getFullCoder(
- output.getValue().getCoder(),
- windowingStrategy.getWindowFn().windowCoder());
-
- CoderTypeInformation<WindowedValue<?>> windowedValueCoder =
- new CoderTypeInformation(coderForTag);
-
- context.setOutputDataStream(output.getValue(),
- intermDataStream.filter(new FilterFunction<WindowedValue<RawUnionValue>>() {
- @Override
- public boolean filter(WindowedValue<RawUnionValue> value) throws Exception {
- return value.getValue().getUnionTag() == outputTag;
- }
- }).flatMap(new FlatMapFunction<WindowedValue<RawUnionValue>, WindowedValue<?>>() {
- @Override
- public void flatMap(WindowedValue<RawUnionValue> value, Collector<WindowedValue<?>> collector) throws Exception {
- collector.collect(WindowedValue.of(
- value.getValue().getValue(),
- value.getTimestamp(),
- value.getWindows(),
- value.getPane()));
- }
- }).returns(windowedValueCoder));
- }
- }
-
- private Map<TupleTag<?>, Integer> transformTupleTagsToLabels(TupleTag<?> mainTag, Set<TupleTag<?>> secondaryTags) {
- Map<TupleTag<?>, Integer> tagToLabelMap = Maps.newHashMap();
- tagToLabelMap.put(mainTag, MAIN_TAG_INDEX);
- int count = MAIN_TAG_INDEX + 1;
- for (TupleTag<?> tag : secondaryTags) {
- if (!tagToLabelMap.containsKey(tag)) {
- tagToLabelMap.put(tag, count++);
- }
- }
- return tagToLabelMap;
- }
-
- private UnionCoder getIntermUnionCoder(Collection<PCollection<?>> taggedCollections) {
- List<Coder<?>> outputCoders = Lists.newArrayList();
- for (PCollection<?> coll : taggedCollections) {
- outputCoders.add(coll.getCoder());
- }
- return UnionCoder.of(outputCoders);
- }
- }
-}
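One detail of the multi-output translator above that is easy to miss:
transformTupleTagsToLabels assigns the main output label 0 (MAIN_TAG_INDEX)
and each side output the next free integer, and the filter/flatMap chain then
demultiplexes the intermediate RawUnionValue stream on those labels. A
standalone sketch of the labeling scheme, with hypothetical tag names:

  import java.util.LinkedHashMap;
  import java.util.Map;

  public class UnionTagLabelsSketch {
    public static void main(String[] args) {
      Map<String, Integer> labels = new LinkedHashMap<>();
      labels.put("main", 0);                     // MAIN_TAG_INDEX
      int next = 1;
      for (String sideOutput : new String[] {"errors", "lateData"}) {
        if (!labels.containsKey(sideOutput)) {   // skip duplicates, as the original does
          labels.put(sideOutput, next++);
        }
      }
      System.out.println(labels);                // prints {main=0, errors=1, lateData=2}
    }
  }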