You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2017/10/31 22:08:11 UTC

[1/2] beam git commit: Added VoidSerializer for KafkaIO. Modified KafkaIO.Write.values() to auto add the VoidSerializer for the key.serializer config for kafka producer

Repository: beam
Updated Branches:
  refs/heads/master 710941e90 -> 54bc58bd0


Added VoidSerializer for KafkaIO. Modified KafkaIO.Write.values() to auto add the VoidSerializer for the key.serializer config for kafka producer


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/39fdace7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/39fdace7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/39fdace7

Branch: refs/heads/master
Commit: 39fdace70457850631d8a57cf9a6906220435fd3
Parents: 710941e
Author: nerdynick <ne...@gmail.com>
Authored: Tue Oct 10 12:04:12 2017 -0600
Committer: chamikara@google.com <ch...@google.com>
Committed: Tue Oct 31 14:41:04 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java  | 10 ++++++++--
 .../java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java   | 11 ++++-------
 2 files changed, 12 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/39fdace7/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 17e0e34..f6158ca 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -128,6 +128,7 @@ import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.joda.time.DateTimeUtils;
 import org.joda.time.DateTimeZone;
@@ -252,7 +253,7 @@ import org.slf4j.LoggerFactory;
  *  strings.apply(KafkaIO.<Void, String>write()
  *      .withBootstrapServers("broker_1:9092,broker_2:9092")
  *      .withTopic("results")
- *      .withValueSerializer(new StringSerializer()) // just need serializer for value
+ *      .withValueSerializer(StringSerializer.class) // just need serializer for value
  *      .values()
  *    );
  * }</pre>
@@ -1598,8 +1599,13 @@ public class KafkaIO {
      * Writes just the values to Kafka. This is useful for writing collections of values rather
      * thank {@link KV}s.
      */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
     public PTransform<PCollection<V>, PDone> values() {
-      return new KafkaValueWrite<>(toBuilder().build());
+      return new KafkaValueWrite<>(
+          toBuilder()
+          .setKeySerializer((Class) StringSerializer.class)
+          .build()
+      );
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/39fdace7/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 89748dd..2cbd448 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -1196,13 +1196,10 @@ public class KafkaIOTest {
     public Producer<Integer, Long> apply(Map<String, Object> config) {
 
       // Make sure the config is correctly set up for serializers.
-      // There may not be a key serializer if we're interested only in values.
-      if (config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) != null) {
-        Utils.newInstance(
-                ((Class<?>) config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
-                        .asSubclass(Serializer.class)
-        ).configure(config, true);
-      }
+      Utils.newInstance(
+              ((Class<?>) config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
+                      .asSubclass(Serializer.class)
+      ).configure(config, true);
 
       Utils.newInstance(
           ((Class<?>) config.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))


[2/2] beam git commit: This closes #3969

Posted by ch...@apache.org.
This closes #3969


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/54bc58bd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/54bc58bd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/54bc58bd

Branch: refs/heads/master
Commit: 54bc58bd095eec01d070378cd93e5e94ad5cec0f
Parents: 710941e 39fdace
Author: chamikara@google.com <ch...@google.com>
Authored: Tue Oct 31 15:07:29 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Tue Oct 31 15:07:29 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java  | 10 ++++++++--
 .../java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java   | 11 ++++-------
 2 files changed, 12 insertions(+), 9 deletions(-)
----------------------------------------------------------------------