You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/25 17:30:42 UTC
[47/50] [abbrv] beam git commit: [BEAM-1993] Remove special unbounded
Flink source/sink
[BEAM-1993] Remove special unbounded Flink source/sink
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d8213fa6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d8213fa6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d8213fa6
Branch: refs/heads/jstorm-runner
Commit: d8213fa6b274cd6acbf4da6deffd21ca23fd7f42
Parents: fac4f3e
Author: Ismaël Mejía <ie...@apache.org>
Authored: Tue Apr 18 16:03:11 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Tue Apr 18 16:15:09 2017 +0200
----------------------------------------------------------------------
.../examples/streaming/KafkaIOExamples.java | 338 -------------------
.../KafkaWindowedWordCountExample.java | 164 ---------
.../FlinkStreamingTransformTranslators.java | 87 +----
.../flink/translation/types/FlinkCoder.java | 63 ----
.../streaming/io/UnboundedFlinkSink.java | 200 -----------
.../streaming/io/UnboundedFlinkSource.java | 120 -------
6 files changed, 12 insertions(+), 960 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d8213fa6/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
deleted file mode 100644
index 616e276..0000000
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
+++ /dev/null
@@ -1,338 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.examples.streaming;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Properties;
-import org.apache.beam.runners.flink.FlinkPipelineOptions;
-import org.apache.beam.runners.flink.FlinkRunner;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSink;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.Write;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-
-/**
- * Recipes/Examples that demonstrate how to read/write data from/to Kafka.
- */
-public class KafkaIOExamples {
-
-
- private static final String KAFKA_TOPIC = "input"; // Default kafka topic to read from
- private static final String KAFKA_AVRO_TOPIC = "output"; // Default kafka topic to read from
- private static final String KAFKA_BROKER = "localhost:9092"; // Default kafka broker to contact
- private static final String GROUP_ID = "myGroup"; // Default groupId
- private static final String ZOOKEEPER = "localhost:2181"; // Default zookeeper to connect (Kafka)
-
- /**
- * Read/Write String data to Kafka.
- */
- public static class KafkaString {
-
- /**
- * Read String data from Kafka.
- */
- public static class ReadStringFromKafka {
-
- public static void main(String[] args) {
-
- Pipeline p = initializePipeline(args);
- KafkaOptions options = getOptions(p);
-
- FlinkKafkaConsumer08<String> kafkaConsumer =
- new FlinkKafkaConsumer08<>(options.getKafkaTopic(),
- new SimpleStringSchema(), getKafkaProps(options));
-
- p
- .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer))).setCoder(StringUtf8Coder.of())
- .apply(ParDo.of(new PrintFn<>()));
-
- p.run();
-
- }
-
- }
-
- /**
- * Write String data to Kafka.
- */
- public static class WriteStringToKafka {
-
- public static void main(String[] args) {
-
- Pipeline p = initializePipeline(args);
- KafkaOptions options = getOptions(p);
-
- PCollection<String> words =
- p.apply(Create.of("These", "are", "some", "words"));
-
- FlinkKafkaProducer08<String> kafkaSink =
- new FlinkKafkaProducer08<>(options.getKafkaTopic(),
- new SimpleStringSchema(), getKafkaProps(options));
-
- words.apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));
-
- p.run();
- }
-
- }
- }
-
- /**
- * Read/Write Avro data to Kafka.
- */
- public static class KafkaAvro {
-
- /**
- * Read Avro data from Kafka.
- */
- public static class ReadAvroFromKafka {
-
- public static void main(String[] args) {
-
- Pipeline p = initializePipeline(args);
- KafkaOptions options = getOptions(p);
-
- FlinkKafkaConsumer08<MyType> kafkaConsumer =
- new FlinkKafkaConsumer08<>(options.getKafkaAvroTopic(),
- new AvroSerializationDeserializationSchema<>(MyType.class), getKafkaProps(options));
-
- p
- .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
- .setCoder(AvroCoder.of(MyType.class))
- .apply(ParDo.of(new PrintFn<>()));
-
- p.run();
-
- }
-
- }
-
- /**
- * Write Avro data to Kafka.
- */
- public static class WriteAvroToKafka {
-
- public static void main(String[] args) {
-
- Pipeline p = initializePipeline(args);
- KafkaOptions options = getOptions(p);
-
- PCollection<MyType> words =
- p.apply(Create.of(
- new MyType("word", 1L),
- new MyType("another", 2L),
- new MyType("yet another", 3L)));
-
- FlinkKafkaProducer08<MyType> kafkaSink =
- new FlinkKafkaProducer08<>(options.getKafkaAvroTopic(),
- new AvroSerializationDeserializationSchema<>(MyType.class), getKafkaProps(options));
-
- words.apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));
-
- p.run();
-
- }
- }
-
- /**
- * Serialiation/Deserialiation schema for Avro types.
- * @param <T> the type being encoded
- */
- static class AvroSerializationDeserializationSchema<T>
- implements SerializationSchema<T>, DeserializationSchema<T> {
-
- private final Class<T> avroType;
-
- private final AvroCoder<T> coder;
- private transient ByteArrayOutputStream out;
-
- AvroSerializationDeserializationSchema(Class<T> clazz) {
- this.avroType = clazz;
- this.coder = AvroCoder.of(clazz);
- this.out = new ByteArrayOutputStream();
- }
-
- @Override
- public byte[] serialize(T element) {
- if (out == null) {
- out = new ByteArrayOutputStream();
- }
- try {
- out.reset();
- coder.encode(element, out, Coder.Context.NESTED);
- } catch (IOException e) {
- throw new RuntimeException("Avro encoding failed.", e);
- }
- return out.toByteArray();
- }
-
- @Override
- public T deserialize(byte[] message) throws IOException {
- return coder.decode(new ByteArrayInputStream(message), Coder.Context.NESTED);
- }
-
- @Override
- public boolean isEndOfStream(T nextElement) {
- return false;
- }
-
- @Override
- public TypeInformation<T> getProducedType() {
- return TypeExtractor.getForClass(avroType);
- }
- }
-
- /**
- * Custom type for Avro serialization.
- */
- static class MyType implements Serializable {
-
- public MyType() {}
-
- MyType(String word, long count) {
- this.word = word;
- this.count = count;
- }
-
- String word;
- long count;
-
- @Override
- public String toString() {
- return "MyType{"
- + "word='" + word + '\''
- + ", count=" + count
- + '}';
- }
- }
- }
-
- // -------------- Utilities --------------
-
- /**
- * Custom options for the Pipeline.
- */
- public interface KafkaOptions extends FlinkPipelineOptions {
- @Description("The Kafka topic to read from")
- @Default.String(KAFKA_TOPIC)
- String getKafkaTopic();
-
- void setKafkaTopic(String value);
-
- void setKafkaAvroTopic(String value);
-
- @Description("The Kafka topic to read from")
- @Default.String(KAFKA_AVRO_TOPIC)
- String getKafkaAvroTopic();
-
- @Description("The Kafka Broker to read from")
- @Default.String(KAFKA_BROKER)
- String getBroker();
-
- void setBroker(String value);
-
- @Description("The Zookeeper server to connect to")
- @Default.String(ZOOKEEPER)
- String getZookeeper();
-
- void setZookeeper(String value);
-
- @Description("The groupId")
- @Default.String(GROUP_ID)
- String getGroup();
-
- void setGroup(String value);
- }
-
- /**
- * Initializes some options for the Flink runner.
- * @param args The command line args
- * @return the pipeline
- */
- private static Pipeline initializePipeline(String[] args) {
- KafkaOptions options =
- PipelineOptionsFactory.fromArgs(args).as(KafkaOptions.class);
-
- options.setStreaming(true);
- options.setRunner(FlinkRunner.class);
-
- options.setCheckpointingInterval(1000L);
- options.setNumberOfExecutionRetries(5);
- options.setExecutionRetryDelay(3000L);
-
- return Pipeline.create(options);
- }
-
- /**
- * Gets KafkaOptions from the Pipeline.
- * @param p the pipeline
- * @return KafkaOptions
- */
- private static KafkaOptions getOptions(Pipeline p) {
- return p.getOptions().as(KafkaOptions.class);
- }
-
- /**
- * Helper method to set the Kafka props from the pipeline options.
- * @param options KafkaOptions
- * @return Kafka props
- */
- private static Properties getKafkaProps(KafkaOptions options) {
-
- Properties props = new Properties();
- props.setProperty("zookeeper.connect", options.getZookeeper());
- props.setProperty("bootstrap.servers", options.getBroker());
- props.setProperty("group.id", options.getGroup());
-
- return props;
- }
-
- /**
- * Print contents to stdout.
- * @param <T> type of the input
- */
- private static class PrintFn<T> extends DoFn<T, T> {
-
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- System.out.println(c.element().toString());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8213fa6/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
deleted file mode 100644
index ee0e874..0000000
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.examples.streaming;
-
-import java.util.Properties;
-import org.apache.beam.runners.flink.FlinkRunner;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-import org.joda.time.Duration;
-
-/**
- * Wordcount example using Kafka topic.
- */
-public class KafkaWindowedWordCountExample {
-
- static final String KAFKA_TOPIC = "test"; // Default kafka topic to read from
- static final String KAFKA_BROKER = "localhost:9092"; // Default kafka broker to contact
- static final String GROUP_ID = "myGroup"; // Default groupId
- static final String ZOOKEEPER = "localhost:2181"; // Default zookeeper to connect to for Kafka
-
- /**
- * Function to extract words.
- */
- public static class ExtractWordsFn extends DoFn<String, String> {
- private final Aggregator<Long, Long> emptyLines =
- createAggregator("emptyLines", Sum.ofLongs());
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- if (c.element().trim().isEmpty()) {
- emptyLines.addValue(1L);
- }
-
- // Split the line into words.
- String[] words = c.element().split("[^a-zA-Z']+");
-
- // Output each word encountered into the output PCollection.
- for (String word : words) {
- if (!word.isEmpty()) {
- c.output(word);
- }
- }
- }
- }
-
- /**
- * Function to format KV as String.
- */
- public static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
- @ProcessElement
- public void processElement(ProcessContext c) {
- String row = c.element().getKey() + " - " + c.element().getValue() + " @ "
- + c.timestamp().toString();
- System.out.println(row);
- c.output(row);
- }
- }
-
- /**
- * Pipeline options.
- */
- public interface KafkaStreamingWordCountOptions
- extends WindowedWordCount.StreamingWordCountOptions {
- @Description("The Kafka topic to read from")
- @Default.String(KAFKA_TOPIC)
- String getKafkaTopic();
-
- void setKafkaTopic(String value);
-
- @Description("The Kafka Broker to read from")
- @Default.String(KAFKA_BROKER)
- String getBroker();
-
- void setBroker(String value);
-
- @Description("The Zookeeper server to connect to")
- @Default.String(ZOOKEEPER)
- String getZookeeper();
-
- void setZookeeper(String value);
-
- @Description("The groupId")
- @Default.String(GROUP_ID)
- String getGroup();
-
- void setGroup(String value);
-
- }
-
- public static void main(String[] args) {
- PipelineOptionsFactory.register(KafkaStreamingWordCountOptions.class);
- KafkaStreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args)
- .as(KafkaStreamingWordCountOptions.class);
- options.setJobName("KafkaExample - WindowSize: " + options.getWindowSize() + " seconds");
- options.setStreaming(true);
- options.setCheckpointingInterval(1000L);
- options.setNumberOfExecutionRetries(5);
- options.setExecutionRetryDelay(3000L);
- options.setRunner(FlinkRunner.class);
-
- System.out.println(options.getKafkaTopic() + " " + options.getZookeeper() + " "
- + options.getBroker() + " " + options.getGroup());
- Pipeline pipeline = Pipeline.create(options);
-
- Properties p = new Properties();
- p.setProperty("zookeeper.connect", options.getZookeeper());
- p.setProperty("bootstrap.servers", options.getBroker());
- p.setProperty("group.id", options.getGroup());
-
- // this is the Flink consumer that reads the input to
- // the program from a kafka topic.
- FlinkKafkaConsumer08<String> kafkaConsumer = new FlinkKafkaConsumer08<>(
- options.getKafkaTopic(),
- new SimpleStringSchema(), p);
-
- PCollection<String> words = pipeline
- .apply("StreamingWordCount", Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
- .apply(ParDo.of(new ExtractWordsFn()))
- .apply(Window.<String>into(FixedWindows.of(
- Duration.standardSeconds(options.getWindowSize())))
- .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
- .discardingFiredPanes());
-
- PCollection<KV<String, Long>> wordCounts =
- words.apply(Count.<String>perElement());
-
- wordCounts.apply(ParDo.of(new FormatAsStringFn()))
- .apply(TextIO.Write.to("./outputKafka.txt"));
-
- pipeline.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8213fa6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index fbd7620..123d5e7 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -36,7 +36,6 @@ import org.apache.beam.runners.core.SplittableParDo;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.runners.flink.translation.types.FlinkCoder;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector;
import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem;
@@ -45,17 +44,13 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDo
import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.BoundedSourceWrapper;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSink;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.Sink;
import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
@@ -69,7 +64,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.AppliedCombineFn;
@@ -94,12 +88,10 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -124,7 +116,6 @@ class FlinkStreamingTransformTranslators {
static {
TRANSLATORS.put(Read.Bounded.class, new BoundedReadSourceTranslator());
TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
- TRANSLATORS.put(Write.class, new WriteSinkStreamingTranslator());
TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoStreamingTranslator());
@@ -203,31 +194,6 @@ class FlinkStreamingTransformTranslators {
}
}
- private static class WriteSinkStreamingTranslator<T>
- extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write<T>> {
-
- @Override
- public void translateNode(Write<T> transform, FlinkStreamingTranslationContext context) {
- String name = transform.getName();
- PValue input = context.getInput(transform);
-
- Sink<T> sink = transform.getSink();
- if (!(sink instanceof UnboundedFlinkSink)) {
- throw new UnsupportedOperationException(
- "At the time, only unbounded Flink sinks are supported.");
- }
-
- DataStream<WindowedValue<T>> inputDataSet = context.getInputDataStream(input);
-
- inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>, Object>() {
- @Override
- public void flatMap(WindowedValue<T> value, Collector<Object> out) throws Exception {
- out.collect(value.getValue());
- }
- }).addSink(((UnboundedFlinkSink<Object>) sink).getFlinkSource()).name(name);
- }
- }
-
private static class UnboundedReadSourceTranslator<T>
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> {
@@ -241,47 +207,18 @@ class FlinkStreamingTransformTranslators {
context.getTypeInfo(context.getOutput(transform));
DataStream<WindowedValue<T>> source;
- if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) {
- @SuppressWarnings("unchecked")
- UnboundedFlinkSource<T> flinkSourceFunction =
- (UnboundedFlinkSource<T>) transform.getSource();
-
- final AssignerWithPeriodicWatermarks<T> flinkAssigner =
- flinkSourceFunction.getFlinkTimestampAssigner();
-
- DataStream<T> flinkSource = context.getExecutionEnvironment()
- .addSource(flinkSourceFunction.getFlinkSource());
-
- flinkSourceFunction.setCoder(
- new FlinkCoder<T>(flinkSource.getType(),
- context.getExecutionEnvironment().getConfig()));
-
- source = flinkSource
- .assignTimestampsAndWatermarks(flinkAssigner)
- .flatMap(new FlatMapFunction<T, WindowedValue<T>>() {
- @Override
- public void flatMap(T s, Collector<WindowedValue<T>> collector) throws Exception {
- collector.collect(
- WindowedValue.of(
- s,
- new Instant(flinkAssigner.extractTimestamp(s, -1)),
- GlobalWindow.INSTANCE,
- PaneInfo.NO_FIRING));
- }}).returns(outputTypeInfo);
- } else {
- try {
- UnboundedSourceWrapper<T, ?> sourceWrapper =
- new UnboundedSourceWrapper<>(
- context.getPipelineOptions(),
- transform.getSource(),
- context.getExecutionEnvironment().getParallelism());
- source = context
- .getExecutionEnvironment()
- .addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo);
- } catch (Exception e) {
- throw new RuntimeException(
- "Error while translating UnboundedSource: " + transform.getSource(), e);
- }
+ try {
+ UnboundedSourceWrapper<T, ?> sourceWrapper =
+ new UnboundedSourceWrapper<>(
+ context.getPipelineOptions(),
+ transform.getSource(),
+ context.getExecutionEnvironment().getParallelism());
+ source = context
+ .getExecutionEnvironment()
+ .addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Error while translating UnboundedSource: " + transform.getSource(), e);
}
context.setOutputDataStream(output, source);
http://git-wip-us.apache.org/repos/asf/beam/blob/d8213fa6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/FlinkCoder.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/FlinkCoder.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/FlinkCoder.java
deleted file mode 100644
index 8b90c73..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/FlinkCoder.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.types;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.List;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-
-/**
- * A Coder that uses Flink's serialization system.
- * @param <T> The type of the value to be encoded
- */
-public class FlinkCoder<T> extends StandardCoder<T> {
-
- private final TypeSerializer<T> typeSerializer;
-
- public FlinkCoder(TypeInformation<T> typeInformation, ExecutionConfig executionConfig) {
- this.typeSerializer = typeInformation.createSerializer(executionConfig);
- }
-
- @Override
- public void encode(T value, OutputStream outStream, Context context) throws IOException {
- typeSerializer.serialize(value, new DataOutputViewStreamWrapper(outStream));
- }
-
- @Override
- public T decode(InputStream inStream, Context context) throws IOException {
- return typeSerializer.deserialize(new DataInputViewStreamWrapper(inStream));
- }
-
- @Override
- public List<? extends Coder<?>> getCoderArguments() {
- return Collections.emptyList();
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8213fa6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
deleted file mode 100644
index af36b80..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
-
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collection;
-import java.util.List;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.io.Sink;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-
-/**
- * A wrapper translating Flink sinks implementing the {@link SinkFunction} interface, into
- * unbounded Beam sinks (see {@link UnboundedSource}).
- * */
-public class UnboundedFlinkSink<T> extends Sink<T> {
-
- /* The Flink sink function */
- private final SinkFunction<T> flinkSink;
-
- private UnboundedFlinkSink(SinkFunction<T> flinkSink) {
- this.flinkSink = flinkSink;
- }
-
- public SinkFunction<T> getFlinkSource() {
- return this.flinkSink;
- }
-
- @Override
- public void validate(PipelineOptions options) {
- }
-
- @Override
- public WriteOperation<T, ?> createWriteOperation(PipelineOptions options) {
- return new WriteOperation<T, Object>() {
- @Override
- public void initialize(PipelineOptions options) throws Exception {
-
- }
-
- @Override
- public void setWindowedWrites(boolean windowedWrites) {
- }
-
- @Override
- public void finalize(Iterable<Object> writerResults, PipelineOptions options)
- throws Exception {
-
- }
-
- @Override
- public Coder<Object> getWriterResultCoder() {
- return new Coder<Object>() {
- @Override
- public void encode(Object value, OutputStream outStream, Context context)
- throws CoderException, IOException {
-
- }
-
- @Override
- public Object decode(InputStream inStream, Context context)
- throws CoderException, IOException {
- return null;
- }
-
- @Override
- public List<? extends Coder<?>> getCoderArguments() {
- return null;
- }
-
- @Override
- public CloudObject asCloudObject() {
- return null;
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
-
- }
-
- @Override
- public boolean consistentWithEquals() {
- return false;
- }
-
- @Override
- public Object structuralValue(Object value) throws Exception {
- return null;
- }
-
- @Override
- public boolean isRegisterByteSizeObserverCheap(Object value, Context context) {
- return false;
- }
-
- @Override
- public void registerByteSizeObserver(Object value,
- ElementByteSizeObserver observer,
- Context context) throws Exception {
-
- }
-
- @Override
- public String getEncodingId() {
- return null;
- }
-
- @Override
- public Collection<String> getAllowedEncodings() {
- return null;
- }
-
- @Override
- public TypeDescriptor<Object> getEncodedTypeDescriptor() {
- return TypeDescriptor.of(Object.class);
- }
- };
- }
-
- @Override
- public Writer<T, Object> createWriter(PipelineOptions options) throws Exception {
- return new Writer<T, Object>() {
- @Override
- public void openWindowed(String uId,
- BoundedWindow window,
- PaneInfo paneInfo,
- int shard,
- int numShards) throws Exception {
- }
-
- @Override
- public void openUnwindowed(String uId, int shard, int numShards) throws Exception {
- }
-
- @Override
- public void cleanup() throws Exception {
-
- }
-
- @Override
- public void write(T value) throws Exception {
-
- }
-
- @Override
- public Object close() throws Exception {
- return null;
- }
-
- @Override
- public WriteOperation<T, Object> getWriteOperation() {
- return null;
- }
-
- };
- }
-
- @Override
- public Sink<T> getSink() {
- return UnboundedFlinkSink.this;
- }
- };
- }
-
- /**
- * Creates a Flink sink to write to using the Write API.
- * @param flinkSink The Flink sink, e.g. FlinkKafkaProducer09
- * @param <T> The input type of the sink
- * @return A Beam sink wrapping a Flink sink
- */
- public static <T> Sink<T> of(SinkFunction<T> flinkSink) {
- return new UnboundedFlinkSink<>(flinkSink);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/d8213fa6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
deleted file mode 100644
index ac20c34..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-
-/**
- * A wrapper translating Flink Sources implementing the {@link SourceFunction} interface, into
- * unbounded Beam sources (see {@link UnboundedSource}).
- * */
-public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource.CheckpointMark> {
-
- private final SourceFunction<T> flinkSource;
-
- /** Coder set during translation. */
- private Coder<T> coder;
-
- /** Timestamp / watermark assigner for source; defaults to ingestion time. */
- private AssignerWithPeriodicWatermarks<T> flinkTimestampAssigner =
- new IngestionTimeExtractor<T>();
-
- public UnboundedFlinkSource(SourceFunction<T> source) {
- flinkSource = checkNotNull(source);
- }
-
- public UnboundedFlinkSource(SourceFunction<T> source,
- AssignerWithPeriodicWatermarks<T> timestampAssigner) {
- flinkSource = checkNotNull(source);
- flinkTimestampAssigner = checkNotNull(timestampAssigner);
- }
-
- public SourceFunction<T> getFlinkSource() {
- return this.flinkSource;
- }
-
- public AssignerWithPeriodicWatermarks<T> getFlinkTimestampAssigner() {
- return flinkTimestampAssigner;
- }
-
- @Override
- public List<? extends UnboundedSource<T, UnboundedSource.CheckpointMark>> generateInitialSplits(
- int desiredNumSplits,
- PipelineOptions options) throws Exception {
- throw new RuntimeException("Flink Sources are supported only when "
- + "running with the FlinkRunner.");
- }
-
- @Override
- public UnboundedReader<T> createReader(PipelineOptions options,
- @Nullable CheckpointMark checkpointMark) {
- throw new RuntimeException("Flink Sources are supported only when "
- + "running with the FlinkRunner.");
- }
-
- @Nullable
- @Override
- public Coder<UnboundedSource.CheckpointMark> getCheckpointMarkCoder() {
- throw new RuntimeException("Flink Sources are supported only when "
- + "running with the FlinkRunner.");
- }
-
-
- @Override
- public void validate() {
- }
-
- @Override
- public Coder<T> getDefaultOutputCoder() {
- // The coder derived from the Flink source
- return coder;
- }
-
- public void setCoder(Coder<T> coder) {
- this.coder = coder;
- }
-
- public void setFlinkTimestampAssigner(AssignerWithPeriodicWatermarks<T> flinkTimestampAssigner) {
- this.flinkTimestampAssigner = flinkTimestampAssigner;
- }
-
- /**
- * Creates a new unbounded source from a Flink source.
- * @param flinkSource The Flink source function
- * @param <T> The type that the source function produces.
- * @return The wrapped source function.
- */
- public static <T> UnboundedSource<T, UnboundedSource.CheckpointMark> of(
- SourceFunction<T> flinkSource) {
- return new UnboundedFlinkSource<>(flinkSource);
- }
-
- public static <T> UnboundedSource<T, UnboundedSource.CheckpointMark> of(
- SourceFunction<T> flinkSource, AssignerWithPeriodicWatermarks<T> flinkTimestampAssigner) {
- return new UnboundedFlinkSource<>(flinkSource, flinkTimestampAssigner);
- }
-}