You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/04/29 17:58:59 UTC
[3/4] incubator-beam git commit: Flink sink implementation
Flink sink implementation
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bc847a95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bc847a95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bc847a95
Branch: refs/heads/master
Commit: bc847a9582447372461c5cf35450ba4a4c3d490d
Parents: 4fd9d74
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri Apr 22 12:33:26 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Apr 29 17:58:00 2016 +0200
----------------------------------------------------------------------
.../FlinkStreamingTransformTranslators.java | 33 +++-
.../streaming/io/UnboundedFlinkSink.java | 175 +++++++++++++++++++
2 files changed, 204 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc847a95/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 927c3a2..db24f9d 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -26,13 +26,16 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkGroupBy
import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkParDoBoundMultiWrapper;
import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkParDoBoundWrapper;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.FlinkStreamingCreateFunction;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSink;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Sink;
import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
@@ -64,12 +67,8 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
-import org.apache.flink.streaming.api.functions.TimestampAssigner;
-import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;
-import org.apache.kafka.common.utils.Time;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -104,6 +103,9 @@ public class FlinkStreamingTransformTranslators {
TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator());
TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
+
+ TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator());
+
TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator());
TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator());
@@ -193,6 +195,29 @@ public class FlinkStreamingTransformTranslators {
}
}
+ /**
+ * Translates a {@code Write.Bound} transform by forwarding each element's value directly
+ * to the Flink {@link SinkFunction} wrapped inside an {@link UnboundedFlinkSink}. The
+ * Beam {@code Sink}'s WriteOperation/Writer machinery is bypassed entirely; only
+ * {@link UnboundedFlinkSink} instances are accepted.
+ */
+ private static class WriteSinkStreamingTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> {
+
+ @Override
+ public void translateNode(Write.Bound<T> transform, FlinkStreamingTranslationContext context) {
+ String name = transform.getName();
+ PValue input = context.getInput(transform);
+
+ Sink<T> sink = transform.getSink();
+ if (!(sink instanceof UnboundedFlinkSink)) {
+ throw new UnsupportedOperationException("At this time, only unbounded Flink sinks are supported.");
+ }
+
+ DataStream<WindowedValue<T>> inputDataSet = context.getInputDataStream(input);
+
+ // Safe by the instanceof check above; the cast to the Object type parameter only
+ // erases the element type so the generic flatMap below can feed the sink.
+ @SuppressWarnings("unchecked")
+ UnboundedFlinkSink<Object> flinkSink = (UnboundedFlinkSink<Object>) sink;
+
+ // Strip the WindowedValue wrapper before handing elements to the Flink sink, which
+ // expects plain (unwindowed) elements.
+ inputDataSet.flatMap(new FlatMapFunction<WindowedValue<T>, Object>() {
+ @Override
+ public void flatMap(WindowedValue<T> value, Collector<Object> out) throws Exception {
+ out.collect(value.getValue());
+ }
+ }).addSink(flinkSink.getFlinkSource()).name(name);
+ }
+ }
+
private static class BoundedReadSourceTranslator<T>
implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Bounded<T>> {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc847a95/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
new file mode 100644
index 0000000..77c195a
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.io.Sink;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A wrapper translating Flink sinks implementing the {@link SinkFunction} interface into
+ * unbounded Beam sinks (see {@link Sink} and {@link Write}).
+ *
+ * <p>All actual writing is performed by the wrapped Flink {@link SinkFunction}: the runner's
+ * streaming translator forwards elements straight to it. The {@code WriteOperation},
+ * {@code Writer}, and result {@link Coder} returned below are therefore deliberate no-op
+ * stubs that exist only to satisfy the {@link Sink} contract.
+ */
+public class UnboundedFlinkSink<T> extends Sink<T> {
+
+ /* The Flink sink function */
+ private final SinkFunction<T> flinkSink;
+
+ // Private: instances are created via the static factory of(SinkFunction).
+ private UnboundedFlinkSink(SinkFunction<T> flinkSink) {
+ this.flinkSink = flinkSink;
+ }
+
+ // Returns the wrapped Flink sink function.
+ // NOTE(review): the name says "Source" but this returns the *sink* function — presumably a
+ // copy/paste slip from UnboundedFlinkSource; consider renaming to getFlinkSink().
+ public SinkFunction<T> getFlinkSource() {
+ return this.flinkSink;
+ }
+
+ @Override
+ public void validate(PipelineOptions options) {
+ // No-op: any configuration checking is left to the wrapped Flink sink.
+ }
+
+ @Override
+ public WriteOperation<T, ?> createWriteOperation(PipelineOptions options) {
+ // No-op write operation: elements never flow through the Beam Write machinery for this
+ // sink (the runner translation calls the SinkFunction directly), so there is nothing to
+ // initialize or finalize and no writer results to encode.
+ return new WriteOperation<T, Object>() {
+ @Override
+ public void initialize(PipelineOptions options) throws Exception {
+ // No-op: nothing to set up.
+ }
+
+ @Override
+ public void finalize(Iterable<Object> writerResults, PipelineOptions options) throws Exception {
+ // No-op: there are never any writer results to commit.
+ }
+
+ @Override
+ public Coder<Object> getWriterResultCoder() {
+ // A do-nothing coder: writer results are never produced, so nothing is ever
+ // actually encoded or decoded.
+ return new Coder<Object>() {
+ @Override
+ public void encode(Object value, OutputStream outStream, Context context) throws CoderException, IOException {
+ // No-op: nothing is written.
+ }
+
+ @Override
+ public Object decode(InputStream inStream, Context context) throws CoderException, IOException {
+ // No-op: nothing was encoded, so there is nothing to restore.
+ return null;
+ }
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ // NOTE(review): Coder implementations conventionally return an empty list rather
+ // than null here — verify no caller iterates this value.
+ return null;
+ }
+
+ @Override
+ public CloudObject asCloudObject() {
+ return null;
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ // Trivially deterministic: encode() writes nothing.
+ }
+
+ @Override
+ public boolean consistentWithEquals() {
+ return false;
+ }
+
+ @Override
+ public Object structuralValue(Object value) throws Exception {
+ return null;
+ }
+
+ @Override
+ public boolean isRegisterByteSizeObserverCheap(Object value, Context context) {
+ return false;
+ }
+
+ @Override
+ public void registerByteSizeObserver(Object value, ElementByteSizeObserver observer, Context context) throws Exception {
+ // No-op: zero bytes are ever encoded.
+ }
+
+ @Override
+ public String getEncodingId() {
+ return null;
+ }
+
+ @Override
+ public Collection<String> getAllowedEncodings() {
+ return null;
+ }
+ };
+ }
+
+ @Override
+ public Writer<T, Object> createWriter(PipelineOptions options) throws Exception {
+ // No-op writer: elements reach the Flink SinkFunction through the runner translation,
+ // not through this Writer, so write() intentionally discards nothing-it-ever-receives.
+ return new Writer<T, Object>() {
+ @Override
+ public void open(String uId) throws Exception {
+ // No-op.
+ }
+
+ @Override
+ public void write(T value) throws Exception {
+ // No-op: see class-level comment.
+ }
+
+ @Override
+ public Object close() throws Exception {
+ return null;
+ }
+
+ @Override
+ public WriteOperation<T, Object> getWriteOperation() {
+ // NOTE(review): returning null here would NPE any caller that uses the Write
+ // machinery — confirm this path is truly unreachable with the Flink runner.
+ return null;
+ }
+
+ };
+ }
+
+ @Override
+ public Sink<T> getSink() {
+ return UnboundedFlinkSink.this;
+ }
+ };
+ }
+
+ /**
+ * Creates a Beam {@link Sink} wrapping a Flink sink, for use with the Write API.
+ * @param flinkSink The Flink sink, e.g. FlinkKafkaProducer09
+ * @param <T> The input type of the sink
+ * @return A Beam sink wrapping a Flink sink
+ */
+ public static <T> Sink<T> of(SinkFunction<T> flinkSink) {
+ return new UnboundedFlinkSink<>(flinkSink);
+ }
+}