Posted to commits@beam.apache.org by jk...@apache.org on 2017/05/09 20:25:43 UTC

[1/2] beam git commit: This closes #2986

Repository: beam
Updated Branches:
  refs/heads/release-2.0.0 9f7521f8d -> df2cdd899


This closes #2986


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2ee22fa1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2ee22fa1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2ee22fa1

Branch: refs/heads/release-2.0.0
Commit: 2ee22fa16f1db61b9c204e93d0e093f791982ed5
Parents: 9f7521f
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue May 9 12:42:44 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue May 9 13:22:23 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 321 +++++++------------
 .../CoderBasedKafkaDeserializer.java            |  70 ----
 .../CoderBasedKafkaSerializer.java              |  72 -----
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 112 +------
 4 files changed, 119 insertions(+), 456 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2ee22fa1/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index e21945f..a1130fc 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -67,8 +67,6 @@ import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark;
-import org.apache.beam.sdk.io.kafka.serialization.CoderBasedKafkaDeserializer;
-import org.apache.beam.sdk.io.kafka.serialization.CoderBasedKafkaSerializer;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Gauge;
 import org.apache.beam.sdk.metrics.SinkMetrics;
@@ -158,49 +156,14 @@ import org.slf4j.LoggerFactory;
  * }</pre>
  *
  * <p>Kafka provides deserializers for common types in
- * {@link org.apache.kafka.common.serialization}.
- *
- * <p>To read Avro data, {@code fromAvro} can be used. This does not require manually specifying
- * a {@link Coder} or {@link Deserializer}.
- *
- * <p>It's also possible to deserialize data using a Beam {@link Coder} via
- * {@link #readWithCoders(Coder, Coder)}, though this is discouraged because the particular
- * binary format is not guaranteed by coders. However, this can be useful
- * when exchanging data with a Beam pipeline that uses the same coder:
- *
- * <pre>{@code
- *
- *  pipeline
- *    .apply(KafkaIO.<MyKey, MyValue>readWithCoders(MyKeyCoder.of(), MyValueCoder.of())
- *       .withBootstrapServers("broker_1:9092,broker_2:9092")
- *       .withTopic("my_topic")
- *    )
- *    ...
- * }</pre>
- *
- * <p>In most cases, you don't need to specify {@link Coder} for key and value in the resulting
+ * {@link org.apache.kafka.common.serialization}. In addition to deserializers, Beam runners need
+ * {@link Coder} to materialize key and value objects if necessary.
+ * In most cases, you don't need to specify {@link Coder} for key and value in the resulting
  * collection because the coders are inferred from deserializer types. However, in cases when
- * coder inference fails, they can be specified manually using {@link Read#withKeyCoder} and
- * {@link Read#withValueCoder}. Note that the payloads of Kafka messages is interpreted using
- * key and value <i>deserializers</i>; coders are a Beam implementation detail to help runners
- * materialize the data for intermediate storage if necessary.
- *
- * <pre>{@code
- *
- *  pipeline
- *    .apply(KafkaIO.<Long, Foo>read()
- *       .withBootstrapServers("broker_1:9092,broker_2:9092")
- *       .withTopic("my_topic")
- *
- *       // infer coder from deserializer
- *       .withKeyDeserializer(LongDeserializer.class)
- *
- *       // explicitly specify coder
- *       .withValueDeserializer(FooDeserializer.class)
- *       .withValueCoder(FooCoder.of())
- *     )
- *     ...
- * }</pre>
+ * coder inference fails, they can be specified explicitly along with deserializers using
+ * {@link Read#withKeyDeserializerAndCoder(Class, Coder)} and
+ * {@link Read#withValueDeserializerAndCoder(Class, Coder)}. Note that Kafka messages are
+ * interpreted using key and value <i>deserializers</i>.
  *
  * <h3>Partition Assignment and Checkpointing</h3>
  * The Kafka partitions are evenly distributed among splits (workers).
@@ -257,11 +220,6 @@ import org.slf4j.LoggerFactory;
  *    );
  * }</pre>
  *
- * <p>Same notes on coders vs. serializers apply as above for {@link Read}.
- *
- * <p>To write Avro data, {@code toAvro} can be used. This does not require specifying serializers
- * or coders.
- *
  * <h3>Advanced Kafka Configuration</h3>
  * KafkaIO allows setting most of the properties in {@link ConsumerConfig} for source or in
  * {@link ProducerConfig} for sink. E.g. if you would like to enable offset
@@ -280,46 +238,6 @@ import org.slf4j.LoggerFactory;
 public class KafkaIO {
 
   /**
-   * Attempt to infer a {@link Coder} by extracting the type of the deserialized-class from the
-   * deserializer argument using the {@link Coder} registry.
-   */
-  @VisibleForTesting
-  static <T> NullableCoder<T> inferCoder(
-      CoderRegistry coderRegistry, Class<? extends Deserializer<T>> deserializer) {
-    checkNotNull(deserializer);
-
-    for (Type type : deserializer.getGenericInterfaces()) {
-      if (!(type instanceof ParameterizedType)) {
-        continue;
-      }
-
-      // This does not recurse: we will not infer from a class that extends
-      // a class that extends Deserializer<T>.
-      ParameterizedType parameterizedType = (ParameterizedType) type;
-
-      if (parameterizedType.getRawType() == Deserializer.class) {
-        Type parameter = parameterizedType.getActualTypeArguments()[0];
-
-        @SuppressWarnings("unchecked")
-        Class<T> clazz = (Class<T>) parameter;
-
-        try {
-          return NullableCoder.of(coderRegistry.getCoder(clazz));
-        } catch (CannotProvideCoderException e) {
-          throw new RuntimeException(
-                  String.format("Unable to automatically infer a Coder for "
-                                + "the Kafka Deserializer %s: no coder registered for type %s",
-                                deserializer, clazz));
-        }
-      }
-    }
-
-    throw new RuntimeException(
-            String.format("Could not extract the Kafaka Deserializer type from %s",
-                          deserializer));
-  }
-
-  /**
    * Creates an uninitialized {@link Read} {@link PTransform}. Before use, basic Kafka
    * configuration should set with {@link Read#withBootstrapServers(String)} and
    * {@link Read#withTopics(List)}. Other optional settings include key and value
@@ -354,44 +272,6 @@ public class KafkaIO {
   }
 
   /**
-   * Creates an uninitialized {@link Read} {@link PTransform}, using Kafka {@link Deserializer}s
-   * based on {@link Coder} instances.
-   */
-  @SuppressWarnings("unchecked")
-  public static <K, V> Read<K, V> readWithCoders(Coder<K> keyCoder, Coder<V> valueCoder) {
-    // Kafka constructs deserializers directly. Pass coder through consumer
-    // configuration.
-    ImmutableMap.Builder<String, Object> builder = new ImmutableMap.Builder<>();
-    Map<String, Object> config = builder
-            .putAll(Read.DEFAULT_CONSUMER_PROPERTIES)
-            .put(CoderBasedKafkaDeserializer.configForKeyDeserializer(), keyCoder)
-            .put(CoderBasedKafkaDeserializer.configForValueDeserializer(), valueCoder)
-            .build();
-
-    return new AutoValue_KafkaIO_Read.Builder<K, V>()
-            .setTopics(new ArrayList<String>())
-            .setTopicPartitions(new ArrayList<TopicPartition>())
-            .setKeyCoder(keyCoder)
-            .setValueCoder(valueCoder)
-            .setKeyDeserializer((Class) CoderBasedKafkaDeserializer.class)
-            .setValueDeserializer((Class) CoderBasedKafkaDeserializer.class)
-            .setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN)
-            .setConsumerConfig(config)
-            .setMaxNumRecords(Long.MAX_VALUE)
-            .build();
-  }
-
-  /**
-   * Creates an uninitialized {@link Read} {@link PTransform}, using Kafka {@link Deserializer}s
-   * based on {@link AvroCoder}. This reads data in the Avro binary format directly without using
-   * an Avro object container.
-   */
-  @SuppressWarnings("unchecked")
-  public static <K, V> Read<K, V> fromAvro(Class<K> keyClass, Class<V> valueClass) {
-    return readWithCoders(AvroCoder.of(keyClass), AvroCoder.of(valueClass));
-  }
-
-  /**
    * Creates an uninitialized {@link Write} {@link PTransform}. Before use, Kafka configuration
    * should be set with {@link Write#withBootstrapServers(String)} and {@link Write#withTopic}
    * along with {@link Deserializer}s for (optional) key and values.
@@ -401,40 +281,6 @@ public class KafkaIO {
         .setProducerConfig(Write.DEFAULT_PRODUCER_PROPERTIES)
         .build();
   }
-  /**
-   * Creates an uninitialized {@link Write} {@link PTransform}, using Kafka {@link Serializer}s
-   * based on {@link Coder} instances.
-   */
-  @SuppressWarnings("unchecked")
-  public static <K, V> Write<K, V> writeWithCoders(Coder<K> keyCoder, Coder<V> valueCoder) {
-    // Kafka constructs serializers directly. Pass coder through consumer
-    // configuration.
-    ImmutableMap.Builder<String, Object> builder = new ImmutableMap.Builder<>();
-    Map<String, Object> config = builder
-            .putAll(Write.DEFAULT_PRODUCER_PROPERTIES)
-            .put(CoderBasedKafkaSerializer.configForKeySerializer(), keyCoder)
-            .put(CoderBasedKafkaSerializer.configForValueSerializer(), valueCoder)
-            .build();
-
-    CoderBasedKafkaSerializer<K> keySerializer = new CoderBasedKafkaSerializer<K>();
-    CoderBasedKafkaSerializer<V> valueSerializer = new CoderBasedKafkaSerializer<V>();
-
-    return new AutoValue_KafkaIO_Write.Builder<K, V>()
-            .setProducerConfig(config)
-            .setKeySerializer((Class) CoderBasedKafkaSerializer.class)
-            .setValueSerializer((Class) CoderBasedKafkaSerializer.class)
-            .build();
-  }
-
-  /**
-   * Creates an uninitialized {@link Write} {@link PTransform}, using Kafka {@link Serializer}s
-   * based on {@link AvroCoder}. The coder writes Avro data directly without using an Avro object
-   * container.
-   */
-  @SuppressWarnings("unchecked")
-  public static <K, V> Write<K, V> toAvro(Class<K> keyClass, Class<V> valueClass) {
-    return writeWithCoders(AvroCoder.of(keyClass), AvroCoder.of(valueClass));
-  }
 
   ///////////////////////// Read Support \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
 
@@ -483,7 +329,7 @@ public class KafkaIO {
     }
 
     /**
-     * Returns a new {@link Read} with Kafka consumer pointing to {@code bootstrapServers}.
+     * Sets the bootstrap servers for the Kafka consumer.
      */
     public Read<K, V> withBootstrapServers(String bootstrapServers) {
       return updateConsumerProperties(
@@ -492,8 +338,9 @@ public class KafkaIO {
     }
 
     /**
-     * Returns a new {@link Read} that reads from the topic.
-     * See {@link UnboundedKafkaSource#split(int, PipelineOptions)} for description
+     * Sets the topic to read from.
+     *
+     * <p>See {@link UnboundedKafkaSource#split(int, PipelineOptions)} for description
      * of how the partitions are distributed among the splits.
      */
     public Read<K, V> withTopic(String topic) {
@@ -501,9 +348,10 @@ public class KafkaIO {
     }
 
     /**
-     * Returns a new {@link Read} that reads from the topics. All the partitions from each
+     * Sets a list of topics to read from. All the partitions from each
      * of the topics are read.
-     * See {@link UnboundedKafkaSource#split(int, PipelineOptions)} for description
+     *
+     * <p>See {@link UnboundedKafkaSource#split(int, PipelineOptions)} for description
      * of how the partitions are distributed among the splits.
      */
     public Read<K, V> withTopics(List<String> topics) {
@@ -513,9 +361,10 @@ public class KafkaIO {
     }
 
     /**
-     * Returns a new {@link Read} that reads from the partitions. This allows reading only a subset
+     * Sets a list of partitions to read from. This allows reading only a subset
      * of partitions for one or more topics when (if ever) needed.
-     * See {@link UnboundedKafkaSource#split(int, PipelineOptions)} for description
+     *
+     * <p>See {@link UnboundedKafkaSource#split(int, PipelineOptions)} for description
      * of how the partitions are distributed among the splits.
      */
     public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
@@ -524,31 +373,51 @@ public class KafkaIO {
     }
 
     /**
-     * Returns a new {@link Read} with a Kafka {@link Deserializer} for key bytes.
+     * Sets a Kafka {@link Deserializer} to interpret key bytes read from Kafka.
+     *
+     * <p>In addition, Beam also needs a {@link Coder} to serialize and deserialize key objects at
+     * runtime. KafkaIO tries to infer a coder for the key based on the {@link Deserializer} class,
+     * however in case that fails, you can use {@link #withKeyDeserializerAndCoder(Class, Coder)} to
+     * provide the key coder explicitly.
      */
     public Read<K, V> withKeyDeserializer(Class<? extends Deserializer<K>> keyDeserializer) {
       return toBuilder().setKeyDeserializer(keyDeserializer).build();
     }
 
     /**
-     * Returns a new {@link Read} with a {@link Coder} for the key.
+     * Sets a Kafka {@link Deserializer} for interpreting key bytes read from Kafka along with a
+     * {@link Coder} for helping the Beam runner materialize key objects at runtime if necessary.
+     *
+     * <p>Use this method only if your pipeline doesn't work with plain {@link
+     * #withKeyDeserializer(Class)}.
      */
-    public Read<K, V> withKeyCoder(Coder<K> keyCoder) {
-      return toBuilder().setKeyCoder(keyCoder).build();
+    public Read<K, V> withKeyDeserializerAndCoder(
+        Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
+      return toBuilder().setKeyDeserializer(keyDeserializer).setKeyCoder(keyCoder).build();
     }
 
     /**
-     * Returns a new {@link Read} with a Kafka {@link Deserializer} for value bytes.
+     * Sets a Kafka {@link Deserializer} to interpret value bytes read from Kafka.
+     *
+     * <p>In addition, Beam also needs a {@link Coder} to serialize and deserialize value objects at
+     * runtime. KafkaIO tries to infer a coder for the value based on the {@link Deserializer}
+     * class, however in case that fails, you can use {@link #withValueDeserializerAndCoder(Class,
+     * Coder)} to provide the value coder explicitly.
      */
     public Read<K, V> withValueDeserializer(Class<? extends Deserializer<V>> valueDeserializer) {
       return toBuilder().setValueDeserializer(valueDeserializer).build();
     }
 
     /**
-     * Returns a new {@link Read} with a {@link Coder} for values.
+     * Sets a Kafka {@link Deserializer} for interpreting value bytes read from Kafka along with a
+     * {@link Coder} for helping the Beam runner materialize value objects at runtime if necessary.
+     *
+     * <p>Use this method only if your pipeline doesn't work with plain {@link
+     * #withValueDeserializer(Class)}.
      */
-    public Read<K, V> withValueCoder(Coder<V> valueCoder) {
-      return toBuilder().setValueCoder(valueCoder).build();
+    public Read<K, V> withValueDeserializerAndCoder(
+        Class<? extends Deserializer<V>> valueDeserializer, Coder<V> valueCoder) {
+      return toBuilder().setValueDeserializer(valueDeserializer).setValueCoder(valueCoder).build();
     }
 
     /**
@@ -649,20 +518,23 @@ public class KafkaIO {
       Coder<K> keyCoder =
           checkNotNull(
               getKeyCoder() != null ? getKeyCoder() : inferCoder(registry, getKeyDeserializer()),
-              "Key coder must be inferable from input or set using readWithCoders");
+              "Key coder could not be inferred from key deserializer. Please provide"
+                  + " key coder explicitly using withKeyDeserializerAndCoder()");
 
       Coder<V> valueCoder =
           checkNotNull(
-              getValueCoder() != null
-                  ? getValueCoder()
+              getValueCoder() != null ? getValueCoder()
                   : inferCoder(registry, getValueDeserializer()),
-              "Value coder must be inferable from input or set using readWithCoders");
+              "Value coder could not be inferred from value deserializer. Please provide"
+                  + " value coder explicitly using withValueDeserializerAndCoder()");
 
       // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
       Unbounded<KafkaRecord<K, V>> unbounded =
-          org.apache.beam.sdk.io.Read.from(this
-                  .withKeyCoder(keyCoder)
-                  .withValueCoder(valueCoder)
+          org.apache.beam.sdk.io.Read.from(
+              toBuilder()
+                  .setKeyCoder(keyCoder)
+                  .setValueCoder(valueCoder)
+                  .build()
                   .makeSource());
 
       PTransform<PBegin, PCollection<KafkaRecord<K, V>>> transform = unbounded;
@@ -683,6 +555,7 @@ public class KafkaIO {
      */
     @VisibleForTesting
     UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> makeSource() {
+
       return new UnboundedKafkaSource<K, V>(this, -1);
     }
 
@@ -703,11 +576,9 @@ public class KafkaIO {
      */
     private static final Map<String, String> IGNORED_CONSUMER_PROPERTIES = ImmutableMap.of(
         ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Set keyDeserializer instead",
-        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueDeserializer instead",
+        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueDeserializer instead"
         // "group.id", "enable.auto.commit", "auto.commit.interval.ms" :
         //     lets allow these, applications can have better resume point for restarts.
-        CoderBasedKafkaDeserializer.configForKeyDeserializer(), "Use readWithCoders instead",
-        CoderBasedKafkaDeserializer.configForValueDeserializer(), "Use readWithCoders instead"
         );
 
     // set config defaults
@@ -756,7 +627,9 @@ public class KafkaIO {
       for (Map.Entry<String, Object> conf : getConsumerConfig().entrySet()) {
         String key = conf.getKey();
         if (!ignoredConsumerPropertiesKeys.contains(key)) {
-          builder.add(DisplayData.item(key, ValueProvider.StaticValueProvider.of(conf.getValue())));
+          Object value = DisplayData.inferType(conf.getValue()) != null
+              ? conf.getValue() : String.valueOf(conf.getValue());
+          builder.add(DisplayData.item(key, ValueProvider.StaticValueProvider.of(value)));
         }
       }
     }
@@ -1475,15 +1348,16 @@ public class KafkaIO {
     }
 
     /**
-     * Returns a new {@link Write} transform that writes to given topic.
+     * Sets the Kafka topic to write to.
      */
     public Write<K, V> withTopic(String topic) {
       return toBuilder().setTopic(topic).build();
     }
 
     /**
-     * Returns a new {@link Write} with {@link Serializer} for serializing key (if any) to bytes.
-     * A key is optional while writing to Kafka. Note when a key is set, its hash is used to
+     * Sets a {@link Serializer} for serializing key (if any) to bytes.
+     *
+     * <p>A key is optional while writing to Kafka. Note when a key is set, its hash is used to
      * determine partition in Kafka (see {@link ProducerRecord} for more details).
      */
     public Write<K, V> withKeySerializer(Class<? extends Serializer<K>> keySerializer) {
@@ -1491,12 +1365,15 @@ public class KafkaIO {
     }
 
     /**
-     * Returns a new {@link Write} with {@link Serializer} for serializing value to bytes.
+     * Sets a {@link Serializer} for serializing value to bytes.
      */
     public Write<K, V> withValueSerializer(Class<? extends Serializer<V>> valueSerializer) {
       return toBuilder().setValueSerializer(valueSerializer).build();
     }
 
+    /**
+     * Adds the given producer properties, overriding old values of properties with the same key.
+     */
     public Write<K, V> updateProducerProperties(Map<String, Object> configUpdates) {
       Map<String, Object> config = updateKafkaProperties(getProducerConfig(),
           IGNORED_PRODUCER_PROPERTIES, configUpdates);
@@ -1504,7 +1381,7 @@ public class KafkaIO {
     }
 
     /**
-     * Returns a new {@link Write} with a custom function to create Kafka producer. Primarily used
+     * Sets a custom function to create Kafka producer. Primarily used
      * for tests. Default is {@link KafkaProducer}
      */
     public Write<K, V> withProducerFactoryFn(
@@ -1513,8 +1390,8 @@ public class KafkaIO {
     }
 
     /**
-     * Returns a new transform that writes just the values to Kafka. This is useful for writing
-     * collections of values rather thank {@link KV}s.
+     * Writes just the values to Kafka. This is useful for writing collections of values rather
+     * than {@link KV}s.
      */
     public PTransform<PCollection<V>, PDone> values() {
       return new KafkaValueWrite<>(toBuilder().build());
@@ -1543,10 +1420,7 @@ public class KafkaIO {
      */
     private static final Map<String, String> IGNORED_PRODUCER_PROPERTIES = ImmutableMap.of(
         ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Use withKeySerializer instead",
-        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "Use withValueSerializer instead",
-
-        CoderBasedKafkaSerializer.configForKeySerializer(), "Use writeWithCoders instead",
-        CoderBasedKafkaSerializer.configForValueSerializer(), "Use writeWithCoders instead"
+        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "Use withValueSerializer instead"
      );
 
     @Override
@@ -1557,7 +1431,9 @@ public class KafkaIO {
       for (Map.Entry<String, Object> conf : getProducerConfig().entrySet()) {
         String key = conf.getKey();
         if (!ignoredProducerPropertiesKeys.contains(key)) {
-          builder.add(DisplayData.item(key, ValueProvider.StaticValueProvider.of(conf.getValue())));
+          Object value = DisplayData.inferType(conf.getValue()) != null
+              ? conf.getValue() : String.valueOf(conf.getValue());
+          builder.add(DisplayData.item(key, ValueProvider.StaticValueProvider.of(value)));
         }
       }
     }
@@ -1658,14 +1534,6 @@ public class KafkaIO {
     KafkaWriter(Write<K, V> spec) {
       this.spec = spec;
 
-      // Set custom kafka serializers. We do not want to serialize user objects then pass the bytes
-      // to producer since key and value objects are used in Kafka Partitioner interface.
-      // This does not matter for default partitioner in Kafka as it uses just the serialized
-      // key bytes to pick a partition. But we don't want to limit use of custom partitions.
-      // We pass key and values objects the user writes directly Kafka and user supplied
-      // coders to serialize them are invoked inside CoderBasedKafkaSerializer.
-      // Use case : write all the events for a single session to same Kafka partition.
-
       this.producerConfig = new HashMap<>(spec.getProducerConfig());
 
       this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
@@ -1708,4 +1576,43 @@ public class KafkaIO {
       }
     }
   }
+
+  /**
+   * Attempt to infer a {@link Coder} by extracting the type of the deserialized-class from the
+   * deserializer argument using the {@link Coder} registry.
+   */
+  @VisibleForTesting
+  static <T> NullableCoder<T> inferCoder(
+      CoderRegistry coderRegistry, Class<? extends Deserializer<T>> deserializer) {
+    checkNotNull(deserializer);
+
+    for (Type type : deserializer.getGenericInterfaces()) {
+      if (!(type instanceof ParameterizedType)) {
+        continue;
+      }
+
+      // This does not recurse: we will not infer from a class that extends
+      // a class that extends Deserializer<T>.
+      ParameterizedType parameterizedType = (ParameterizedType) type;
+
+      if (parameterizedType.getRawType() == Deserializer.class) {
+        Type parameter = parameterizedType.getActualTypeArguments()[0];
+
+        @SuppressWarnings("unchecked")
+        Class<T> clazz = (Class<T>) parameter;
+
+        try {
+          return NullableCoder.of(coderRegistry.getCoder(clazz));
+        } catch (CannotProvideCoderException e) {
+          throw new RuntimeException(
+              String.format("Unable to automatically infer a Coder for "
+                                + "the Kafka Deserializer %s: no coder registered for type %s",
+                            deserializer, clazz));
+        }
+      }
+    }
+
+    throw new RuntimeException(String.format(
+        "Could not extract the Kafka Deserializer type from %s", deserializer));
+  }
 }
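
As context for the Javadoc and API changes above: readWithCoders() and fromAvro() are gone, and an explicit coder now travels with its deserializer. Below is a minimal reading sketch under the new API, reusing the hypothetical Foo, FooDeserializer and FooCoder names from the removed Javadoc example; the broker list and topic name are placeholders as well.

  pipeline
      .apply(KafkaIO.<Long, Foo>read()
          .withBootstrapServers("broker_1:9092,broker_2:9092")
          .withTopic("my_topic")
          // coder inferred from the deserializer's type parameter
          .withKeyDeserializer(LongDeserializer.class)
          // coder given explicitly for the case where inference fails
          .withValueDeserializerAndCoder(FooDeserializer.class, FooCoder.of())
          .withoutMetadata())
      .apply(Values.<Foo>create());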

http://git-wip-us.apache.org/repos/asf/beam/blob/2ee22fa1/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaDeserializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaDeserializer.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaDeserializer.java
deleted file mode 100644
index a165586..0000000
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaDeserializer.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.kafka.serialization;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Map;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.kafka.common.serialization.Deserializer;
-
-/**
- * Implements a Kafka {@link Deserializer} with a {@link Coder}.
- *
- * <p>As Kafka instantiates serializers directly, the coder must be stored as serialized value in
- * the producer configuration map.
- */
-public class CoderBasedKafkaDeserializer<T> implements Deserializer<T> {
-  @SuppressWarnings("unchecked")
-  @Override
-  public void configure(Map<String, ?> configs, boolean isKey) {
-    String configKey = isKey ? configForKeyDeserializer() : configForValueDeserializer();
-    coder = (Coder<T>) configs.get(configKey);
-    checkNotNull(coder, "could not instantiate coder for Kafka deserialization");
-  }
-
-  @Override
-  public T deserialize(String topic, byte[] data) {
-    if (data == null) {
-      return null;
-    }
-
-    try {
-      return CoderUtils.decodeFromByteArray(coder, data);
-    } catch (CoderException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public void close() {}
-
-  public static String configForKeyDeserializer() {
-    return String.format(CoderBasedKafkaDeserializer.CONFIG_FORMAT, "key");
-  }
-
-  public static String configForValueDeserializer() {
-    return String.format(CoderBasedKafkaDeserializer.CONFIG_FORMAT, "value");
-  }
-
-  private Coder<T> coder = null;
-  private static final String CONFIG_FORMAT = "beam.coder.based.kafka.%s.deserializer";
-}
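
With CoderBasedKafkaDeserializer removed, the read path takes an ordinary Kafka Deserializer supplied by the user. A rough sketch of such a class follows, assuming a hypothetical Foo value type with a Foo.parseFrom(byte[]) method; note that inferCoder() in KafkaIO.java only inspects interfaces declared directly on the deserializer class, so implementing Deserializer<Foo> on the class itself keeps coder inference possible.

  import java.util.Map;
  import org.apache.kafka.common.serialization.Deserializer;

  // Hypothetical deserializer for an application type Foo (Foo and Foo.parseFrom are assumed).
  public class FooDeserializer implements Deserializer<Foo> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {}

    @Override
    public Foo deserialize(String topic, byte[] data) {
      // Kafka passes null payloads through; mirror that here.
      return data == null ? null : Foo.parseFrom(data);
    }

    @Override
    public void close() {}
  }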

http://git-wip-us.apache.org/repos/asf/beam/blob/2ee22fa1/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaSerializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaSerializer.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaSerializer.java
deleted file mode 100644
index 84b617e..0000000
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaSerializer.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.kafka.serialization;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Map;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.kafka.common.serialization.Serializer;
-
-/**
- * Implements Kafka's {@link Serializer} with a {@link Coder}.
- *
- * <p>As Kafka instantiates serializers directly, the coder
- * must be stored as serialized value in the producer configuration map.
- */
-public class CoderBasedKafkaSerializer<T> implements Serializer<T> {
-  @SuppressWarnings("unchecked")
-  @Override
-  public void configure(Map<String, ?> configs, boolean isKey) {
-    String configKey = isKey ? configForKeySerializer() : configForValueSerializer();
-    coder = (Coder<T>) configs.get(configKey);
-    checkNotNull(coder, "could not instantiate coder for Kafka serialization");
-  }
-
-  @Override
-  public byte[] serialize(String topic, @Nullable T data) {
-    if (data == null) {
-      return null; // common for keys to be null
-    }
-
-    try {
-      return CoderUtils.encodeToByteArray(coder, data);
-    } catch (CoderException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public void close() {
-  }
-
-  public static String configForKeySerializer() {
-    return String.format(CoderBasedKafkaSerializer.CONFIG_FORMAT, "key");
-  }
-
-  public static String configForValueSerializer() {
-    return String.format(CoderBasedKafkaSerializer.CONFIG_FORMAT, "value");
-  }
-
-  private Coder<T> coder = null;
-  private static final String CONFIG_FORMAT = "beam.coder.based.kafka.%s.serializer";
-}
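
On the write side, writeWithCoders()/toAvro() and CoderBasedKafkaSerializer disappear in the same way, so the sink is configured with plain Kafka Serializer classes. A hedged sketch, where kvs stands for a PCollection<KV<Long, Foo>> and FooSerializer is a hypothetical Serializer<Foo> counterpart to the deserializer sketched above:

  kvs.apply(KafkaIO.<Long, Foo>write()
      .withBootstrapServers("broker_1:9092,broker_2:9092")
      .withTopic("results")
      .withKeySerializer(LongSerializer.class)
      .withValueSerializer(FooSerializer.class));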

http://git-wip-us.apache.org/repos/asf/beam/blob/2ee22fa1/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index ccbd3d6..12b7c78 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -43,10 +43,11 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
@@ -264,35 +265,6 @@ public class KafkaIOTest {
     }
   }
 
-  /**
-   * Creates a consumer with two topics, with 10 partitions each.
-   * numElements are (round-robin) assigned all the 20 partitions.
-   * Coders are specified explicitly.
-   */
-  private static KafkaIO.Read<Integer, Long> mkKafkaReadTransformWithCoders(
-          int numElements,
-          @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) {
-
-    List<String> topics = ImmutableList.of("topic_a", "topic_b");
-
-    KafkaIO.Read<Integer, Long> reader = KafkaIO
-            .<Integer, Long>readWithCoders(VarIntCoder.of(), VarLongCoder.of())
-            .withBootstrapServers("myServer1:9092,myServer2:9092")
-            .withTopics(topics)
-            .withConsumerFactoryFn(new ConsumerFactoryFn(
-                    topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 20 partitions
-            .withKeyDeserializer(IntegerDeserializer.class)
-            .withValueDeserializer(LongDeserializer.class)
-            .withMaxNumRecords(numElements);
-
-    if (timestampFn != null) {
-      return reader.withTimestampFn(timestampFn);
-    } else {
-      return reader;
-    }
-  }
-
-
   private static class AssertMultipleOf implements SerializableFunction<Iterable<Long>, Void> {
     private final int num;
 
@@ -343,19 +315,6 @@ public class KafkaIOTest {
   }
 
   @Test
-  public void testUnboundedSourceWithCoders() {
-    int numElements = 1000;
-
-    PCollection<Long> input = p
-            .apply(mkKafkaReadTransformWithCoders(numElements, new ValueAsTimestampFn())
-                    .withoutMetadata())
-            .apply(Values.<Long>create());
-
-    addCountingAsserts(input, numElements);
-    p.run();
-  }
-
-  @Test
   public void testUnboundedSourceWithSingleTopic() {
     // same as testUnboundedSource, but with single topic
 
@@ -454,9 +413,9 @@ public class KafkaIOTest {
     // is used in the test.
     UnboundedSource<KafkaRecord<Integer, Long>, ?> initial =
         mkKafkaReadTransform(numElements, null)
-                .withKeyCoder(VarIntCoder.of())
-                .withValueCoder(VarLongCoder.of())
-                .makeSource();
+            .withKeyDeserializerAndCoder(IntegerDeserializer.class, BigEndianIntegerCoder.of())
+            .withValueDeserializerAndCoder(LongDeserializer.class, BigEndianLongCoder.of())
+            .makeSource();
 
     List<? extends UnboundedSource<KafkaRecord<Integer, Long>, ?>> splits =
         initial.split(numSplits, p.getOptions());
@@ -717,39 +676,6 @@ public class KafkaIOTest {
   }
 
   @Test
-  public void testSinkWithCoders() throws Exception {
-    // Simply read from kafka source and write to kafka sink. Then verify the records
-    // are correctly published to mock kafka producer.
-
-    int numElements = 1000;
-
-    synchronized (MOCK_PRODUCER_LOCK) {
-
-      MOCK_PRODUCER.clear();
-
-      ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread().start();
-
-      String topic = "test";
-
-      p
-              .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
-                      .withoutMetadata())
-              .apply(KafkaIO.<Integer, Long>writeWithCoders(VarIntCoder.of(), VarLongCoder.of())
-                      .withBootstrapServers("none")
-                      .withTopic(topic)
-                      .withKeySerializer(IntegerSerializer.class)
-                      .withValueSerializer(LongSerializer.class)
-                      .withProducerFactoryFn(new ProducerFactoryFn()));
-
-      p.run();
-
-      completionThread.shutdown();
-
-      verifyProducerRecords(topic, numElements, false);
-    }
-  }
-
-  @Test
   public void testValuesSink() throws Exception {
     // similar to testSink(), but use values()' interface.
 
@@ -840,19 +766,6 @@ public class KafkaIOTest {
   }
 
   @Test
-  public void testSourceDisplayDataWithCoders() {
-    KafkaIO.Read<Integer, Long> read = mkKafkaReadTransformWithCoders(10, null);
-
-    DisplayData displayData = DisplayData.from(read);
-
-    assertThat(displayData, hasDisplayItem("topics", "topic_a,topic_b"));
-    assertThat(displayData, hasDisplayItem("enable.auto.commit", false));
-    assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServer1:9092,myServer2:9092"));
-    assertThat(displayData, hasDisplayItem("auto.offset.reset", "latest"));
-    assertThat(displayData, hasDisplayItem("receive.buffer.bytes", 524288));
-  }
-
-  @Test
   public void testSourceWithExplicitPartitionsDisplayData() {
     KafkaIO.Read<byte[], Long> read = KafkaIO.<byte[], Long>read()
         .withBootstrapServers("myServer1:9092,myServer2:9092")
@@ -886,21 +799,6 @@ public class KafkaIOTest {
     assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServerA:9092,myServerB:9092"));
     assertThat(displayData, hasDisplayItem("retries", 3));
   }
-  @Test
-  public void testSinkDisplayDataWithCoders() {
-    KafkaIO.Write<Integer, Long> write = KafkaIO
-            .<Integer, Long>writeWithCoders(VarIntCoder.of(), VarLongCoder.of())
-            .withBootstrapServers("myServerA:9092,myServerB:9092")
-            .withTopic("myTopic")
-            .withValueSerializer(LongSerializer.class)
-            .withProducerFactoryFn(new ProducerFactoryFn());
-
-    DisplayData displayData = DisplayData.from(write);
-
-    assertThat(displayData, hasDisplayItem("topic", "myTopic"));
-    assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServerA:9092,myServerB:9092"));
-    assertThat(displayData, hasDisplayItem("retries", 3));
-  }
 
   // interface for testing coder inference
   private interface DummyInterface<T> {


[2/2] beam git commit: This closes #3000: Cherry-pick #2986 into release-2.0.0

Posted by jk...@apache.org.
This closes #3000: Cherry-pick #2986 into release-2.0.0


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/df2cdd89
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/df2cdd89
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/df2cdd89

Branch: refs/heads/release-2.0.0
Commit: df2cdd8990d28da5d86381d6833c94db405f3e65
Parents: 9f7521f 2ee22fa
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue May 9 13:22:35 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue May 9 13:22:35 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 321 +++++++------------
 .../CoderBasedKafkaDeserializer.java            |  70 ----
 .../CoderBasedKafkaSerializer.java              |  72 -----
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 112 +------
 4 files changed, 119 insertions(+), 456 deletions(-)
----------------------------------------------------------------------