Posted to commits@flink.apache.org by rm...@apache.org on 2015/11/24 13:23:37 UTC
flink git commit: [FLINK-3045] Properly expose the key of a Kafka message
Repository: flink
Updated Branches:
refs/heads/master 89b1d2332 -> 85e7b2878
[FLINK-3045] Properly expose the key of a Kafka message
This closes #1385
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/85e7b287
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/85e7b287
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/85e7b287
Branch: refs/heads/master
Commit: 85e7b2878eb50767ba64bbbd5794929d151f81ef
Parents: 89b1d23
Author: Robert Metzger <rm...@apache.org>
Authored: Thu Nov 19 15:41:01 2015 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Tue Nov 24 13:23:07 2015 +0100
----------------------------------------------------------------------
.../connectors/kafka/FlinkKafkaConsumer.java | 48 ++++--
.../connectors/kafka/FlinkKafkaConsumer082.java | 18 ++
.../connectors/kafka/FlinkKafkaProducer.java | 69 ++++++--
.../connectors/kafka/internals/Fetcher.java | 5 +-
.../kafka/internals/LegacyFetcher.java | 37 +++--
.../connectors/kafka/KafkaConsumerTestBase.java | 94 ++++++++++-
.../streaming/connectors/kafka/KafkaITCase.java | 5 +
.../KeyedDeserializationSchema.java | 52 ++++++
.../KeyedDeserializationSchemaWrapper.java | 51 ++++++
.../serialization/KeyedSerializationSchema.java | 48 ++++++
.../KeyedSerializationSchemaWrapper.java | 43 +++++
...eInformationKeyValueSerializationSchema.java | 163 +++++++++++++++++++
.../TypeInformationSerializationSchema.java | 2 +
13 files changed, 592 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/85e7b287/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
index e701639..446648f 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
@@ -38,6 +38,8 @@ import org.apache.flink.streaming.connectors.kafka.internals.OffsetHandler;
import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.util.NetUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.Node;
@@ -207,7 +209,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
private final int[] partitions;
/** The schema to convert between Kafka's byte messages, and Flink's objects */
- private final DeserializationSchema<T> valueDeserializer;
+ private final KeyedDeserializationSchema<T> deserializer;
// ------ Runtime State -------
@@ -234,9 +236,33 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
private transient long[] restoreToOffset;
private volatile boolean running = true;
-
+
// ------------------------------------------------------------------------
+
+ /**
+ * Creates a new Flink Kafka Consumer, using the given type of fetcher and offset handler.
+ *
+ * <p>To determine which kind of fetcher and offset handler to use, please refer to the docs
+ * at the beginning of this class.</p>
+ *
+ * @param topic
+ * The Kafka topic to read from.
+ * @param deserializer
+ * The deserializer to turn raw byte messages (without key) into Java/Scala objects.
+ * @param props
+ * The properties that are used to configure both the fetcher and the offset handler.
+ * @param offsetStore
+ * The type of offset store to use (Kafka / ZooKeeper)
+ * @param fetcherType
+ * The type of fetcher to use (new high-level API, old low-level API).
+ */
+ public FlinkKafkaConsumer(String topic, DeserializationSchema<T> deserializer, Properties props,
+ OffsetStore offsetStore, FetcherType fetcherType) {
+ this(topic, new KeyedDeserializationSchemaWrapper<>(deserializer),
+ props, offsetStore, fetcherType);
+ }
+
/**
* Creates a new Flink Kafka Consumer, using the given type of fetcher and offset handler.
*
@@ -245,7 +271,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
*
* @param topic
* The Kafka topic to read from.
- * @param valueDeserializer
+ * @param deserializer
* The deserializer to turn raw byte messages into Java/Scala objects.
* @param props
* The properties that are used to configure both the fetcher and the offset handler.
@@ -254,7 +280,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
* @param fetcherType
* The type of fetcher to use (new high-level API, old low-level API).
*/
- public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props,
+ public FlinkKafkaConsumer(String topic, KeyedDeserializationSchema<T> deserializer, Properties props,
OffsetStore offsetStore, FetcherType fetcherType) {
this.offsetStore = checkNotNull(offsetStore);
this.fetcherType = checkNotNull(fetcherType);
@@ -270,7 +296,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
this.topic = checkNotNull(topic, "topic");
this.props = checkNotNull(props, "props");
- this.valueDeserializer = checkNotNull(valueDeserializer, "valueDeserializer");
+ this.deserializer = checkNotNull(deserializer, "deserializer");
// validate the zookeeper properties
if (offsetStore == OffsetStore.FLINK_ZOOKEEPER) {
@@ -300,7 +326,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
-
+
final int numConsumers = getRuntimeContext().getNumberOfParallelSubtasks();
final int thisConsumerIndex = getRuntimeContext().getIndexOfThisSubtask();
@@ -372,8 +398,6 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
// no restore request. Let the offset handler take care of the initial offset seeking
offsetHandler.seekFetcherToInitialOffsets(subscribedPartitions, fetcher);
}
-
-
}
@Override
@@ -394,7 +418,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
LOG.info("Starting periodic offset committer, with commit interval of {}ms", commitInterval);
}
- fetcher.run(sourceContext, valueDeserializer, lastOffsets);
+ fetcher.run(sourceContext, deserializer, lastOffsets);
if (offsetCommitter != null) {
offsetCommitter.close();
@@ -438,7 +462,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
LOG.warn("Error while closing Kafka connector data fetcher", e);
}
}
-
+
OffsetHandler offsetHandler = this.offsetHandler;
this.offsetHandler = null;
if (offsetHandler != null) {
@@ -449,8 +473,6 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
LOG.warn("Error while closing Kafka connector offset handler", e);
}
}
-
-
}
@Override
@@ -461,7 +483,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
@Override
public TypeInformation<T> getProducedType() {
- return valueDeserializer.getProducedType();
+ return deserializer.getProducedType();
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/85e7b287/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
index 77e41e5..ab4a88a 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import java.util.Properties;
@@ -48,4 +49,21 @@ public class FlinkKafkaConsumer082<T> extends FlinkKafkaConsumer<T> {
public FlinkKafkaConsumer082(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
super(topic, valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
}
+
+ /**
+ * Creates a new Kafka 0.8.2.x streaming source consumer.
+ *
+ * This constructor allows passing a {@link KeyedDeserializationSchema} for reading key/value
+ * pairs from Kafka.
+ *
+ * @param topic
+ * The name of the topic that should be consumed.
+ * @param deserializer
+ * The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+ * @param props
+ * The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+ */
+ public FlinkKafkaConsumer082(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
+ super(topic, deserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
+ }
}
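
For illustration, a minimal sketch of reading key/value pairs with this new constructor, along the lines of the test added below in KafkaConsumerTestBase. The topic name, broker/ZooKeeper addresses, group id and class name are assumptions for the sketch, not part of this commit:

    import java.util.Properties;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
    import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
    import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema;

    public class KeyedConsumerSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("zookeeper.connect", "localhost:2181"); // assumed address
            props.setProperty("bootstrap.servers", "localhost:9092"); // assumed address
            props.setProperty("group.id", "demo-group");              // assumed group id

            // deserializes the Kafka message key into a Long and the value into a
            // String; each record arrives as a Tuple2<Long, String> (key may be null)
            KeyedDeserializationSchema<Tuple2<Long, String>> schema =
                    new TypeInformationKeyValueSerializationSchema<>(Long.class, String.class, env.getConfig());

            DataStream<Tuple2<Long, String>> stream =
                    env.addSource(new FlinkKafkaConsumer082<>("my-topic", schema, props));

            stream.print();
            env.execute("Read keyed Kafka records");
        }
    }
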
http://git-wip-us.apache.org/repos/asf/flink/blob/85e7b287/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 5e08464..6fe66d8 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -25,6 +25,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.util.NetUtils;
@@ -76,7 +78,7 @@ public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN> {
* (Serializable) SerializationSchema for turning objects used with Flink into
* byte[] for Kafka.
*/
- private final SerializationSchema<IN, byte[]> schema;
+ private final KeyedSerializationSchema<IN> schema;
/**
* User-provided partitioner for assigning an object to a Kafka partition.
@@ -99,6 +101,7 @@ public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN> {
/** Errors encountered in the async producer are stored here */
private transient volatile Exception asyncException;
+ // ------------------- Keyless serialization schema constructors ----------------------
/**
* Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
* the topic.
@@ -108,10 +111,10 @@ public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN> {
* @param topicId
* ID of the Kafka topic.
* @param serializationSchema
- * User defined serialization schema.
+ * User defined (keyless) serialization schema.
*/
public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema) {
- this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), null);
+ this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), null);
}
/**
@@ -121,23 +124,68 @@ public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN> {
* @param topicId
* ID of the Kafka topic.
* @param serializationSchema
- * User defined serialization schema.
+ * User defined (keyless) serialization schema.
* @param producerConfig
* Properties with the producer configuration.
*/
public FlinkKafkaProducer(String topicId, SerializationSchema<IN, byte[]> serializationSchema, Properties producerConfig) {
- this(topicId, serializationSchema, producerConfig, null);
+ this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, null);
}
/**
* The main constructor for creating a FlinkKafkaProducer.
*
* @param topicId The topic to write data to
- * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[]
+ * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
*/
public FlinkKafkaProducer(String topicId, SerializationSchema<IN, byte[]> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) {
+ this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+ }
+
+ // ------------------- Key/Value serialization schema constructors ----------------------
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+ * the topic.
+ *
+ * @param brokerList
+ * Comma separated addresses of the brokers
+ * @param topicId
+ * ID of the Kafka topic.
+ * @param serializationSchema
+ * User defined serialization schema supporting key/value messages
+ */
+ public FlinkKafkaProducer(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
+ this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), null);
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+ * the topic.
+ *
+ * @param topicId
+ * ID of the Kafka topic.
+ * @param serializationSchema
+ * User defined serialization schema supporting key/value messages
+ * @param producerConfig
+ * Properties with the producer configuration.
+ */
+ public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
+ this(topicId, serializationSchema, producerConfig, null);
+ }
+
+ /**
+ * The main constructor for creating a FlinkKafkaProducer.
+ *
+ * @param topicId The topic to write data to
+ * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[], supporting key/value messages
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+ */
+ public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) {
Preconditions.checkNotNull(topicId, "TopicID not set");
Preconditions.checkNotNull(serializationSchema, "serializationSchema not set");
Preconditions.checkNotNull(producerConfig, "producerConfig not set");
@@ -244,11 +292,12 @@ public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN> {
public void invoke(IN next) throws Exception {
// propagate asynchronous errors
checkErroneous();
-
- byte[] serialized = schema.serialize(next);
- ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicId,
+
+ byte[] serializedKey = schema.serializeKey(next);
+ byte[] serializedValue = schema.serializeValue(next);
+ ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topicId,
partitioner.partition(next, partitions.length),
- null, serialized);
+ serializedKey, serializedValue);
producer.send(record, callback);
}
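
The producer side mirrors the consumer change: serializeKey() may return null, in which case the record is written without a key. A minimal sketch using the new key/value constructor; the topic name, broker address and class name are assumptions:

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
    import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema;

    public class KeyedProducerSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<Tuple2<Long, String>> stream =
                    env.fromElements(new Tuple2<>(1L, "one"), new Tuple2<>(2L, "two"));

            // f0 becomes the Kafka message key, f1 the value, both via Flink's serializers
            KeyedSerializationSchema<Tuple2<Long, String>> schema =
                    new TypeInformationKeyValueSerializationSchema<>(Long.class, String.class, env.getConfig());

            stream.addSink(new FlinkKafkaProducer<>("localhost:9092", "my-topic", schema)); // assumed broker/topic

            env.execute("Write keyed Kafka records");
        }
    }
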
http://git-wip-us.apache.org/repos/asf/flink/blob/85e7b287/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
index 063d089..dabafa9 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
@@ -18,7 +18,7 @@
package org.apache.flink.streaming.connectors.kafka.internals;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.kafka.common.TopicPartition;
import java.io.IOException;
@@ -68,7 +68,8 @@ public interface Fetcher {
*
* @param <T> The type of elements produced by the fetcher and emitted to the source context.
*/
- <T> void run(SourceFunction.SourceContext<T> sourceContext, DeserializationSchema<T> valueDeserializer, long[] lastOffsets) throws Exception;
+ <T> void run(SourceFunction.SourceContext<T> sourceContext, KeyedDeserializationSchema<T> valueDeserializer,
+ long[] lastOffsets) throws Exception;
/**
* Set the next offset to read from for the given partition.
http://git-wip-us.apache.org/repos/asf/flink/blob/85e7b287/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
index 212ba7d..95683ce 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
@@ -30,7 +30,7 @@ import kafka.message.MessageAndOffset;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -123,7 +123,7 @@ public class LegacyFetcher implements Fetcher {
@Override
public <T> void run(SourceFunction.SourceContext<T> sourceContext,
- DeserializationSchema<T> valueDeserializer,
+ KeyedDeserializationSchema<T> deserializer,
long[] lastOffsets) throws Exception {
if (partitionsToRead == null || partitionsToRead.size() == 0) {
@@ -195,7 +195,7 @@ public class LegacyFetcher implements Fetcher {
FetchPartition[] partitions = partitionsList.toArray(new FetchPartition[partitionsList.size()]);
SimpleConsumerThread<T> thread = new SimpleConsumerThread<>(this, config, topic,
- broker, partitions, sourceContext, valueDeserializer, lastOffsets);
+ broker, partitions, sourceContext, deserializer, lastOffsets);
thread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)",
taskName, broker.id(), broker.host(), broker.port()));
@@ -305,7 +305,7 @@ public class LegacyFetcher implements Fetcher {
private static class SimpleConsumerThread<T> extends Thread {
private final SourceFunction.SourceContext<T> sourceContext;
- private final DeserializationSchema<T> valueDeserializer;
+ private final KeyedDeserializationSchema<T> deserializer;
private final long[] offsetsState;
private final FetchPartition[] partitions;
@@ -327,7 +327,7 @@ public class LegacyFetcher implements Fetcher {
Node broker,
FetchPartition[] partitions,
SourceFunction.SourceContext<T> sourceContext,
- DeserializationSchema<T> valueDeserializer,
+ KeyedDeserializationSchema<T> deserializer,
long[] offsetsState) {
this.owner = owner;
this.config = config;
@@ -335,7 +335,7 @@ public class LegacyFetcher implements Fetcher {
this.broker = broker;
this.partitions = partitions;
this.sourceContext = checkNotNull(sourceContext);
- this.valueDeserializer = checkNotNull(valueDeserializer);
+ this.deserializer = checkNotNull(deserializer);
this.offsetsState = checkNotNull(offsetsState);
}
@@ -438,15 +438,26 @@ public class LegacyFetcher implements Fetcher {
+ " from partition " + fp.partition + " already");
continue;
}
-
+
+ // put value into byte array
ByteBuffer payload = msg.message().payload();
- byte[] valueByte = new byte[payload.remaining()];
- payload.get(valueByte);
-
- final T value = valueDeserializer.deserialize(valueByte);
+ byte[] valueBytes = new byte[payload.remaining()];
+ payload.get(valueBytes);
+
+ // put key into byte array
+ byte[] keyBytes = null;
+ int keySize = msg.message().keySize();
+
+ if (keySize >= 0) { // equivalent to message().hasKey(), but saves one int deserialization
+ ByteBuffer keyPayload = msg.message().key();
+ keyBytes = new byte[keySize];
+ keyPayload.get(keyBytes);
+ }
+
final long offset = msg.offset();
-
- synchronized (this.sourceContext.getCheckpointLock()) {
+ final T value = deserializer.deserialize(keyBytes, valueBytes, offset);
+
+ synchronized (sourceContext.getCheckpointLock()) {
sourceContext.collect(value);
offsetsState[partition] = offset;
}
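
The key extraction above follows the standard ByteBuffer copy pattern. A standalone sketch of just that pattern; the class name and sample key are illustrative only:

    import java.nio.ByteBuffer;

    public class ByteBufferCopySketch {

        // copies the remaining bytes of a buffer into a fresh array, as the fetcher
        // does for both the message payload and (when present) the message key
        static byte[] toArray(ByteBuffer buffer) {
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes); // advances the buffer's position to its limit
            return bytes;
        }

        public static void main(String[] args) {
            ByteBuffer key = ByteBuffer.wrap("user-42".getBytes());
            System.out.println(toArray(key).length); // prints 7
        }
    }
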
http://git-wip-us.apache.org/repos/asf/flink/blob/85e7b287/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 3e8154f..2c48bea 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -51,6 +51,7 @@ import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
import org.apache.flink.streaming.connectors.kafka.testutils.DiscardingSink;
@@ -64,6 +65,9 @@ import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
+import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
import org.apache.flink.testutils.junit.RetryOnException;
import org.apache.flink.testutils.junit.RetryRule;
@@ -82,6 +86,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
+import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -336,7 +341,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
while (running && cnt < limit) {
- ctx.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt));
+ ctx.collect(new Tuple2<>(1000L + cnt, "kafka-" + cnt));
cnt++;
}
}
@@ -817,13 +822,13 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
while (running) {
byte[] wl = new byte[fifteenMb + rnd.nextInt(fifteenMb)];
- ctx.collect(new Tuple2<Long, byte[]>(cnt++, wl));
+ ctx.collect(new Tuple2<>(cnt++, wl));
Thread.sleep(100);
if (cnt == 10) {
// signal end
- ctx.collect(new Tuple2<Long, byte[]>(-1L, new byte[]{1}));
+ ctx.collect(new Tuple2<>(-1L, new byte[]{1}));
break;
}
}
@@ -913,6 +918,85 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
+ public void runKeyValueTest() throws Exception {
+ final String topic = "keyvaluetest";
+ createTestTopic(topic, 1, 1);
+ final int ELEMENT_COUNT = 5000;
+
+ // ----------- Write some data into Kafka -------------------
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env.setParallelism(1);
+ env.setNumberOfExecutionRetries(3);
+ env.getConfig().disableSysoutLogging();
+
+ DataStream<Tuple2<Long, PojoValue>> kvStream = env.addSource(new SourceFunction<Tuple2<Long, PojoValue>>() {
+ @Override
+ public void run(SourceContext<Tuple2<Long, PojoValue>> ctx) throws Exception {
+ Random rnd = new Random(1337);
+ for(long i = 0; i < ELEMENT_COUNT; i++) {
+ PojoValue pojo = new PojoValue();
+ pojo.when = new Date(rnd.nextLong());
+ pojo.lon = rnd.nextLong();
+ pojo.lat = i;
+ // make every second key null to ensure proper "null" serialization
+ Long key = (i % 2 == 0) ? null : i;
+ ctx.collect(new Tuple2<>(key, pojo));
+ }
+ }
+ @Override
+ public void cancel() {
+ }
+ });
+
+ KeyedSerializationSchema<Tuple2<Long, PojoValue>> schema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig());
+ kvStream.addSink(new FlinkKafkaProducer<>(topic, schema,
+ FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnectionStrings)));
+ env.execute("Write KV to Kafka");
+
+ // ----------- Read the data again -------------------
+
+ env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+ env.setParallelism(1);
+ env.setNumberOfExecutionRetries(3);
+ env.getConfig().disableSysoutLogging();
+
+
+ KeyedDeserializationSchema<Tuple2<Long, PojoValue>> readSchema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig());
+
+ DataStream<Tuple2<Long, PojoValue>> fromKafka = env.addSource(new FlinkKafkaConsumer082<>(topic, readSchema, standardProps));
+ fromKafka.flatMap(new RichFlatMapFunction<Tuple2<Long,PojoValue>, Object>() {
+ long counter = 0;
+ @Override
+ public void flatMap(Tuple2<Long, PojoValue> value, Collector<Object> out) throws Exception {
+ // the elements should be in order.
+ Assert.assertTrue("Wrong value " + value.f1.lat, value.f1.lat == counter);
+ if(value.f1.lat % 2 == 0) {
+ Assert.assertNull("key was not null", value.f0);
+ } else {
+ Assert.assertTrue("Wrong value " + value.f0, value.f0 == counter);
+ }
+ counter++;
+ if(counter == ELEMENT_COUNT) {
+ // we got the right number of elements
+ throw new SuccessException();
+ }
+ }
+ });
+
+ tryExecute(env, "Read KV from Kafka");
+ }
+
+ public static class PojoValue {
+ public Date when;
+ public long lon;
+ public long lat;
+ public PojoValue() {}
+ }
+
+
// ------------------------------------------------------------------------
// Reading writing test data sets
// ------------------------------------------------------------------------
@@ -982,7 +1066,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
int partition = getRuntimeContext().getIndexOfThisSubtask();
while (running && cnt < numElements) {
- ctx.collect(new Tuple2<Integer, Integer>(partition, cnt));
+ ctx.collect(new Tuple2<>(partition, cnt));
cnt++;
}
}
@@ -1030,7 +1114,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId());
ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator();
- List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<MessageAndMetadata<byte[], byte[]>>();
+ List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<>();
int read = 0;
while(iteratorToRead.hasNext()) {
read++;
http://git-wip-us.apache.org/repos/asf/flink/blob/85e7b287/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index fd635a5..3ca7c5c 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -56,6 +56,11 @@ public class KafkaITCase extends KafkaConsumerTestBase {
runSimpleConcurrentProducerConsumerTopology();
}
+ @Test(timeout = 60000)
+ public void testKeyValueSupport() throws Exception {
+ runKeyValueTest();
+ }
+
// --- canceling / failures ---
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/85e7b287/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
new file mode 100644
index 0000000..6a20e44
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * The deserialization schema describes how to turn the key/value byte messages delivered by certain
+ * data sources (for example Apache Kafka) into data types (Java/Scala objects) that are
+ * processed by Flink.
+ *
+ * @param <T> The type created by the keyed deserialization schema.
+ */
+public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+
+ /**
+ * Deserializes the byte message.
+ *
+ * @param messageKey the key as a byte array (null if no key has been set)
+ * @param message The message, as a byte array.
+ * @param offset the offset of the message in the original source (for example the Kafka offset)
+ * @return The deserialized message as an object.
+ */
+ T deserialize(byte[] messageKey, byte[] message, long offset) throws IOException;
+
+ /**
+ * Method to decide whether the element signals the end of the stream. If
+ * true is returned the element won't be emitted.
+ *
+ * @param nextElement The element to test for the end-of-stream signal.
+ * @return True, if the element signals end of stream, false otherwise.
+ */
+ boolean isEndOfStream(T nextElement);
+}
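
A minimal sketch of a custom implementation of this interface, assuming UTF-8 string keys and values; the class name is hypothetical:

    import java.io.IOException;

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

    public class StringKeyValueDeserializationSchema
            implements KeyedDeserializationSchema<Tuple2<String, String>> {

        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2<String, String> deserialize(byte[] messageKey, byte[] message, long offset)
                throws IOException {
            // the key may be null when the Kafka record was written without one
            String key = messageKey == null ? null : new String(messageKey, "UTF-8");
            return new Tuple2<>(key, new String(message, "UTF-8"));
        }

        @Override
        public boolean isEndOfStream(Tuple2<String, String> nextElement) {
            return false; // read indefinitely
        }

        @Override
        public TypeInformation<Tuple2<String, String>> getProducedType() {
            return new TupleTypeInfo<Tuple2<String, String>>(
                    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        }
    }
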
http://git-wip-us.apache.org/repos/asf/flink/blob/85e7b287/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
new file mode 100644
index 0000000..fc7bd1e
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.io.IOException;
+
+/**
+ * A simple wrapper for using the DeserializationSchema with the KeyedDeserializationSchema
+ * interface.
+ * @param <T> The type created by the deserialization schema.
+ */
+public class KeyedDeserializationSchemaWrapper<T> implements KeyedDeserializationSchema<T> {
+
+ private static final long serialVersionUID = 2651665280744549932L;
+
+ private final DeserializationSchema<T> deserializationSchema;
+
+ public KeyedDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
+ this.deserializationSchema = deserializationSchema;
+ }
+ @Override
+ public T deserialize(byte[] messageKey, byte[] message, long offset) throws IOException {
+ return deserializationSchema.deserialize(message);
+ }
+
+ @Override
+ public boolean isEndOfStream(T nextElement) {
+ return deserializationSchema.isEndOfStream(nextElement);
+ }
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return deserializationSchema.getProducedType();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/85e7b287/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
new file mode 100644
index 0000000..be3e87e
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.Serializable;
+
+/**
+ * The serialization schema describes how to turn a data object into a different serialized
+ * representation. Most data sinks (for example Apache Kafka) require the data to be handed
+ * to them in a specific format (for example as byte strings).
+ *
+ * @param <T> The type to be serialized.
+ */
+public interface KeyedSerializationSchema<T> extends Serializable {
+
+ /**
+ * Serializes the key of the incoming element to a byte array.
+ * This method might return null if no key is available.
+ *
+ * @param element The incoming element to be serialized
+ * @return the key of the element as a byte array
+ */
+ byte[] serializeKey(T element);
+
+
+ /**
+ * Serializes the value of the incoming element to a byte array.
+ *
+ * @param element The incoming element to be serialized
+ * @return the value of the element as a byte array
+ */
+ byte[] serializeValue(T element);
+
+}
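
A minimal sketch of a custom implementation that keys each record by the first tuple field; the class name is hypothetical:

    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

    public class StringKeyValueSerializationSchema
            implements KeyedSerializationSchema<Tuple2<String, String>> {

        private static final long serialVersionUID = 1L;

        @Override
        public byte[] serializeKey(Tuple2<String, String> element) {
            // returning null produces a Kafka record without a key
            return element.f0 == null ? null : element.f0.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public byte[] serializeValue(Tuple2<String, String> element) {
            return element.f1.getBytes(StandardCharsets.UTF_8);
        }
    }
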
http://git-wip-us.apache.org/repos/asf/flink/blob/85e7b287/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
new file mode 100644
index 0000000..26809aa
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+/**
+ * A simple wrapper for using the SerializationSchema with the KeyedSerializationSchema
+ * interface.
+ * @param <T> The type to serialize
+ */
+public class KeyedSerializationSchemaWrapper<T> implements KeyedSerializationSchema<T> {
+
+ private static final long serialVersionUID = 1351665280744549933L;
+
+ private final SerializationSchema<T, byte[]> serializationSchema;
+
+ public KeyedSerializationSchemaWrapper(SerializationSchema<T, byte[]> serializationSchema) {
+ this.serializationSchema = serializationSchema;
+ }
+
+ @Override
+ public byte[] serializeKey(T element) {
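+ // the wrapped schema is value-only, so every record is written without a key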
+ return null;
+ }
+
+ @Override
+ public byte[] serializeValue(T element) {
+ return serializationSchema.serialize(element);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/85e7b287/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
new file mode 100644
index 0000000..1c8efd5
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+
+import java.io.IOException;
+
+/**
+ * A serialization and deserialization schema for key/value pairs that uses Flink's serialization stack to
+ * transform typed records from and to byte arrays.
+ *
+ * @param <K> The key type to be serialized.
+ * @param <V> The value type to be serialized.
+ */
+public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDeserializationSchema<Tuple2<K, V>>, KeyedSerializationSchema<Tuple2<K,V>> {
+
+ private static final long serialVersionUID = -5359448468131559102L;
+
+ /** The serializer for the key */
+ private final TypeSerializer<K> keySerializer;
+
+ /** The serializer for the value */
+ private final TypeSerializer<V> valueSerializer;
+
+ /** reusable output serialization buffers */
+ private transient DataOutputSerializer keyOutputSerializer;
+ private transient DataOutputSerializer valueOutputSerializer;
+
+ /** The type information, to be returned by {@link #getProducedType()}. It is
+ * transient, because it is not serializable. Note that this means that the type information
+ * is not available at runtime, but only prior to the first serialization / deserialization */
+ private final transient TypeInformation<Tuple2<K, V>> typeInfo;
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a new de-/serialization schema for the given types.
+ *
+ * @param keyTypeInfo The type information for the key type de-/serialized by this schema.
+ * @param valueTypeInfo The type information for the value type de-/serialized by this schema.
+ * @param ec The execution config, which is used to parametrize the type serializers.
+ */
+ public TypeInformationKeyValueSerializationSchema(TypeInformation<K> keyTypeInfo, TypeInformation<V> valueTypeInfo, ExecutionConfig ec) {
+ this.typeInfo = new TupleTypeInfo<>(keyTypeInfo, valueTypeInfo);
+ this.keySerializer = keyTypeInfo.createSerializer(ec);
+ this.valueSerializer = valueTypeInfo.createSerializer(ec);
+ }
+
+ public TypeInformationKeyValueSerializationSchema(Class<K> keyClass, Class<V> valueClass, ExecutionConfig config) {
+ //noinspection unchecked
+ this( (TypeInformation<K>) TypeExtractor.createTypeInfo(keyClass), (TypeInformation<V>) TypeExtractor.createTypeInfo(valueClass), config);
+ }
+
+ // ------------------------------------------------------------------------
+
+
+ @Override
+ public Tuple2<K, V> deserialize(byte[] messageKey, byte[] message, long offset) throws IOException {
+ K key = null;
+ if(messageKey != null) {
+ key = keySerializer.deserialize(new ByteArrayInputView(messageKey));
+ }
+ V value = valueSerializer.deserialize(new ByteArrayInputView(message));
+ return new Tuple2<>(key, value);
+ }
+
+ /**
+ * This schema never considers an element to signal end-of-stream, so this method always returns false.
+ * @param nextElement The element to test for the end-of-stream signal.
+ * @return Returns false.
+ */
+ @Override
+ public boolean isEndOfStream(Tuple2<K,V> nextElement) {
+ return false;
+ }
+
+
+ @Override
+ public byte[] serializeKey(Tuple2<K, V> element) {
+ if(element.f0 == null) {
+ return null;
+ } else {
+ // key is not null. serialize it:
+ if (keyOutputSerializer == null) {
+ keyOutputSerializer = new DataOutputSerializer(16);
+ }
+ try {
+ keySerializer.serialize(element.f0, keyOutputSerializer);
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Unable to serialize record", e);
+ }
+ // if the backing array is larger than the bytes written, trim it to the serialized length
+ byte[] res = keyOutputSerializer.getByteArray();
+ if (res.length != keyOutputSerializer.length()) {
+ byte[] n = new byte[keyOutputSerializer.length()];
+ System.arraycopy(res, 0, n, 0, keyOutputSerializer.length());
+ res = n;
+ }
+ keyOutputSerializer.clear();
+ return res;
+ }
+ }
+
+ @Override
+ public byte[] serializeValue(Tuple2<K, V> element) {
+ if (valueOutputSerializer == null) {
+ valueOutputSerializer = new DataOutputSerializer(16);
+ }
+
+ try {
+ valueSerializer.serialize(element.f1, valueOutputSerializer);
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Unable to serialize record", e);
+ }
+
+ byte[] res = valueOutputSerializer.getByteArray();
+ if (res.length != valueOutputSerializer.length()) {
+ byte[] n = new byte[valueOutputSerializer.length()];
+ System.arraycopy(res, 0, n, 0, valueOutputSerializer.length());
+ res = n;
+ }
+ valueOutputSerializer.clear();
+ return res;
+ }
+
+
+ @Override
+ public TypeInformation<Tuple2<K,V>> getProducedType() {
+ if (typeInfo != null) {
+ return typeInfo;
+ }
+ else {
+ throw new IllegalStateException(
+ "The type information is not available after this class has been serialized and distributed.");
+ }
+ }
+}
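
A minimal local round trip showing the null-key behavior of this schema; no Kafka involved, and the class name is illustrative:

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema;

    public class KeyValueRoundTripSketch {

        public static void main(String[] args) throws Exception {
            TypeInformationKeyValueSerializationSchema<String, Integer> schema =
                    new TypeInformationKeyValueSerializationSchema<>(String.class, Integer.class, new ExecutionConfig());

            Tuple2<String, Integer> record = new Tuple2<>(null, 42);

            byte[] key = schema.serializeKey(record);     // null, since the key field is null
            byte[] value = schema.serializeValue(record);

            // a null key byte array deserializes back to a null key field
            Tuple2<String, Integer> copy = schema.deserialize(key, value, 0L);
            System.out.println(copy); // prints (null,42)
        }
    }
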
http://git-wip-us.apache.org/repos/asf/flink/blob/85e7b287/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
index 6ff9712..e937838 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
@@ -29,6 +29,8 @@ import java.io.IOException;
/**
* A serialization and deserialization schema that uses Flink's serialization stack to
* transform typed records from and to byte arrays.
+ *
+ * @see TypeInformationKeyValueSerializationSchema for a serialization schema supporting key/value pairs.
*
* @param <T> The type to be serialized.
*/