You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/03/17 12:31:20 UTC

incubator-beam git commit: [flink] fix UnboundedFlinkSource wrapper

Repository: incubator-beam
Updated Branches:
  refs/heads/master a9c46057e -> 0f137169e


[flink] fix UnboundedFlinkSource wrapper

- remove unnecessary PipelineOptions cache
- use the correct interface types
- improve Kafka example


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0f137169
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0f137169
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0f137169

Branch: refs/heads/master
Commit: 0f137169e4c2cd8d3e5a86c91bc2f401d276e8ed
Parents: a9c4605
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Mar 17 12:26:03 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Thu Mar 17 12:27:37 2016 +0100

----------------------------------------------------------------------
 .../KafkaWindowedWordCountExample.java          |  7 ++--
 .../streaming/io/UnboundedFlinkSource.java      | 37 ++++++++++----------
 2 files changed, 22 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f137169/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
index 55cdc22..3942d0d 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
@@ -22,7 +22,6 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.io.Unbounded
 import com.google.cloud.dataflow.sdk.Pipeline;
 import com.google.cloud.dataflow.sdk.io.Read;
 import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.io.UnboundedSource;
 import com.google.cloud.dataflow.sdk.options.Default;
 import com.google.cloud.dataflow.sdk.options.Description;
 import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
@@ -30,7 +29,7 @@ import com.google.cloud.dataflow.sdk.transforms.*;
 import com.google.cloud.dataflow.sdk.transforms.windowing.*;
 import com.google.cloud.dataflow.sdk.values.KV;
 import com.google.cloud.dataflow.sdk.values.PCollection;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.joda.time.Duration;
 
@@ -121,12 +120,12 @@ public class KafkaWindowedWordCountExample {
 
     // this is the Flink consumer that reads the input to
     // the program from a kafka topic.
-    FlinkKafkaConsumer082 kafkaConsumer = new FlinkKafkaConsumer082<>(
+    FlinkKafkaConsumer08<String> kafkaConsumer = new FlinkKafkaConsumer08<>(
         options.getKafkaTopic(),
         new SimpleStringSchema(), p);
 
     PCollection<String> words = pipeline
-        .apply(Read.from(new UnboundedFlinkSource<String, UnboundedSource.CheckpointMark>(options, kafkaConsumer)).named("StreamingWordCount"))
+        .apply(Read.named("StreamingWordCount").from(UnboundedFlinkSource.of(kafkaConsumer)))
         .apply(ParDo.of(new ExtractWordsFn()))
         .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize())))
             .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f137169/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
index 2857efd..05a8956 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
@@ -22,7 +22,9 @@ import com.google.cloud.dataflow.sdk.coders.Coder;
 import com.google.cloud.dataflow.sdk.io.UnboundedSource;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.common.base.Preconditions;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 
 import javax.annotation.Nullable;
 import java.util.List;
@@ -31,52 +33,51 @@ import java.util.List;
  * A wrapper translating Flink Sources implementing the {@link RichParallelSourceFunction} interface, into
  * unbounded Beam sources (see {@link UnboundedSource}).
  * */
-public class UnboundedFlinkSource<T, C extends UnboundedSource.CheckpointMark> extends UnboundedSource<T, C> {
+public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource.CheckpointMark> {
 
-  private final PipelineOptions options;
-  private final RichParallelSourceFunction<T> flinkSource;
+  private final SourceFunction<T> flinkSource;
 
-  public UnboundedFlinkSource(PipelineOptions pipelineOptions, RichParallelSourceFunction<T> source) {
-    if(!pipelineOptions.getRunner().equals(FlinkPipelineRunner.class)) {
-      throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
-    }
-    options = Preconditions.checkNotNull(pipelineOptions);
+  public UnboundedFlinkSource(SourceFunction<T> source) {
     flinkSource = Preconditions.checkNotNull(source);
-    validate();
   }
 
-  public RichParallelSourceFunction<T> getFlinkSource() {
+  public SourceFunction<T> getFlinkSource() {
     return this.flinkSource;
   }
 
   @Override
-  public List<? extends UnboundedSource<T, C>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
+  public List<? extends UnboundedSource<T, UnboundedSource.CheckpointMark>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
     throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
   }
 
   @Override
-  public UnboundedReader<T> createReader(PipelineOptions options, @Nullable C checkpointMark) {
+  public UnboundedReader<T> createReader(PipelineOptions options, @Nullable CheckpointMark checkpointMark) {
     throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
   }
 
   @Nullable
   @Override
-  public Coder<C> getCheckpointMarkCoder() {
+  public Coder<UnboundedSource.CheckpointMark> getCheckpointMarkCoder() {
     throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
   }
 
 
   @Override
   public void validate() {
-    Preconditions.checkNotNull(options);
-    Preconditions.checkNotNull(flinkSource);
-    if(!options.getRunner().equals(FlinkPipelineRunner.class)) {
-      throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
-    }
   }
 
   @Override
   public Coder<T> getDefaultOutputCoder() {
     throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
   }
+
+  /**
+   * Creates a new unbounded source from a Flink source.
+   * @param flinkSource The Flink source function
+   * @param <T> The type that the source function produces.
+   * @return The wrapped source function.
+   */
+  public static <T> UnboundedSource<T, UnboundedSource.CheckpointMark> of(SourceFunction<T> flinkSource) {
+    return new UnboundedFlinkSource<>(flinkSource);
+  }
 }