Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/26 21:38:27 UTC
[1/2] beam git commit: [BEAM-1573] Use Kafka serializers instead of coders in KafkaIO
Repository: beam
Updated Branches:
refs/heads/master a8d76603b -> af8ead44e
[BEAM-1573] Use Kafka serializers instead of coders in KafkaIO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d841e5db
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d841e5db
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d841e5db
Branch: refs/heads/master
Commit: d841e5dbb3b23d9eb81f7a380daf401bad3367be
Parents: a8d7660
Author: peay <pe...@protonmail.com>
Authored: Sun Mar 26 10:51:59 2017 -0400
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed Apr 26 14:35:43 2017 -0700
----------------------------------------------------------------------
.../runners/spark/SparkRunnerDebuggerTest.java | 10 +-
.../ResumeFromCheckpointStreamingTest.java | 22 +-
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 394 +++++++++++++------
.../CoderBasedKafkaDeserializer.java | 71 ++++
.../CoderBasedKafkaSerializer.java | 73 ++++
.../serialization/InstantDeserializer.java | 45 +++
.../kafka/serialization/InstantSerializer.java | 45 +++
.../io/kafka/serialization/package-info.java | 22 ++
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 119 ++++--
9 files changed, 640 insertions(+), 161 deletions(-)
----------------------------------------------------------------------
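The net effect on the public API is easiest to see up front. The sketch below is illustrative only — the class name, broker address, and topic names are placeholders, not part of the commit — but it shows the shape of the change: key/value Coders on the Read and Write builders give way to standard Kafka Deserializer/Serializer classes, with Beam coders inferred from the deserializer types.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.LongSerializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class KafkaIOApiSketch {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Source: deserializer classes replace withKeyCoder/withValueCoder;
        // coders for the resulting collection are inferred from them.
        PCollection<KV<Long, String>> records = pipeline.apply(
            KafkaIO.<Long, String>read()
                .withBootstrapServers("broker_1:9092")            // placeholder
                .withTopic("my_topic")                            // placeholder
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata());

        // Sink: serializer classes replace coders on the write side.
        records.apply(
            KafkaIO.<Long, String>write()
                .withBootstrapServers("broker_1:9092")            // placeholder
                .withTopic("results")                             // placeholder
                .withKeySerializer(LongSerializer.class)
                .withValueSerializer(StringSerializer.class));

        pipeline.run();
      }
    }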
http://git-wip-us.apache.org/repos/asf/beam/blob/d841e5db/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
index 905b30e..ff43fa6 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -44,6 +44,8 @@ import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.Rule;
@@ -118,14 +120,14 @@ public class SparkRunnerDebuggerTest {
KafkaIO.Read<String, String> read = KafkaIO.<String, String>read()
.withBootstrapServers("mykafka:9092")
.withTopics(Collections.singletonList("my_input_topic"))
- .withKeyCoder(StringUtf8Coder.of())
- .withValueCoder(StringUtf8Coder.of());
+ .withKeyDeserializer(StringDeserializer.class)
+ .withValueDeserializer(StringDeserializer.class);
KafkaIO.Write<String, String> write = KafkaIO.<String, String>write()
.withBootstrapServers("myotherkafka:9092")
.withTopic("my_output_topic")
- .withKeyCoder(StringUtf8Coder.of())
- .withValueCoder(StringUtf8Coder.of());
+ .withKeySerializer(StringSerializer.class)
+ .withValueSerializer(StringSerializer.class);
KvCoder<String, String> stringKvCoder = KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
http://git-wip-us.apache.org/repos/asf/beam/blob/d841e5db/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index 1aa76a3..7d7fd08 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -39,15 +39,15 @@ import org.apache.beam.runners.spark.SparkPipelineResult;
import org.apache.beam.runners.spark.TestSparkPipelineOptions;
import org.apache.beam.runners.spark.UsesCheckpointRecovery;
import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
-import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.io.MicrobatchSource;
import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.io.kafka.serialization.InstantDeserializer;
+import org.apache.beam.sdk.io.kafka.serialization.InstantSerializer;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricResult;
@@ -75,6 +75,7 @@ import org.apache.beam.sdk.values.PDone;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -119,18 +120,7 @@ public class ResumeFromCheckpointStreamingTest {
producerProps.put("request.required.acks", 1);
producerProps.put("bootstrap.servers", EMBEDDED_KAFKA_CLUSTER.getBrokerList());
Serializer<String> stringSerializer = new StringSerializer();
- Serializer<Instant> instantSerializer = new Serializer<Instant>() {
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) { }
-
- @Override
- public byte[] serialize(String topic, Instant data) {
- return CoderHelpers.toByteArray(data, InstantCoder.of());
- }
-
- @Override
- public void close() { }
- };
+ Serializer<Instant> instantSerializer = new InstantSerializer();
try (@SuppressWarnings("unchecked") KafkaProducer<String, Instant> kafkaProducer =
new KafkaProducer(producerProps, stringSerializer, instantSerializer)) {
@@ -232,8 +222,8 @@ public class ResumeFromCheckpointStreamingTest {
KafkaIO.Read<String, Instant> read = KafkaIO.<String, Instant>read()
.withBootstrapServers(EMBEDDED_KAFKA_CLUSTER.getBrokerList())
.withTopics(Collections.singletonList(TOPIC))
- .withKeyCoder(StringUtf8Coder.of())
- .withValueCoder(InstantCoder.of())
+ .withKeyDeserializer(StringDeserializer.class)
+ .withValueDeserializer(InstantDeserializer.class)
.updateConsumerProperties(ImmutableMap.<String, Object>of("auto.offset.reset", "earliest"))
.withTimestampFn(new SerializableFunction<KV<String, Instant>, Instant>() {
@Override
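With the new InstantSerializer, the anonymous Serializer<Instant> this test previously declared inline is no longer needed. Below is a minimal standalone sketch of the same producer setup; the broker address and topic name are assumptions for illustration, not taken from the test.

    import java.util.Properties;
    import org.apache.beam.sdk.io.kafka.serialization.InstantSerializer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.joda.time.Instant;

    public class InstantProducerSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker

        // Serializer instances are handed to the producer directly, as in the
        // updated test; no hand-written Serializer<Instant> is required.
        try (KafkaProducer<String, Instant> producer = new KafkaProducer<>(
            props, new StringSerializer(), new InstantSerializer())) {
          producer.send(new ProducerRecord<>("timestamps", "key1", Instant.now()));
        }
      }
    }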
http://git-wip-us.apache.org/repos/asf/beam/blob/d841e5db/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 68efb9a..a0977b7 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -34,6 +34,8 @@ import com.google.common.io.Closeables;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -54,9 +56,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.Read.Unbounded;
@@ -64,6 +66,8 @@ import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark;
+import org.apache.beam.sdk.io.kafka.serialization.CoderBasedKafkaDeserializer;
+import org.apache.beam.sdk.io.kafka.serialization.CoderBasedKafkaSerializer;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
@@ -73,8 +77,6 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.ExposedByteArrayInputStream;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
@@ -94,6 +96,7 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -116,9 +119,8 @@ import org.slf4j.LoggerFactory;
* <p>Although most applications consume a single topic, the source can be configured to consume
* multiple topics or even a specific set of {@link TopicPartition}s.
*
- * <p>To configure a Kafka source, you must specify at the minimum Kafka <tt>bootstrapServers</tt>
- * and one or more topics to consume. The following example illustrates various options for
- * configuring the source :
+ * <p>To configure a Kafka source, you must specify, at a minimum, Kafka <tt>bootstrapServers</tt>,
+ * one or more topics to consume, and key and value deserializers. For example:
*
* <pre>{@code
*
@@ -126,9 +128,9 @@ import org.slf4j.LoggerFactory;
* .apply(KafkaIO.<Long, String>read()
* .withBootstrapServers("broker_1:9092,broker_2:9092")
* .withTopic("my_topic") // use withTopics(List<String>) to read from multiple topics.
- * // set a Coder for Key and Value
- * .withKeyCoder(BigEndianLongCoder.of())
- * .withValueCoder(StringUtf8Coder.of())
+ * .withKeyDeserializer(LongDeserializer.class)
+ * .withValueDeserializer(StringDeserializer.class)
+ *
* // above four are required configuration. returns PCollection<KafkaRecord<Long, String>>
*
* // rest of the settings are optional :
@@ -150,6 +152,51 @@ import org.slf4j.LoggerFactory;
* ...
* }</pre>
*
+ * <p>Kafka provides deserializers for common types in
+ * {@link org.apache.kafka.common.serialization}.
+ *
+ * <p>To read Avro data, {@code fromAvro} can be used. This does not require manually specifying
+ * a {@link Coder} or {@link Deserializer}.
+ *
+ * <p>It's also possible to deserialize data using a Beam {@link Coder} via
+ * {@link #readWithCoders(Coder, Coder)}, though this is discouraged because the particular
+ * binary format is not guaranteed by coders. However, this can be useful
+ * when exchanging data with a Beam pipeline that uses the same coder:
+ *
+ * <pre>{@code
+ *
+ * pipeline
+ * .apply(KafkaIO.<MyKey, MyValue>readWithCoders(MyKeyCoder.of(), MyValueCoder.of())
+ * .withBootstrapServers("broker_1:9092,broker_2:9092")
+ * .withTopic("my_topic")
+ * )
+ * ...
+ * }</pre>
+ *
+ * <p>In most cases, you don't need to specify {@link Coder} for key and value in the resulting
+ * collection because the coders are inferred from deserializer types. However, in cases when
+ * coder inference fails, they can be specified manually using {@link Read#withKeyCoder} and
+ * {@link Read#withValueCoder}. Note that the payloads of Kafka messages are interpreted using
+ * key and value <i>deserializers</i>; coders are a Beam implementation detail to help runners
+ * materialize the data for intermediate storage if necessary.
+ *
+ * <pre>{@code
+ *
+ * pipeline
+ * .apply(KafkaIO.<Long, Foo>read()
+ * .withBootstrapServers("broker_1:9092,broker_2:9092")
+ * .withTopic("my_topic")
+ *
+ * // infer coder from deserializer
+ * .withKeyDeserializer(LongDeserializer.class)
+ *
+ * // explicitly specify coder
+ * .withValueDeserializer(FooDeserializer.class)
+ * .withValueCoder(FooCoder.of())
+ * )
+ * ...
+ * }</pre>
+ *
* <h3>Partition Assignment and Checkpointing</h3>
* The Kafka partitions are evenly distributed among splits (workers).
* Checkpointing is fully supported and each split can resume from previous checkpoint. See
@@ -165,8 +212,7 @@ import org.slf4j.LoggerFactory;
*
* <p>KafkaIO sink supports writing key-value pairs to a Kafka topic. Users can also write
* just the values. To configure a Kafka sink, you must specify, at a minimum, Kafka
- * <tt>bootstrapServers</tt> and the topic to write to. The following example illustrates various
- * options for configuring the sink:
+ * <tt>bootstrapServers</tt>, the topic to write to, and key and value serializers. For example:
*
* <pre>{@code
*
@@ -175,9 +221,8 @@ import org.slf4j.LoggerFactory;
* .withBootstrapServers("broker_1:9092,broker_2:9092")
* .withTopic("results")
*
- * // set Coder for Key and Value
- * .withKeyCoder(BigEndianLongCoder.of())
- * .withValueCoder(StringUtf8Coder.of())
+ * .withKeySerializer(LongSerializer.class)
+ * .withValueSerializer(StringSerializer.class)
*
* // you can further customize KafkaProducer used to write the records by adding more
* // settings for ProducerConfig. e.g, to enable compression :
@@ -193,11 +238,16 @@ import org.slf4j.LoggerFactory;
* strings.apply(KafkaIO.<Void, String>write()
* .withBootstrapServers("broker_1:9092,broker_2:9092")
* .withTopic("results")
- * .withValueCoder(StringUtf8Coder.of()) // just need coder for value
+ * .withValueSerializer(StringSerializer.class) // just need serializer for value
* .values()
* );
* }</pre>
*
+ * <p>The same notes on coders vs. serializers given above for {@link Read} apply here as well.
+ *
+ * <p>To write Avro data, {@code toAvro} can be used. This does not require specifying serializers
+ * or coders.
+ *
* <h3>Advanced Kafka Configuration</h3>
* KafkaIO allows setting most of the properties in {@link ConsumerConfig} for source or in
* {@link ProducerConfig} for sink. E.g. if you would like to enable offset
@@ -214,18 +264,53 @@ import org.slf4j.LoggerFactory;
*/
@Experimental
public class KafkaIO {
+
+ /**
+ * Attempts to infer a {@link Coder} for the type produced by the given {@link Deserializer}
+ * class, by extracting its type argument and looking it up in the {@link Coder} registry.
+ */
+ @VisibleForTesting
+ static <T> Coder<T> inferCoder(
+ CoderRegistry coderRegistry, Class<? extends Deserializer<T>> deserializer) {
+ checkNotNull(deserializer);
+
+ for (Type type : deserializer.getGenericInterfaces()) {
+ if (!(type instanceof ParameterizedType)) {
+ continue;
+ }
+
+ // This does not recurse: we will not infer from a class that extends
+ // a class that extends Deserializer<T>.
+ ParameterizedType parameterizedType = (ParameterizedType) type;
+
+ if (parameterizedType.getRawType() == Deserializer.class) {
+ Type parameter = parameterizedType.getActualTypeArguments()[0];
+
+ try {
+ @SuppressWarnings("unchecked")
+ Class<T> clazz = (Class<T>) parameter;
+ return coderRegistry.getDefaultCoder(clazz);
+ } catch (CannotProvideCoderException e) {
+ LOG.warn("Could not infer coder from deserializer type", e);
+ }
+ }
+ }
+
+ throw new RuntimeException("Could not extract deserializer type from " + deserializer);
+ }
+
/**
* Creates an uninitialized {@link Read} {@link PTransform}. Before use, basic Kafka
* configuration should be set with {@link Read#withBootstrapServers(String)} and
- * {@link Read#withTopics(List)}. Other optional settings include key and value coders,
- * custom timestamp and watermark functions.
+ * {@link Read#withTopics(List)}. Other optional settings include key and value
+ * {@link Deserializer}s, and custom timestamp and watermark functions.
*/
public static Read<byte[], byte[]> readBytes() {
return new AutoValue_KafkaIO_Read.Builder<byte[], byte[]>()
.setTopics(new ArrayList<String>())
.setTopicPartitions(new ArrayList<TopicPartition>())
- .setKeyCoder(ByteArrayCoder.of())
- .setValueCoder(ByteArrayCoder.of())
+ .setKeyDeserializer(ByteArrayDeserializer.class)
+ .setValueDeserializer(ByteArrayDeserializer.class)
.setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN)
.setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES)
.setMaxNumRecords(Long.MAX_VALUE)
@@ -235,8 +320,8 @@ public class KafkaIO {
/**
* Creates an uninitialized {@link Read} {@link PTransform}. Before use, basic Kafka
* configuration should be set with {@link Read#withBootstrapServers(String)} and
- * {@link Read#withTopics(List)}. Other optional settings include key and value coders,
- * custom timestamp and watermark functions.
+ * {@link Read#withTopics(List)}. Other optional settings include key and value
+ * {@link Deserializer}s, and custom timestamp and watermark functions.
*/
public static <K, V> Read<K, V> read() {
return new AutoValue_KafkaIO_Read.Builder<K, V>()
@@ -249,15 +334,87 @@ public class KafkaIO {
}
/**
+ * Creates an uninitialized {@link Read} {@link PTransform}, using Kafka {@link Deserializer}s
+ * based on {@link Coder} instances.
+ */
+ @SuppressWarnings("unchecked")
+ public static <K, V> Read<K, V> readWithCoders(Coder<K> keyCoder, Coder<V> valueCoder) {
+ // Kafka constructs deserializers directly, so pass the coders through the
+ // consumer configuration.
+ ImmutableMap.Builder<String, Object> builder = new ImmutableMap.Builder<>();
+ Map<String, Object> config = builder
+ .putAll(Read.DEFAULT_CONSUMER_PROPERTIES)
+ .put(CoderBasedKafkaDeserializer.configForKeyDeserializer(), keyCoder)
+ .put(CoderBasedKafkaDeserializer.configForValueDeserializer(), valueCoder)
+ .build();
+
+ return new AutoValue_KafkaIO_Read.Builder<K, V>()
+ .setTopics(new ArrayList<String>())
+ .setTopicPartitions(new ArrayList<TopicPartition>())
+ .setKeyCoder(keyCoder)
+ .setValueCoder(valueCoder)
+ .setKeyDeserializer((Class) CoderBasedKafkaDeserializer.class)
+ .setValueDeserializer((Class) CoderBasedKafkaDeserializer.class)
+ .setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN)
+ .setConsumerConfig(config)
+ .setMaxNumRecords(Long.MAX_VALUE)
+ .build();
+ }
+
+ /**
+ * Creates an uninitialized {@link Read} {@link PTransform}, using Kafka {@link Deserializer}s
+ * based on {@link AvroCoder}. This reads data in the Avro binary format directly without using
+ * an Avro object container.
+ */
+ @SuppressWarnings("unchecked")
+ public static <K, V> Read<K, V> fromAvro(Class<K> keyClass, Class<V> valueClass) {
+ return readWithCoders(AvroCoder.of(keyClass), AvroCoder.of(valueClass));
+ }
+
+ /**
* Creates an uninitialized {@link Write} {@link PTransform}. Before use, Kafka configuration
* should be set with {@link Write#withBootstrapServers(String)} and {@link Write#withTopic}
- * along with {@link Coder}s for (optional) key and values.
+ * along with {@link Serializer}s for the (optional) key and the values.
*/
public static <K, V> Write<K, V> write() {
return new AutoValue_KafkaIO_Write.Builder<K, V>()
.setProducerConfig(Write.DEFAULT_PRODUCER_PROPERTIES)
.build();
}
+ /**
+ * Creates an uninitialized {@link Write} {@link PTransform}, using Kafka {@link Serializer}s
+ * based on {@link Coder} instances.
+ */
+ @SuppressWarnings("unchecked")
+ public static <K, V> Write<K, V> writeWithCoders(Coder<K> keyCoder, Coder<V> valueCoder) {
+ // Kafka constructs serializers directly, so pass the coders through the
+ // producer configuration.
+ ImmutableMap.Builder<String, Object> builder = new ImmutableMap.Builder<>();
+ Map<String, Object> config = builder
+ .putAll(Write.DEFAULT_PRODUCER_PROPERTIES)
+ .put(CoderBasedKafkaSerializer.configForKeySerializer(), keyCoder)
+ .put(CoderBasedKafkaSerializer.configForValueSerializer(), valueCoder)
+ .build();
+
+
+ return new AutoValue_KafkaIO_Write.Builder<K, V>()
+ .setProducerConfig(config)
+ .setKeySerializer((Class) CoderBasedKafkaSerializer.class)
+ .setValueSerializer((Class) CoderBasedKafkaSerializer.class)
+ .build();
+ }
+
+ /**
+ * Creates an uninitialized {@link Write} {@link PTransform}, using Kafka {@link Serializer}s
+ * based on {@link AvroCoder}. The coder writes Avro data directly without using an Avro object
+ * container.
+ */
+ @SuppressWarnings("unchecked")
+ public static <K, V> Write<K, V> toAvro(Class<K> keyClass, Class<V> valueClass) {
+ return writeWithCoders(AvroCoder.of(keyClass), AvroCoder.of(valueClass));
+ }
///////////////////////// Read Support \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
@@ -273,6 +430,8 @@ public class KafkaIO {
abstract List<TopicPartition> getTopicPartitions();
@Nullable abstract Coder<K> getKeyCoder();
@Nullable abstract Coder<V> getValueCoder();
+ @Nullable abstract Class<? extends Deserializer<K>> getKeyDeserializer();
+ @Nullable abstract Class<? extends Deserializer<V>> getValueDeserializer();
abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
getConsumerFactoryFn();
@Nullable abstract SerializableFunction<KafkaRecord<K, V>, Instant> getTimestampFn();
@@ -290,6 +449,9 @@ public class KafkaIO {
abstract Builder<K, V> setTopicPartitions(List<TopicPartition> topicPartitions);
abstract Builder<K, V> setKeyCoder(Coder<K> keyCoder);
abstract Builder<K, V> setValueCoder(Coder<V> valueCoder);
+ abstract Builder<K, V> setKeyDeserializer(Class<? extends Deserializer<K>> keyDeserializer);
+ abstract Builder<K, V> setValueDeserializer(
+ Class<? extends Deserializer<V>> valueDeserializer);
abstract Builder<K, V> setConsumerFactoryFn(
SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn);
abstract Builder<K, V> setTimestampFn(SerializableFunction<KafkaRecord<K, V>, Instant> fn);
@@ -342,14 +504,28 @@ public class KafkaIO {
}
/**
- * Returns a new {@link Read} with {@link Coder} for key bytes.
+ * Returns a new {@link Read} with a Kafka {@link Deserializer} for key bytes.
+ */
+ public Read<K, V> withKeyDeserializer(Class<? extends Deserializer<K>> keyDeserializer) {
+ return toBuilder().setKeyDeserializer(keyDeserializer).build();
+ }
+
+ /**
+ * Returns a new {@link Read} with a {@link Coder} for the key.
*/
public Read<K, V> withKeyCoder(Coder<K> keyCoder) {
return toBuilder().setKeyCoder(keyCoder).build();
}
/**
- * Returns a new {@link Read} with {@link Coder} for value bytes.
+ * Returns a new {@link Read} with a Kafka {@link Deserializer} for value bytes.
+ */
+ public Read<K, V> withValueDeserializer(Class<? extends Deserializer<V>> valueDeserializer) {
+ return toBuilder().setValueDeserializer(valueDeserializer).build();
+ }
+
+ /**
+ * Returns a new {@link Read} with a {@link Coder} for values.
*/
public Read<K, V> withValueCoder(Coder<V> valueCoder) {
return toBuilder().setValueCoder(valueCoder).build();
@@ -436,20 +612,51 @@ public class KafkaIO {
}
@Override
- public void validate(PBegin input) {
+ public void validate(PBegin input) {
checkNotNull(getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
"Kafka bootstrap servers should be set");
checkArgument(getTopics().size() > 0 || getTopicPartitions().size() > 0,
"Kafka topics or topic_partitions are required");
- checkNotNull(getKeyCoder(), "Key coder must be set");
- checkNotNull(getValueCoder(), "Value coder must be set");
+ checkNotNull(getKeyDeserializer(), "Key deserializer must be set");
+ checkNotNull(getValueDeserializer(), "Value deserializer must be set");
+
+ if (input != null) {
+ CoderRegistry registry = input.getPipeline().getCoderRegistry();
+
+ checkNotNull(getKeyCoder() != null
+ ? getKeyCoder()
+ : inferCoder(registry, getKeyDeserializer()),
+ "Key coder must be set");
+
+ checkNotNull(getValueCoder() != null
+ ? getValueCoder()
+ : inferCoder(registry, getValueDeserializer()),
+ "Value coder must be set");
+ } else {
+ checkNotNull(getKeyCoder(), "Key coder must be set");
+ checkNotNull(getValueCoder(), "Value coder must be set");
+ }
}
@Override
public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
- // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
+ // Infer key/value coders if not specified explicitly
+ CoderRegistry registry = input.getPipeline().getCoderRegistry();
+
+ Coder<K> keyCoder = getKeyCoder() != null
+ ? getKeyCoder()
+ : inferCoder(registry, getKeyDeserializer());
+
+ Coder<V> valueCoder = getValueCoder() != null
+ ? getValueCoder()
+ : inferCoder(registry, getValueDeserializer());
+
+ // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
Unbounded<KafkaRecord<K, V>> unbounded =
- org.apache.beam.sdk.io.Read.from(makeSource());
+ org.apache.beam.sdk.io.Read.from(this
+ .withKeyCoder(keyCoder)
+ .withValueCoder(valueCoder)
+ .makeSource());
PTransform<PBegin, PCollection<KafkaRecord<K, V>>> transform = unbounded;
@@ -488,8 +695,8 @@ public class KafkaIO {
* A set of properties that are not required or don't make sense for our consumer.
*/
private static final Map<String, String> IGNORED_CONSUMER_PROPERTIES = ImmutableMap.of(
- ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Set keyCoder instead",
- ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueCoder instead"
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Set keyDeserializer instead",
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueDeserializer instead"
// "group.id", "enable.auto.commit", "auto.commit.interval.ms" :
// lets allow these, applications can have better resume point for restarts.
);
@@ -739,6 +946,9 @@ public class KafkaIO {
private Instant curTimestamp;
private Iterator<PartitionState> curBatch = Collections.emptyIterator();
+ private Deserializer<K> keyDeserializerInstance = null;
+ private Deserializer<V> valueDeserializerInstance = null;
+
private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);
private static final Duration NEW_RECORDS_POLL_TIMEOUT = Duration.millis(10);
@@ -912,6 +1122,16 @@ public class KafkaIO {
consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig());
consumerSpEL.evaluateAssign(consumer, spec.getTopicPartitions());
+ try {
+ keyDeserializerInstance = source.spec.getKeyDeserializer().newInstance();
+ valueDeserializerInstance = source.spec.getValueDeserializer().newInstance();
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new IOException("Could not instantiate deserializers", e);
+ }
+
+ keyDeserializerInstance.configure(spec.getConsumerConfig(), true);
+ valueDeserializerInstance.configure(spec.getConsumerConfig(), false);
+
for (PartitionState p : partitionStates) {
if (p.nextOffset != UNINITIALIZED_OFFSET) {
consumer.seek(p.topicPartition, p.nextOffset);
@@ -1003,15 +1223,15 @@ public class KafkaIO {
curRecord = null; // user deserializers below might throw.
- // apply user coders. might want to allow skipping records that fail to decode.
- // TODO: wrap exceptions from coders to make explicit to users
+ // apply user deserializers.
+ // TODO: write records that can't be deserialized to a "dead-letter" additional output.
KafkaRecord<K, V> record = new KafkaRecord<K, V>(
rawRecord.topic(),
rawRecord.partition(),
rawRecord.offset(),
consumerSpEL.getRecordTimestamp(rawRecord),
- decode(rawRecord.key(), source.spec.getKeyCoder()),
- decode(rawRecord.value(), source.spec.getValueCoder()));
+ keyDeserializerInstance.deserialize(rawRecord.topic(), rawRecord.key()),
+ valueDeserializerInstance.deserialize(rawRecord.topic(), rawRecord.value()));
curTimestamp = (source.spec.getTimestampFn() == null)
? Instant.now() : source.spec.getTimestampFn().apply(record);
@@ -1032,16 +1252,6 @@ public class KafkaIO {
}
}
- private static byte[] nullBytes = new byte[0];
- private static <T> T decode(byte[] bytes, Coder<T> coder) throws IOException {
- // If 'bytes' is null, use byte[0]. It is common for key in Kakfa record to be null.
- // This makes it impossible for user to distinguish between zero length byte and null.
- // Alternately, we could have a ByteArrayCoder that handles nulls, and use that for default
- // coder.
- byte[] toDecode = bytes == null ? nullBytes : bytes;
- return coder.decode(new ExposedByteArrayInputStream(toDecode), Coder.Context.OUTER);
- }
-
// update latest offset for each partition.
// called from offsetFetcher thread
private void updateLatestOffsets() {
@@ -1153,6 +1363,9 @@ public class KafkaIO {
}
}
+ Closeables.close(keyDeserializerInstance, true);
+ Closeables.close(valueDeserializerInstance, true);
+
Closeables.close(offsetConsumer, true);
Closeables.close(consumer, true);
}
@@ -1167,22 +1380,23 @@ public class KafkaIO {
@AutoValue
public abstract static class Write<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> {
@Nullable abstract String getTopic();
- @Nullable abstract Coder<K> getKeyCoder();
- @Nullable abstract Coder<V> getValueCoder();
abstract Map<String, Object> getProducerConfig();
@Nullable
abstract SerializableFunction<Map<String, Object>, Producer<K, V>> getProducerFactoryFn();
+ @Nullable abstract Class<? extends Serializer<K>> getKeySerializer();
+ @Nullable abstract Class<? extends Serializer<V>> getValueSerializer();
+
abstract Builder<K, V> toBuilder();
@AutoValue.Builder
abstract static class Builder<K, V> {
abstract Builder<K, V> setTopic(String topic);
- abstract Builder<K, V> setKeyCoder(Coder<K> keyCoder);
- abstract Builder<K, V> setValueCoder(Coder<V> valueCoder);
abstract Builder<K, V> setProducerConfig(Map<String, Object> producerConfig);
abstract Builder<K, V> setProducerFactoryFn(
SerializableFunction<Map<String, Object>, Producer<K, V>> fn);
+ abstract Builder<K, V> setKeySerializer(Class<? extends Serializer<K>> serializer);
+ abstract Builder<K, V> setValueSerializer(Class<? extends Serializer<V>> serializer);
abstract Write<K, V> build();
}
@@ -1204,19 +1418,19 @@ public class KafkaIO {
}
/**
- * Returns a new {@link Write} with {@link Coder} for serializing key (if any) to bytes.
+ * Returns a new {@link Write} with {@link Serializer} for serializing key (if any) to bytes.
* A key is optional while writing to Kafka. Note when a key is set, its hash is used to
* determine partition in Kafka (see {@link ProducerRecord} for more details).
*/
- public Write<K, V> withKeyCoder(Coder<K> keyCoder) {
- return toBuilder().setKeyCoder(keyCoder).build();
+ public Write<K, V> withKeySerializer(Class<? extends Serializer<K>> keySerializer) {
+ return toBuilder().setKeySerializer(keySerializer).build();
}
/**
- * Returns a new {@link Write} with {@link Coder} for serializing value to bytes.
+ * Returns a new {@link Write} with {@link Serializer} for serializing value to bytes.
*/
- public Write<K, V> withValueCoder(Coder<V> valueCoder) {
- return toBuilder().setValueCoder(valueCoder).build();
+ public Write<K, V> withValueSerializer(Class<? extends Serializer<V>> valueSerializer) {
+ return toBuilder().setValueSerializer(valueSerializer).build();
}
public Write<K, V> updateProducerProperties(Map<String, Object> configUpdates) {
@@ -1239,7 +1453,7 @@ public class KafkaIO {
* collections of values rather than {@link KV}s.
*/
public PTransform<PCollection<V>, PDone> values() {
- return new KafkaValueWrite<>(withKeyCoder(new NullOnlyCoder<K>()).toBuilder().build());
+ return new KafkaValueWrite<>(toBuilder().build());
}
@Override
@@ -1253,26 +1467,19 @@ public class KafkaIO {
checkNotNull(getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
"Kafka bootstrap servers should be set");
checkNotNull(getTopic(), "Kafka topic should be set");
- checkNotNull(getKeyCoder(), "Key coder should be set");
- checkNotNull(getValueCoder(), "Value coder should be set");
}
// set config defaults
private static final Map<String, Object> DEFAULT_PRODUCER_PROPERTIES =
ImmutableMap.<String, Object>of(
- ProducerConfig.RETRIES_CONFIG, 3,
- // See comment about custom serializers in KafkaWriter constructor.
- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, CoderBasedKafkaSerializer.class,
- ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CoderBasedKafkaSerializer.class);
+ ProducerConfig.RETRIES_CONFIG, 3);
/**
* A set of properties that are not required or don't make sense for our producer.
*/
private static final Map<String, String> IGNORED_PRODUCER_PROPERTIES = ImmutableMap.of(
- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Set keyCoder instead",
- ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "Set valueCoder instead",
- configForKeySerializer(), "Reserved for internal serializer",
- configForValueSerializer(), "Reserved for internal serializer"
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Use withKeySerializer instead",
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "Use withValueSerializer instead"
);
@Override
@@ -1310,7 +1517,7 @@ public class KafkaIO {
return KV.of(null, element);
}
}))
- .setCoder(KvCoder.of(new NullOnlyCoder<K>(), kvWriteTransform.getValueCoder()))
+ .setCoder(KvCoder.of(new NullOnlyCoder<K>(), input.getCoder()))
.apply(kvWriteTransform);
}
@@ -1389,8 +1596,11 @@ public class KafkaIO {
// Use case : write all the events for a single session to same Kafka partition.
this.producerConfig = new HashMap<>(spec.getProducerConfig());
- this.producerConfig.put(configForKeySerializer(), spec.getKeyCoder());
- this.producerConfig.put(configForValueSerializer(), spec.getValueCoder());
+
+ this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ spec.getKeySerializer());
+ this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ spec.getValueSerializer());
}
private synchronized void checkForFailures() throws IOException {
@@ -1427,48 +1637,4 @@ public class KafkaIO {
}
}
}
-
- /**
- * Implements Kafka's {@link Serializer} with a {@link Coder}. The coder is stored as serialized
- * value in producer configuration map.
- */
- public static class CoderBasedKafkaSerializer<T> implements Serializer<T> {
-
- @SuppressWarnings("unchecked")
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) {
- String configKey = isKey ? configForKeySerializer() : configForValueSerializer();
- coder = (Coder<T>) configs.get(configKey);
- checkNotNull(coder, "could not instantiate coder for Kafka serialization");
- }
-
- @Override
- public byte[] serialize(String topic, @Nullable T data) {
- if (data == null) {
- return null; // common for keys to be null
- }
-
- try {
- return CoderUtils.encodeToByteArray(coder, data);
- } catch (CoderException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void close() {
- }
-
- private Coder<T> coder = null;
- private static final String CONFIG_FORMAT = "beam.coder.based.kafka.%s.serializer";
- }
-
-
- private static String configForKeySerializer() {
- return String.format(CoderBasedKafkaSerializer.CONFIG_FORMAT, "key");
- }
-
- private static String configForValueSerializer() {
- return String.format(CoderBasedKafkaSerializer.CONFIG_FORMAT, "value");
- }
}
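The coder inference introduced above is plain Java reflection: inferCoder walks the interfaces the deserializer class directly implements, and when it finds Deserializer<T> it asks the CoderRegistry for a default coder for T. The self-contained sketch below isolates just that reflection step; the class and method names are illustrative, not part of the commit.

    import java.lang.reflect.ParameterizedType;
    import java.lang.reflect.Type;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.LongDeserializer;

    public class TypeArgumentSketch {
      // Returns the T in "implements Deserializer<T>", or null if the class
      // does not directly implement the parameterized interface. Like
      // KafkaIO.inferCoder, this does not recurse into superclasses.
      static Class<?> deserializedType(Class<? extends Deserializer<?>> clazz) {
        for (Type type : clazz.getGenericInterfaces()) {
          if (type instanceof ParameterizedType
              && ((ParameterizedType) type).getRawType() == Deserializer.class) {
            Type arg = ((ParameterizedType) type).getActualTypeArguments()[0];
            if (arg instanceof Class) {
              return (Class<?>) arg;
            }
          }
        }
        return null;
      }

      public static void main(String[] args) {
        // Prints "class java.lang.Long"; the CoderRegistry then maps Long to a
        // default coder such as VarLongCoder.
        System.out.println(deserializedType(LongDeserializer.class));
      }
    }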
http://git-wip-us.apache.org/repos/asf/beam/blob/d841e5db/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaDeserializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaDeserializer.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaDeserializer.java
new file mode 100644
index 0000000..ca552fb
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaDeserializer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.kafka.serialization;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Map;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.kafka.common.serialization.Deserializer;
+
+/**
+ * Implements a Kafka {@link Deserializer} with a {@link Coder}.
+ *
+ * <p>As Kafka instantiates serializers directly, the coder must be stored as serialized value in
+ * the producer configuration map.
+ */
+public class CoderBasedKafkaDeserializer<T> implements Deserializer<T> {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ String configKey = isKey ? configForKeyDeserializer() : configForValueDeserializer();
+ coder = (Coder<T>) configs.get(configKey);
+ checkNotNull(coder, "could not instantiate coder for Kafka deserialization");
+ }
+
+ @Override
+ public T deserialize(String topic, byte[] data) {
+ if (data == null) {
+ return null;
+ }
+
+ try {
+ return CoderUtils.decodeFromByteArray(coder, data);
+ } catch (CoderException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() {}
+
+ public static String configForKeyDeserializer() {
+ return String.format(CoderBasedKafkaDeserializer.CONFIG_FORMAT, "key");
+ }
+
+ public static String configForValueDeserializer() {
+ return String.format(CoderBasedKafkaDeserializer.CONFIG_FORMAT, "value");
+ }
+
+ private Coder<T> coder = null;
+ private static final String CONFIG_FORMAT = "beam.coder.based.kafka.%s.deserializer";
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/d841e5db/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaSerializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaSerializer.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaSerializer.java
new file mode 100644
index 0000000..1044d6f
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaSerializer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.kafka.serialization;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Map;
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.kafka.common.serialization.Serializer;
+
+/**
+ * Implements Kafka's {@link Serializer} with a {@link Coder}.
+ *
+ * <p>As Kafka instantiates serializers directly, the coder
+ * must be stored as serialized value in the producer configuration map.
+ */
+public class CoderBasedKafkaSerializer<T> implements Serializer<T> {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ String configKey = isKey ? configForKeySerializer() : configForValueSerializer();
+ coder = (Coder<T>) configs.get(configKey);
+ checkNotNull(coder, "could not instantiate coder for Kafka serialization");
+ }
+
+ @Override
+ public byte[] serialize(String topic, @Nullable T data) {
+ if (data == null) {
+ return null; // common for keys to be null
+ }
+
+ try {
+ return CoderUtils.encodeToByteArray(coder, data);
+ } catch (CoderException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ }
+
+ public static String configForKeySerializer() {
+ return String.format(CoderBasedKafkaSerializer.CONFIG_FORMAT, "key");
+ }
+
+ public static String configForValueSerializer() {
+ return String.format(CoderBasedKafkaSerializer.CONFIG_FORMAT, "value");
+ }
+
+ private Coder<T> coder = null;
+ private static final String CONFIG_FORMAT = "beam.coder.based.kafka.%s.serializer";
+}
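Because Kafka instantiates serializers and deserializers reflectively from class names, the only channel for handing them a Coder is the configuration map passed to configure(). The round-trip sketch below exercises that mechanism directly, using the reserved config keys the two classes expose; the class name and topic string are placeholders.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.kafka.serialization.CoderBasedKafkaDeserializer;
    import org.apache.beam.sdk.io.kafka.serialization.CoderBasedKafkaSerializer;

    public class CoderRoundTripSketch {
      public static void main(String[] args) {
        // The coder rides along in the config map under a reserved key, which
        // is how readWithCoders/writeWithCoders get it past Kafka's
        // reflective instantiation.
        Map<String, Object> config = new HashMap<>();
        config.put(CoderBasedKafkaSerializer.configForValueSerializer(), StringUtf8Coder.of());
        config.put(CoderBasedKafkaDeserializer.configForValueDeserializer(), StringUtf8Coder.of());

        CoderBasedKafkaSerializer<String> serializer = new CoderBasedKafkaSerializer<>();
        CoderBasedKafkaDeserializer<String> deserializer = new CoderBasedKafkaDeserializer<>();
        serializer.configure(config, false);   // false: configure as value, not key
        deserializer.configure(config, false);

        byte[] bytes = serializer.serialize("some_topic", "hello");
        System.out.println(deserializer.deserialize("some_topic", bytes)); // hello
      }
    }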
http://git-wip-us.apache.org/repos/asf/beam/blob/d841e5db/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/InstantDeserializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/InstantDeserializer.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/InstantDeserializer.java
new file mode 100644
index 0000000..fe4749f
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/InstantDeserializer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.kafka.serialization;
+
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.joda.time.Instant;
+
+/**
+ * Kafka {@link Deserializer} for {@link Instant}.
+ *
+ * <p>This decodes the number of milliseconds since epoch using {@link LongDeserializer}.
+ */
+public class InstantDeserializer implements Deserializer<Instant> {
+ private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
+
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {}
+
+ @Override
+ public Instant deserialize(String topic, byte[] bytes) {
+ return new Instant(LONG_DESERIALIZER.deserialize(topic, bytes));
+ }
+
+ @Override
+ public void close() {}
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/d841e5db/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/InstantSerializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/InstantSerializer.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/InstantSerializer.java
new file mode 100644
index 0000000..8fa4429
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/InstantSerializer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.kafka.serialization;
+
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.joda.time.Instant;
+
+/**
+ * Kafka {@link Serializer} for {@link Instant}.
+ *
+ * <p>This encodes the number of milliseconds since epoch using {@link LongSerializer}.
+ */
+public class InstantSerializer implements Serializer<Instant> {
+ private static final LongSerializer LONG_SERIALIZER = new LongSerializer();
+
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {}
+
+ @Override
+ public byte[] serialize(String topic, Instant instant) {
+ return LONG_SERIALIZER.serialize(topic, instant.getMillis());
+ }
+
+ @Override
+ public void close() {}
+}
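These two helpers delegate to Kafka's LongSerializer and LongDeserializer, so an Instant is encoded as its getMillis() value in the same 8-byte format Kafka uses for plain longs. A quick illustrative round-trip (class and topic names are placeholders):

    import org.apache.beam.sdk.io.kafka.serialization.InstantDeserializer;
    import org.apache.beam.sdk.io.kafka.serialization.InstantSerializer;
    import org.joda.time.Instant;

    public class InstantRoundTripSketch {
      public static void main(String[] args) {
        Instant now = Instant.now();

        // Millis since epoch, encoded and decoded by the Long(De)Serializer
        // pair wrapped inside the Instant helpers.
        byte[] bytes = new InstantSerializer().serialize("any_topic", now);
        Instant decoded = new InstantDeserializer().deserialize("any_topic", bytes);

        System.out.println(now.equals(decoded)); // true
      }
    }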
http://git-wip-us.apache.org/repos/asf/beam/blob/d841e5db/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/package-info.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/package-info.java
new file mode 100644
index 0000000..747d64c
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Kafka serializers and deserializers.
+ */
+package org.apache.beam.sdk.io.kafka.serialization;
http://git-wip-us.apache.org/repos/asf/beam/blob/d841e5db/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 2b11162..e6ed2f7 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -41,12 +41,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.BigEndianLongCoder;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
+import org.apache.beam.sdk.io.kafka.serialization.InstantDeserializer;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -74,7 +77,14 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Utils;
import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.joda.time.Instant;
@@ -231,8 +241,8 @@ public class KafkaIOTest {
.withTopics(topics)
.withConsumerFactoryFn(new ConsumerFactoryFn(
topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 20 partitions
- .withKeyCoder(BigEndianIntegerCoder.of())
- .withValueCoder(BigEndianLongCoder.of())
+ .withKeyDeserializer(IntegerDeserializer.class)
+ .withValueDeserializer(LongDeserializer.class)
.withMaxNumRecords(numElements);
if (timestampFn != null) {
@@ -303,9 +313,9 @@ public class KafkaIOTest {
.withTopic("my_topic")
.withConsumerFactoryFn(new ConsumerFactoryFn(
ImmutableList.of(topic), 10, numElements, OffsetResetStrategy.EARLIEST))
- .withKeyCoder(BigEndianIntegerCoder.of())
- .withValueCoder(BigEndianLongCoder.of())
- .withMaxNumRecords(numElements);
+ .withMaxNumRecords(numElements)
+ .withKeyDeserializer(IntegerDeserializer.class)
+ .withValueDeserializer(LongDeserializer.class);
PCollection<Long> input = p
.apply(reader.withoutMetadata())
@@ -326,8 +336,8 @@ public class KafkaIOTest {
.withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5)))
.withConsumerFactoryFn(new ConsumerFactoryFn(
topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 10 partitions
- .withKeyCoder(ByteArrayCoder.of())
- .withValueCoder(BigEndianLongCoder.of())
+ .withKeyDeserializer(ByteArrayDeserializer.class)
+ .withValueDeserializer(LongDeserializer.class)
.withMaxNumRecords(numElements / 10);
PCollection<Long> input = p
@@ -386,8 +396,14 @@ public class KafkaIOTest {
int numElements = 1000;
int numSplits = 10;
+ // Coders must be specified explicitly here due to the way the transform
+ // is used in the test.
UnboundedSource<KafkaRecord<Integer, Long>, ?> initial =
- mkKafkaReadTransform(numElements, null).makeSource();
+ mkKafkaReadTransform(numElements, null)
+ .withKeyCoder(VarIntCoder.of())
+ .withValueCoder(VarLongCoder.of())
+ .makeSource();
+
List<? extends UnboundedSource<KafkaRecord<Integer, Long>, ?>> splits =
initial.split(numSplits, p.getOptions());
assertEquals("Expected exact splitting", numSplits, splits.size());
@@ -510,8 +526,8 @@ public class KafkaIOTest {
.withTopics(topics)
.withConsumerFactoryFn(new ConsumerFactoryFn(
topics, 10, numElements, OffsetResetStrategy.LATEST))
- .withKeyCoder(BigEndianIntegerCoder.of())
- .withValueCoder(BigEndianLongCoder.of())
+ .withKeyDeserializer(IntegerDeserializer.class)
+ .withValueDeserializer(LongDeserializer.class)
.withMaxNumRecords(numElements)
.withTimestampFn(new ValueAsTimestampFn())
.makeSource()
@@ -554,8 +570,8 @@ public class KafkaIOTest {
.apply(KafkaIO.<Integer, Long>write()
.withBootstrapServers("none")
.withTopic(topic)
- .withKeyCoder(BigEndianIntegerCoder.of())
- .withValueCoder(BigEndianLongCoder.of())
+ .withKeySerializer(IntegerSerializer.class)
+ .withValueSerializer(LongSerializer.class)
.withProducerFactoryFn(new ProducerFactoryFn()));
p.run();
@@ -587,7 +603,7 @@ public class KafkaIOTest {
.apply(KafkaIO.<Integer, Long>write()
.withBootstrapServers("none")
.withTopic(topic)
- .withValueCoder(BigEndianLongCoder.of())
+ .withValueSerializer(LongSerializer.class)
.withProducerFactoryFn(new ProducerFactoryFn())
.values());
@@ -628,8 +644,8 @@ public class KafkaIOTest {
.apply(KafkaIO.<Integer, Long>write()
.withBootstrapServers("none")
.withTopic(topic)
- .withKeyCoder(BigEndianIntegerCoder.of())
- .withValueCoder(BigEndianLongCoder.of())
+ .withKeySerializer(IntegerSerializer.class)
+ .withValueSerializer(LongSerializer.class)
.withProducerFactoryFn(new ProducerFactoryFn()));
try {
@@ -664,8 +680,8 @@ public class KafkaIOTest {
new TopicPartition("test", 6)))
.withConsumerFactoryFn(new ConsumerFactoryFn(
Lists.newArrayList("test"), 10, 10, OffsetResetStrategy.EARLIEST)) // 10 partitions
- .withKeyCoder(ByteArrayCoder.of())
- .withValueCoder(BigEndianLongCoder.of());
+ .withKeyDeserializer(ByteArrayDeserializer.class)
+ .withValueDeserializer(LongDeserializer.class);
DisplayData displayData = DisplayData.from(read);
@@ -681,7 +697,7 @@ public class KafkaIOTest {
KafkaIO.Write<Integer, Long> write = KafkaIO.<Integer, Long>write()
.withBootstrapServers("myServerA:9092,myServerB:9092")
.withTopic("myTopic")
- .withValueCoder(BigEndianLongCoder.of())
+ .withValueSerializer(LongSerializer.class)
.withProducerFactoryFn(new ProducerFactoryFn());
DisplayData displayData = DisplayData.from(write);
@@ -691,6 +707,51 @@ public class KafkaIOTest {
assertThat(displayData, hasDisplayItem("retries", 3));
}
+ // interface for testing coder inference
+ private interface DummyInterface<T> {
+ }
+
+ // interface for testing coder inference
+ private interface DummyNonparametricInterface {
+ }
+
+ // class for testing coder inference
+ private static class DeserializerWithInterfaces
+ implements DummyInterface<String>, DummyNonparametricInterface,
+ Deserializer<Long> {
+
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ }
+
+ @Override
+ public Long deserialize(String topic, byte[] bytes) {
+ return 0L;
+ }
+
+ @Override
+ public void close() {
+
+ }
+ }
+
+ @Test
+ public void testInferKeyCoder() {
+ CoderRegistry registry = CoderRegistry.createDefault();
+
+ assertTrue(KafkaIO.inferCoder(registry, LongDeserializer.class)
+ instanceof VarLongCoder);
+
+ assertTrue(KafkaIO.inferCoder(registry, StringDeserializer.class)
+ instanceof StringUtf8Coder);
+
+ assertTrue(KafkaIO.inferCoder(registry, InstantDeserializer.class)
+ instanceof InstantCoder);
+
+ assertTrue(KafkaIO.inferCoder(registry, DeserializerWithInterfaces.class)
+ instanceof VarLongCoder);
+ }
+
private static void verifyProducerRecords(String topic, int numElements, boolean keyIsAbsent) {
// verify that appropriate messages are written to kafka
@@ -724,8 +785,8 @@ public class KafkaIOTest {
private static final MockProducer<Integer, Long> MOCK_PRODUCER =
new MockProducer<Integer, Long>(
false, // disable synchronous completion of send. see ProducerSendCompletionThread below.
- new KafkaIO.CoderBasedKafkaSerializer<Integer>(),
- new KafkaIO.CoderBasedKafkaSerializer<Long>()) {
+ new IntegerSerializer(),
+ new LongSerializer()) {
// override flush() so that it does not complete all the waiting sends, giving a chance to
// ProducerCompletionThread to inject errors.
@@ -754,10 +815,14 @@ public class KafkaIOTest {
public Producer<Integer, Long> apply(Map<String, Object> config) {
// Make sure the config is correctly set up for serializers.
- Utils.newInstance(
- ((Class<?>) config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
- .asSubclass(Serializer.class)
- ).configure(config, true);
+
+ // There may not be a key serializer if we're interested only in values.
+ if (config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) != null) {
+ Utils.newInstance(
+ ((Class<?>) config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
+ .asSubclass(Serializer.class)
+ ).configure(config, true);
+ }
Utils.newInstance(
((Class<?>) config.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
[2/2] beam git commit: This closes #2330
Posted by jk...@apache.org.
This closes #2330
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/af8ead44
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/af8ead44
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/af8ead44
Branch: refs/heads/master
Commit: af8ead44e8e8fe66068120740f8888f8ccd22603
Parents: a8d7660 d841e5d
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Apr 26 14:35:51 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed Apr 26 14:35:51 2017 -0700
----------------------------------------------------------------------
.../runners/spark/SparkRunnerDebuggerTest.java | 10 +-
.../ResumeFromCheckpointStreamingTest.java | 22 +-
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 394 +++++++++++++------
.../CoderBasedKafkaDeserializer.java | 71 ++++
.../CoderBasedKafkaSerializer.java | 73 ++++
.../serialization/InstantDeserializer.java | 45 +++
.../kafka/serialization/InstantSerializer.java | 45 +++
.../io/kafka/serialization/package-info.java | 22 ++
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 119 ++++--
9 files changed, 640 insertions(+), 161 deletions(-)
----------------------------------------------------------------------