You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2017/01/13 18:36:19 UTC

[3/4] beam git commit: [BEAM-1229] flink KafkaIOExamples submit error

[BEAM-1229] flink KafkaIOExamples submit error


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/078573e3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/078573e3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/078573e3

Branch: refs/heads/master
Commit: 078573e30b4c9cb29b3c548f8859bb5b23a7a9d1
Parents: 51820cb
Author: Alexey Diomin <di...@gmail.com>
Authored: Wed Jan 11 17:08:35 2017 +0400
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Jan 13 19:27:04 2017 +0100

----------------------------------------------------------------------
 .../beam/runners/flink/examples/streaming/KafkaIOExamples.java   | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/078573e3/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
index 3c8a89b..616e276 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
@@ -29,6 +29,7 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.io.Unbounded
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.options.Default;
@@ -78,7 +79,7 @@ public class KafkaIOExamples {
                 new SimpleStringSchema(), getKafkaProps(options));
 
         p
-            .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
+            .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer))).setCoder(StringUtf8Coder.of())
             .apply(ParDo.of(new PrintFn<>()));
 
         p.run();
@@ -133,6 +134,7 @@ public class KafkaIOExamples {
 
         p
             .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
+                .setCoder(AvroCoder.of(MyType.class))
             .apply(ParDo.of(new PrintFn<>()));
 
         p.run();