You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text rendering.)
Posted to commits@beam.apache.org by mx...@apache.org on 2016/03/17 14:55:23 UTC

incubator-beam git commit: [flink] improvements to the Kafka Example

Repository: incubator-beam
Updated Branches:
  refs/heads/master 0f137169e -> ef1e32dee


[flink] improvements to the Kafka Example

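- assign ingestion-time timestamps and watermarks (IngestionTimeExtractor) to the ingested source stream
- fix a coder runtime exception: UnboundedFlinkSource.getDefaultOutputCoder() no longer throws a RuntimeException
- correct logging: set the log4j root logger level to OFF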


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ef1e32de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ef1e32de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ef1e32de

Branch: refs/heads/master
Commit: ef1e32deefb9886584556c7125e87b2873c63ebf
Parents: 0f13716
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Mar 17 14:49:09 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Thu Mar 17 14:53:41 2016 +0100

----------------------------------------------------------------------
 .../examples/streaming/KafkaWindowedWordCountExample.java     | 2 +-
 .../flink/translation/FlinkStreamingTransformTranslators.java | 3 ++-
 .../wrappers/streaming/io/UnboundedFlinkSource.java           | 7 +++----
 runners/flink/runner/src/main/resources/log4j.properties      | 2 +-
 4 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1e32de/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
index 3942d0d..8fca1d4 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
@@ -103,7 +103,7 @@ public class KafkaWindowedWordCountExample {
   public static void main(String[] args) {
     PipelineOptionsFactory.register(KafkaStreamingWordCountOptions.class);
     KafkaStreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).as(KafkaStreamingWordCountOptions.class);
-    options.setJobName("KafkaExample");
+    options.setJobName("KafkaExample - WindowSize: " + options.getWindowSize() + " seconds");
     options.setStreaming(true);
     options.setCheckpointingInterval(1000L);
     options.setNumberOfExecutionRetries(5);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1e32de/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index bdefeaf..2b9b0ee 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -44,6 +44,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.streaming.api.datastream.*;
+import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
 import org.apache.flink.util.Collector;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -179,7 +180,7 @@ public class FlinkStreamingTransformTranslators {
               public void flatMap(String s, Collector<WindowedValue<String>> collector) throws Exception {
                 collector.collect(WindowedValue.<String>of(s, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
               }
-            });
+            }).assignTimestampsAndWatermarks(new IngestionTimeExtractor());
       } else {
         source = context.getExecutionEnvironment()
             .addSource(new UnboundedSourceWrapper<>(context.getPipelineOptions(), transform));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1e32de/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
index 05a8956..82984cb 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java
@@ -17,12 +17,10 @@
  */
 package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
 
-import org.apache.beam.runners.flink.FlinkPipelineRunner;
 import com.google.cloud.dataflow.sdk.coders.Coder;
 import com.google.cloud.dataflow.sdk.io.UnboundedSource;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.common.base.Preconditions;
-import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 
@@ -30,7 +28,7 @@ import javax.annotation.Nullable;
 import java.util.List;
 
 /**
- * A wrapper translating Flink Sources implementing the {@link RichParallelSourceFunction} interface, into
+ * A wrapper translating Flink Sources implementing the {@link SourceFunction} interface, into
  * unbounded Beam sources (see {@link UnboundedSource}).
  * */
 public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource.CheckpointMark> {
@@ -68,7 +66,8 @@ public class UnboundedFlinkSource<T> extends UnboundedSource<T, UnboundedSource.
 
   @Override
   public Coder<T> getDefaultOutputCoder() {
-    throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
+    // The coder is specified in the Flink source
+    return null;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1e32de/runners/flink/runner/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/resources/log4j.properties b/runners/flink/runner/src/main/resources/log4j.properties
index 4daaad1..4b6a708 100644
--- a/runners/flink/runner/src/main/resources/log4j.properties
+++ b/runners/flink/runner/src/main/resources/log4j.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=INFO,console
+log4j.rootLogger=OFF,console
 log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.target=System.err
 log4j.appender.console.layout=org.apache.log4j.PatternLayout