You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/03/17 12:31:20 UTC
incubator-beam git commit: [flink] fix UnboundedFlinkSource wrapper
Repository: incubator-beam
Updated Branches:
refs/heads/master a9c46057e -> 0f137169e
[flink] fix UnboundedFlinkSource wrapper
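- remove the unnecessary cached PipelineOptions field (and the runner checks that relied on it)
- use the correct interface types: accept any Flink SourceFunction instead of only RichParallelSourceFunction, and fix the checkpoint mark type to UnboundedSource.CheckpointMark
- improve the Kafka example: use FlinkKafkaConsumer08 and the new UnboundedFlinkSource.of() factory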
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0f137169
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0f137169
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0f137169
Branch: refs/heads/master
Commit: 0f137169e4c2cd8d3e5a86c91bc2f401d276e8ed
Parents: a9c4605
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Mar 17 12:26:03 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Thu Mar 17 12:27:37 2016 +0100
----------------------------------------------------------------------
.../KafkaWindowedWordCountExample.java | 7 ++--
.../streaming/io/UnboundedFlinkSource.java | 37 ++++++++++----------
2 files changed, 22 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f137169/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
index 55cdc22..3942d0d 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
@@ -22,7 +22,6 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.io.Unbounded
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.options.Default;
import com.google.cloud.dataflow.sdk.options.Description;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
@@ -30,7 +29,7 @@ import com.google.cloud.dataflow.sdk.transforms.*;
import com.google.cloud.dataflow.sdk.transforms.windowing.*;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.joda.time.Duration;
@@ -121,12 +120,12 @@ public class KafkaWindowedWordCountExample {
// this is the Flink consumer that reads the input to
// the program from a kafka topic.
- FlinkKafkaConsumer082 kafkaConsumer = new FlinkKafkaConsumer082<>(
+ FlinkKafkaConsumer08<String> kafkaConsumer = new FlinkKafkaConsumer08<>(
options.getKafkaTopic(),
new SimpleStringSchema(), p);
PCollection<String> words = pipeline
- .apply(Read.from(new UnboundedFlinkSource<String, UnboundedSource.CheckpointMark>(options, kafkaConsumer)).named("StreamingWordCount"))
+ .apply(Read.named("StreamingWordCount").from(UnboundedFlinkSource.of(kafkaConsumer)))
.apply(ParDo.of(new ExtractWordsFn()))
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize())))
.triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f137169/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
index 2857efd..05a8956 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
@@ -22,7 +22,9 @@ import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.common.base.Preconditions;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
import javax.annotation.Nullable;
import java.util.List;
@@ -31,52 +33,51 @@ import java.util.List;
* A wrapper translating Flink Sources implementing the {@link RichParallelSourceFunction} interface, into
* unbounded Beam sources (see {@link UnboundedSource}).
* */
-public class UnboundedFlinkSource<T, C extends UnboundedSource.CheckpointMark> extends UnboundedSource<T, C> {
+public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource.CheckpointMark> {
- private final PipelineOptions options;
- private final RichParallelSourceFunction<T> flinkSource;
+ private final SourceFunction<T> flinkSource;
- public UnboundedFlinkSource(PipelineOptions pipelineOptions, RichParallelSourceFunction<T> source) {
- if(!pipelineOptions.getRunner().equals(FlinkPipelineRunner.class)) {
- throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
- }
- options = Preconditions.checkNotNull(pipelineOptions);
+ public UnboundedFlinkSource(SourceFunction<T> source) {
flinkSource = Preconditions.checkNotNull(source);
- validate();
}
- public RichParallelSourceFunction<T> getFlinkSource() {
+ public SourceFunction<T> getFlinkSource() {
return this.flinkSource;
}
@Override
- public List<? extends UnboundedSource<T, C>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
+ public List<? extends UnboundedSource<T, UnboundedSource.CheckpointMark>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
}
@Override
- public UnboundedReader<T> createReader(PipelineOptions options, @Nullable C checkpointMark) {
+ public UnboundedReader<T> createReader(PipelineOptions options, @Nullable CheckpointMark checkpointMark) {
throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
}
@Nullable
@Override
- public Coder<C> getCheckpointMarkCoder() {
+ public Coder<UnboundedSource.CheckpointMark> getCheckpointMarkCoder() {
throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
}
@Override
public void validate() {
- Preconditions.checkNotNull(options);
- Preconditions.checkNotNull(flinkSource);
- if(!options.getRunner().equals(FlinkPipelineRunner.class)) {
- throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
- }
}
@Override
public Coder<T> getDefaultOutputCoder() {
throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
}
+
+ /**
+ * Creates a new unbounded source from a Flink source.
+ * @param flinkSource The Flink source function
+ * @param <T> The type that the source function produces.
+ * @return The wrapped source function.
+ */
+ public static <T> UnboundedSource<T, UnboundedSource.CheckpointMark> of(SourceFunction<T> flinkSource) {
+ return new UnboundedFlinkSource<>(flinkSource);
+ }
}