You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/03/30 21:02:02 UTC

[1/2] beam git commit: Fix NPE in Kafka value writer.

Repository: beam
Updated Branches:
  refs/heads/master ffd87553f -> 66f249933


Fix NPE in Kafka value writer.

KafkaIO.write()...values() does not require the user to set a key coder, since the key is always null.
Validation passes, but this results in an NPE at runtime when the writer
tries to instantiate the producer. Set the key coder to 'NullOnlyCoder'.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d0462f59
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d0462f59
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d0462f59

Branch: refs/heads/master
Commit: d0462f59548ebed0dd7ae744b138ff956b742cad
Parents: ffd8755
Author: Raghu Angadi <ra...@google.com>
Authored: Wed Mar 29 23:21:54 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Mar 30 13:57:26 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 22 +++++++++-----------
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 16 ++++++++++++++
 2 files changed, 26 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d0462f59/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 7880cbc..bb7d971 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -254,7 +254,6 @@ public class KafkaIO {
   public static <K, V> Write<K, V> write() {
     return new AutoValue_KafkaIO_Write.Builder<K, V>()
         .setProducerConfig(Write.DEFAULT_PRODUCER_PROPERTIES)
-        .setValueOnly(false)
         .build();
   }
 
@@ -1159,7 +1158,6 @@ public class KafkaIO {
     @Nullable abstract String getTopic();
     @Nullable abstract Coder<K> getKeyCoder();
     @Nullable abstract Coder<V> getValueCoder();
-    abstract boolean getValueOnly();
     abstract Map<String, Object> getProducerConfig();
     @Nullable
     abstract SerializableFunction<Map<String, Object>, Producer<K, V>> getProducerFactoryFn();
@@ -1171,7 +1169,6 @@ public class KafkaIO {
       abstract Builder<K, V> setTopic(String topic);
       abstract Builder<K, V> setKeyCoder(Coder<K> keyCoder);
       abstract Builder<K, V> setValueCoder(Coder<V> valueCoder);
-      abstract Builder<K, V> setValueOnly(boolean valueOnly);
       abstract Builder<K, V> setProducerConfig(Map<String, Object> producerConfig);
       abstract Builder<K, V> setProducerFactoryFn(
           SerializableFunction<Map<String, Object>, Producer<K, V>> fn);
@@ -1231,7 +1228,7 @@ public class KafkaIO {
      * collections of values rather thank {@link KV}s.
      */
     public PTransform<PCollection<V>, PDone> values() {
-      return new KafkaValueWrite<>(toBuilder().setValueOnly(true).build());
+      return new KafkaValueWrite<>(withKeyCoder(new NullOnlyCoder<K>()).toBuilder().build());
     }
 
     @Override
@@ -1245,9 +1242,7 @@ public class KafkaIO {
       checkNotNull(getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
           "Kafka bootstrap servers should be set");
       checkNotNull(getTopic(), "Kafka topic should be set");
-      if (!getValueOnly()) {
-        checkNotNull(getKeyCoder(), "Key coder should be set");
-      }
+      checkNotNull(getKeyCoder(), "Key coder should be set");
       checkNotNull(getValueCoder(), "Value coder should be set");
     }
 
@@ -1255,11 +1250,12 @@ public class KafkaIO {
     private static final Map<String, Object> DEFAULT_PRODUCER_PROPERTIES =
         ImmutableMap.<String, Object>of(
             ProducerConfig.RETRIES_CONFIG, 3,
+            // See comment about custom serializers in KafkaWriter constructor.
             ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, CoderBasedKafkaSerializer.class,
             ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CoderBasedKafkaSerializer.class);
 
     /**
-     * A set of properties that are not required or don't make sense for our consumer.
+     * A set of properties that are not required or don't make sense for our producer.
      */
     private static final Map<String, String> IGNORED_PRODUCER_PROPERTIES = ImmutableMap.of(
         ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Set keyCoder instead",
@@ -1373,11 +1369,13 @@ public class KafkaIO {
     KafkaWriter(Write<K, V> spec) {
       this.spec = spec;
 
-      // Set custom kafka serializers. We can not serialize user objects then pass the bytes to
-      // producer. The key and value objects are used in kafka Partitioner interface.
+      // Set custom kafka serializers. We do not want to serialize user objects then pass the bytes
+      // to producer since key and value objects are used in Kafka Partitioner interface.
       // This does not matter for default partitioner in Kafka as it uses just the serialized
-      // key bytes to pick a partition. But are making sure user's custom partitioner would work
-      // as expected.
+      // key bytes to pick a partition. But we don't want to limit use of custom partitions.
+      // We pass key and values objects the user writes directly Kafka and user supplied
+      // coders to serialize them are invoked inside CoderBasedKafkaSerializer.
+      // Use case : write all the events for a single session to same Kafka partition.
 
       this.producerConfig = new HashMap<>(spec.getProducerConfig());
       this.producerConfig.put(configForKeySerializer(), spec.getKeyCoder());

http://git-wip-us.apache.org/repos/asf/beam/blob/d0462f59/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 1897127..d1696d0 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -71,9 +71,12 @@ import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
 import org.hamcrest.collection.IsIterableContainingInAnyOrder;
 import org.joda.time.Instant;
 import org.junit.Rule;
@@ -728,8 +731,21 @@ public class KafkaIOTest {
   private static class ProducerFactoryFn
     implements SerializableFunction<Map<String, Object>, Producer<Integer, Long>> {
 
+    @SuppressWarnings("unchecked")
     @Override
     public Producer<Integer, Long> apply(Map<String, Object> config) {
+
+      // Make sure the config is correctly set up for serializers.
+      Utils.newInstance(
+          ((Class<?>) config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
+              .asSubclass(Serializer.class)
+      ).configure(config, true);
+
+      Utils.newInstance(
+          ((Class<?>) config.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
+              .asSubclass(Serializer.class)
+      ).configure(config, false);
+
       return MOCK_PRODUCER;
     }
   }


[2/2] beam git commit: This closes #2369

Posted by jk...@apache.org.
This closes #2369


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/66f24993
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/66f24993
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/66f24993

Branch: refs/heads/master
Commit: 66f249933054d063f2d546c27674b2289a6d2002
Parents: ffd8755 d0462f5
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Mar 30 13:58:07 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Mar 30 13:58:07 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 22 +++++++++-----------
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 16 ++++++++++++++
 2 files changed, 26 insertions(+), 12 deletions(-)
----------------------------------------------------------------------