You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/03/17 14:55:23 UTC
incubator-beam git commit: [flink] improvements to the Kafka Example
Repository: incubator-beam
Updated Branches:
refs/heads/master 0f137169e -> ef1e32dee
[flink] improvements to the Kafka Example
- use timestamp extractor after ingestion
- fix coder runtime exception
- correct logging
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ef1e32de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ef1e32de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ef1e32de
Branch: refs/heads/master
Commit: ef1e32deefb9886584556c7125e87b2873c63ebf
Parents: 0f13716
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Mar 17 14:49:09 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Thu Mar 17 14:53:41 2016 +0100
----------------------------------------------------------------------
.../examples/streaming/KafkaWindowedWordCountExample.java | 2 +-
.../flink/translation/FlinkStreamingTransformTranslators.java | 3 ++-
.../wrappers/streaming/io/UnboundedFlinkSource.java | 7 +++----
runners/flink/runner/src/main/resources/log4j.properties | 2 +-
4 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1e32de/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
index 3942d0d..8fca1d4 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
@@ -103,7 +103,7 @@ public class KafkaWindowedWordCountExample {
public static void main(String[] args) {
PipelineOptionsFactory.register(KafkaStreamingWordCountOptions.class);
KafkaStreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).as(KafkaStreamingWordCountOptions.class);
- options.setJobName("KafkaExample");
+ options.setJobName("KafkaExample - WindowSize: " + options.getWindowSize() + " seconds");
options.setStreaming(true);
options.setCheckpointingInterval(1000L);
options.setNumberOfExecutionRetries(5);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1e32de/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index bdefeaf..2b9b0ee 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -44,6 +44,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.*;
+import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
import org.apache.flink.util.Collector;
import org.joda.time.Instant;
import org.slf4j.Logger;
@@ -179,7 +180,7 @@ public class FlinkStreamingTransformTranslators {
public void flatMap(String s, Collector<WindowedValue<String>> collector) throws Exception {
collector.collect(WindowedValue.<String>of(s, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
}
- });
+ }).assignTimestampsAndWatermarks(new IngestionTimeExtractor());
} else {
source = context.getExecutionEnvironment()
.addSource(new UnboundedSourceWrapper<>(context.getPipelineOptions(), transform));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1e32de/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
index 05a8956..82984cb 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
@@ -17,12 +17,10 @@
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
-import org.apache.beam.runners.flink.FlinkPipelineRunner;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.common.base.Preconditions;
-import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -30,7 +28,7 @@ import javax.annotation.Nullable;
import java.util.List;
/**
- * A wrapper translating Flink Sources implementing the {@link RichParallelSourceFunction} interface, into
+ * A wrapper translating Flink Sources implementing the {@link SourceFunction} interface, into
* unbounded Beam sources (see {@link UnboundedSource}).
* */
public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource.CheckpointMark> {
@@ -68,7 +66,8 @@ public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource.
@Override
public Coder<T> getDefaultOutputCoder() {
- throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
+ // The coder is specified in the Flink source
+ return null;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1e32de/runners/flink/runner/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/resources/log4j.properties b/runners/flink/runner/src/main/resources/log4j.properties
index 4daaad1..4b6a708 100644
--- a/runners/flink/runner/src/main/resources/log4j.properties
+++ b/runners/flink/runner/src/main/resources/log4j.properties
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-log4j.rootLogger=INFO,console
+log4j.rootLogger=OFF,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout