Posted to commits@beam.apache.org by am...@apache.org on 2016/03/15 19:48:00 UTC

[03/23] incubator-beam git commit: [BEAM-11] set coder for pipeline input

[BEAM-11] set coder for pipeline input


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c5c7df60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c5c7df60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c5c7df60

Branch: refs/heads/master
Commit: c5c7df603289b1d207308da16546dd56e9b9b6d9
Parents: 41c4ca6
Author: Sela <an...@paypal.com>
Authored: Sat Mar 12 17:26:34 2016 +0200
Committer: Sela <an...@paypal.com>
Committed: Tue Mar 15 20:38:26 2016 +0200

----------------------------------------------------------------------
 .../spark/io/hadoop/HadoopFileFormatPipelineTest.java    |  5 ++++-
 .../beam/runners/spark/streaming/KafkaStreamingTest.java | 11 +++++++----
 2 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c5c7df60/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
index 7a9be8b..abe1119 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
@@ -16,11 +16,13 @@
 package org.apache.beam.runners.spark.io.hadoop;
 
 import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
 import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
 import com.google.cloud.dataflow.sdk.values.KV;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
+import org.apache.beam.runners.spark.coders.WritableCoder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
@@ -66,7 +68,8 @@ public class HadoopFileFormatPipelineTest {
         (Class<? extends FileInputFormat<IntWritable, Text>>) (Class<?>) SequenceFileInputFormat.class;
     HadoopIO.Read.Bound<IntWritable,Text> read =
         HadoopIO.Read.from(inputFile.getAbsolutePath(), inputFormatClass, IntWritable.class, Text.class);
-    PCollection<KV<IntWritable, Text>> input = p.apply(read);
+    PCollection<KV<IntWritable, Text>> input = p.apply(read)
+        .setCoder(KvCoder.of(WritableCoder.of(IntWritable.class), WritableCoder.of(Text.class)));
     @SuppressWarnings("unchecked")
     Class<? extends FileOutputFormat<IntWritable, Text>> outputFormatClass =
         (Class<? extends FileOutputFormat<IntWritable, Text>>) (Class<?>) TemplatedSequenceFileOutputFormat.class;
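
For reference, the pattern this hunk applies, reduced to a standalone sketch: Hadoop Writable types such as IntWritable and Text carry no default Beam coder, so coder inference can fail downstream of HadoopIO.Read unless a coder is pinned explicitly on the source's output, which is what the commit does. The sketch below is illustrative only and not part of the commit; it assumes HadoopIO lives in the test's own package (it is referenced there without an import), assumes the new-API org.apache.hadoop.mapreduce.lib.input classes, and uses a placeholder input path and class name.

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import org.apache.beam.runners.spark.coders.WritableCoder;
import org.apache.beam.runners.spark.io.hadoop.HadoopIO;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

public class WritableCoderSketch { // hypothetical class name
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    @SuppressWarnings("unchecked")
    Class<? extends FileInputFormat<IntWritable, Text>> inputFormatClass =
        (Class<? extends FileInputFormat<IntWritable, Text>>) (Class<?>)
            SequenceFileInputFormat.class;
    // IntWritable and Text are not Serializable and have no registered
    // default coder; WritableCoder serializes them via write()/readFields(),
    // so the element coder is set explicitly right at the source.
    PCollection<KV<IntWritable, Text>> input = p
        .apply(HadoopIO.Read.from("/tmp/input", // placeholder path
            inputFormatClass, IntWritable.class, Text.class))
        .setCoder(KvCoder.of(WritableCoder.of(IntWritable.class),
                             WritableCoder.of(Text.class)));
    // p.run() omitted: the sketch only shows the coder wiring.
  }
}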

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c5c7df60/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/KafkaStreamingTest.java
index e9e685b..ff1e11c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/streaming/KafkaStreamingTest.java
@@ -15,6 +15,8 @@
 package org.apache.beam.runners.spark.streaming;
 
 import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
 import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
 import com.google.cloud.dataflow.sdk.transforms.DoFn;
 import com.google.cloud.dataflow.sdk.transforms.ParDo;
@@ -79,7 +81,7 @@ public class KafkaStreamingTest {
     producerProps.put("bootstrap.servers", EMBEDDED_KAFKA_CLUSTER.getBrokerList());
     Serializer<String> stringSerializer = new StringSerializer();
     try (@SuppressWarnings("unchecked") KafkaProducer<String, String> kafkaProducer =
-            new KafkaProducer(producerProps, stringSerializer, stringSerializer)) {
+        new KafkaProducer(producerProps, stringSerializer, stringSerializer)) {
       for (Map.Entry<String, String> en : KAFKA_MESSAGES.entrySet()) {
         kafkaProducer.send(new ProducerRecord<>(TOPIC, en.getKey(), en.getValue()));
       }
@@ -96,13 +98,14 @@ public class KafkaStreamingTest {
     Pipeline p = Pipeline.create(options);
 
     Map<String, String> kafkaParams = ImmutableMap.of(
-            "metadata.broker.list", EMBEDDED_KAFKA_CLUSTER.getBrokerList(),
-            "auto.offset.reset", "smallest"
+        "metadata.broker.list", EMBEDDED_KAFKA_CLUSTER.getBrokerList(),
+        "auto.offset.reset", "smallest"
     );
 
     PCollection<KV<String, String>> kafkaInput = p.apply(KafkaIO.Read.from(StringDecoder.class,
         StringDecoder.class, String.class, String.class, Collections.singleton(TOPIC),
-        kafkaParams));
+        kafkaParams))
+        .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
     PCollection<KV<String, String>> windowedWords = kafkaInput
         .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(1))));
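
The streaming hunk follows the same pattern: the Kafka decoders fix the Java element types but not how the runner serializes elements between stages, so the commit sets a KvCoder of StringUtf8Coders on the source's output. Below is a minimal standalone sketch, not part of the commit; it assumes KafkaIO is the Spark runner's org.apache.beam.runners.spark.io.KafkaIO, that StringDecoder is kafka.serializer.StringDecoder, and uses placeholder broker and topic values.

import java.util.Collections;
import java.util.Map;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.common.collect.ImmutableMap;
import kafka.serializer.StringDecoder;
import org.apache.beam.runners.spark.io.KafkaIO;

public class KafkaCoderSketch { // hypothetical class name
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    Map<String, String> kafkaParams = ImmutableMap.of(
        "metadata.broker.list", "localhost:9092", // placeholder broker
        "auto.offset.reset", "smallest");
    // KafkaIO.Read does not declare a default output coder, so (per this
    // commit) the element coder is pinned right after the unbounded source.
    PCollection<KV<String, String>> kafkaInput = p
        .apply(KafkaIO.Read.from(StringDecoder.class, StringDecoder.class,
            String.class, String.class,
            Collections.singleton("topic"), // placeholder topic
            kafkaParams))
        .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
    // p.run() omitted: the sketch only shows the coder wiring.
  }
}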