Posted to commits@beam.apache.org by mx...@apache.org on 2016/04/29 17:58:59 UTC

[3/4] incubator-beam git commit: Flink sink implementation

Flink sink implementation


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bc847a95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bc847a95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bc847a95

Branch: refs/heads/master
Commit: bc847a9582447372461c5cf35450ba4a4c3d490d
Parents: 4fd9d74
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri Apr 22 12:33:26 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Apr 29 17:58:00 2016 +0200

----------------------------------------------------------------------
 .../FlinkStreamingTransformTranslators.java     |  33 +++-
 .../streaming/io/UnboundedFlinkSink.java        | 175 +++++++++++++++++++
 2 files changed, 204 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc847a95/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 927c3a2..db24f9d 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -26,13 +26,16 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkGroupBy
 import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkParDoBoundMultiWrapper;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkParDoBoundWrapper;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.FlinkStreamingCreateFunction;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSink;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Sink;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -64,12 +67,8 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
-import org.apache.flink.streaming.api.functions.TimestampAssigner;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.util.Collector;
-import org.apache.kafka.common.utils.Time;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -104,6 +103,9 @@ public class FlinkStreamingTransformTranslators {
     TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
     TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator());
     TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
+
+    TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator());
+
     TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator());
     TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
     TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator());
@@ -193,6 +195,29 @@ public class FlinkStreamingTransformTranslators {
     }
   }
 
+  private static class WriteSinkStreamingTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> {
+
+    @Override
+    public void translateNode(Write.Bound<T> transform, FlinkStreamingTranslationContext context) {
+      String name = transform.getName();
+      PValue input = context.getInput(transform);
+
+      Sink<T> sink = transform.getSink();
+      if (!(sink instanceof UnboundedFlinkSink)) {
+        throw new UnsupportedOperationException("At this time, only unbounded Flink sinks are supported.");
+      }
+
+      DataStream<WindowedValue<T>> inputDataStream = context.getInputDataStream(input);
+
+      inputDataStream.flatMap(new FlatMapFunction<WindowedValue<T>, Object>() {
+        @Override
+        public void flatMap(WindowedValue<T> value, Collector<Object> out) throws Exception {
+          out.collect(value.getValue());
+        }
+      }).addSink(((UnboundedFlinkSink<Object>) sink).getFlinkSink()).name(name);
+    }
+  }
+
   private static class BoundedReadSourceTranslator<T>
       implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Bounded<T>> {
 

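A note on how the translator above is reached: Write.Bound is the transform produced by
Write.to(...), so a streaming pipeline hits WriteSinkStreamingTranslator whenever it applies
Write.to with an UnboundedFlinkSink. A minimal, hypothetical sketch (runner selection is
elided and the inline SinkFunction is a stand-in, not part of this commit):

    import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSink;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.Write;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.StreamingOptions;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;

    public class WriteSinkExample {
      public static void main(String[] args) {
        // Run in streaming mode so the streaming translators (including
        // WriteSinkStreamingTranslator) are used; selecting the Flink runner via
        // pipeline options is assumed and left out here.
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        options.as(StreamingOptions.class).setStreaming(true);
        Pipeline p = Pipeline.create(options);

        // Stand-in Flink sink; any SinkFunction<T>, e.g. a Kafka producer, fits here.
        SinkFunction<String> flinkSink = new SinkFunction<String>() {
          @Override
          public void invoke(String value) throws Exception {
            System.out.println(value);
          }
        };

        p.apply(Create.of("a", "b", "c"))
         .apply("WriteToFlinkSink", Write.to(UnboundedFlinkSink.of(flinkSink)));

        p.run();
      }
    }
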
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc847a95/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
new file mode 100644
index 0000000..77c195a
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.io.Sink;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A wrapper that translates Flink sinks implementing the {@link SinkFunction} interface
+ * into unbounded Beam sinks, the write-side counterpart of an {@link UnboundedSource}.
+ */
+public class UnboundedFlinkSink<T> extends Sink<T> {
+
+  /* The Flink sink function */
+  private final SinkFunction<T> flinkSink;
+
+  private UnboundedFlinkSink(SinkFunction<T> flinkSink) {
+    this.flinkSink = flinkSink;
+  }
+
+  public SinkFunction<T> getFlinkSink() {
+    return this.flinkSink;
+  }
+
+  @Override
+  public void validate(PipelineOptions options) {
+  }
+
+  @Override
+  public WriteOperation<T, ?> createWriteOperation(PipelineOptions options) {
+    return new WriteOperation<T, Object>() {
+      @Override
+      public void initialize(PipelineOptions options) throws Exception {
+
+      }
+
+      @Override
+      public void finalize(Iterable<Object> writerResults, PipelineOptions options) throws Exception {
+
+      }
+
+      @Override
+      public Coder<Object> getWriterResultCoder() {
+        return new Coder<Object>() {
+          @Override
+          public void encode(Object value, OutputStream outStream, Context context) throws CoderException, IOException {
+
+          }
+
+          @Override
+          public Object decode(InputStream inStream, Context context) throws CoderException, IOException {
+            return null;
+          }
+
+          @Override
+          public List<? extends Coder<?>> getCoderArguments() {
+            return null;
+          }
+
+          @Override
+          public CloudObject asCloudObject() {
+            return null;
+          }
+
+          @Override
+          public void verifyDeterministic() throws NonDeterministicException {
+
+          }
+
+          @Override
+          public boolean consistentWithEquals() {
+            return false;
+          }
+
+          @Override
+          public Object structuralValue(Object value) throws Exception {
+            return null;
+          }
+
+          @Override
+          public boolean isRegisterByteSizeObserverCheap(Object value, Context context) {
+            return false;
+          }
+
+          @Override
+          public void registerByteSizeObserver(Object value, ElementByteSizeObserver observer, Context context) throws Exception {
+
+          }
+
+          @Override
+          public String getEncodingId() {
+            return null;
+          }
+
+          @Override
+          public Collection<String> getAllowedEncodings() {
+            return null;
+          }
+        };
+      }
+
+      @Override
+      public Writer<T, Object> createWriter(PipelineOptions options) throws Exception {
+        return new Writer<T, Object>() {
+          @Override
+          public void open(String uId) throws Exception {
+
+          }
+
+          @Override
+          public void write(T value) throws Exception {
+
+          }
+
+          @Override
+          public Object close() throws Exception {
+            return null;
+          }
+
+          @Override
+          public WriteOperation<T, Object> getWriteOperation() {
+            return null;
+          }
+
+        };
+      }
+
+      @Override
+      public Sink<T> getSink() {
+        return UnboundedFlinkSink.this;
+      }
+    };
+  }
+
+  /**
+   * Creates a Beam {@link Sink} wrapping the given Flink sink, for use with the {@link Write} API.
+   * @param flinkSink The Flink sink, e.g. FlinkKafkaProducer09
+   * @param <T> The input type of the sink
+   * @return A Beam sink wrapping a Flink sink
+   */
+  public static <T> Sink<T> of(SinkFunction<T> flinkSink) {
+    return new UnboundedFlinkSink<>(flinkSink);
+  }
+}
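
The WriteOperation, Writer, and Coder stubs above are never exercised by the streaming
translator, which feeds elements straight to the wrapped SinkFunction via addSink. A hedged
sketch of building such a sink from the FlinkKafkaProducer09 mentioned in the Javadoc
(assumed to sit inside a pipeline-building method; topic name and broker address are
placeholders, not part of this commit):

    // Kafka producer settings (java.util.Properties); the broker address is a placeholder.
    Properties producerProps = new Properties();
    producerProps.setProperty("bootstrap.servers", "localhost:9092");

    // FlinkKafkaProducer09 (from flink-connector-kafka-0.9) implements SinkFunction<String>,
    // so it can be wrapped directly.
    Sink<String> kafkaSink = UnboundedFlinkSink.of(
        new FlinkKafkaProducer09<String>("my-topic", new SimpleStringSchema(), producerProps));

    // The wrapped sink is then written to with Write.to(kafkaSink) on a PCollection<String>.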