You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/11/24 13:23:37 UTC

flink git commit: [FLINK-3045] Properly expose the key of a Kafka message

Repository: flink
Updated Branches:
  refs/heads/master 89b1d2332 -> 85e7b2878


[FLINK-3045] Properly expose the key of a Kafka message

This closes #1385


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/85e7b287
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/85e7b287
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/85e7b287

Branch: refs/heads/master
Commit: 85e7b2878eb50767ba64bbbd5794929d151f81ef
Parents: 89b1d23
Author: Robert Metzger <rm...@apache.org>
Authored: Thu Nov 19 15:41:01 2015 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Tue Nov 24 13:23:07 2015 +0100

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumer.java    |  48 ++++--
 .../connectors/kafka/FlinkKafkaConsumer082.java |  18 ++
 .../connectors/kafka/FlinkKafkaProducer.java    |  69 ++++++--
 .../connectors/kafka/internals/Fetcher.java     |   5 +-
 .../kafka/internals/LegacyFetcher.java          |  37 +++--
 .../connectors/kafka/KafkaConsumerTestBase.java |  94 ++++++++++-
 .../streaming/connectors/kafka/KafkaITCase.java |   5 +
 .../KeyedDeserializationSchema.java             |  52 ++++++
 .../KeyedDeserializationSchemaWrapper.java      |  51 ++++++
 .../serialization/KeyedSerializationSchema.java |  48 ++++++
 .../KeyedSerializationSchemaWrapper.java        |  43 +++++
 ...eInformationKeyValueSerializationSchema.java | 163 +++++++++++++++++++
 .../TypeInformationSerializationSchema.java     |   2 +
 13 files changed, 592 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/85e7b287/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
index e701639..446648f 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
@@ -38,6 +38,8 @@ import org.apache.flink.streaming.connectors.kafka.internals.OffsetHandler;
 import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.util.NetUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.Node;
@@ -207,7 +209,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 	private final int[] partitions;
 	
 	/** The schema to convert between Kafka#s byte messages, and Flink's objects */
-	private final DeserializationSchema<T> valueDeserializer;
+	private final KeyedDeserializationSchema<T> deserializer;
 
 	// ------  Runtime State  -------
 
@@ -234,9 +236,33 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 	private transient long[] restoreToOffset;
 	
 	private volatile boolean running = true;
-
+	
 	// ------------------------------------------------------------------------
 
+
+	/**
+	 * Creates a new Flink Kafka Consumer, using the given type of fetcher and offset handler.
+	 *
+	 * <p>To determine which kink of fetcher and offset handler to use, please refer to the docs
+	 * at the beginnign of this class.</p>
+	 *
+	 * @param topic
+	 *           The Kafka topic to read from.
+	 * @param deserializer
+	 *           The deserializer to turn raw byte messages (without key) into Java/Scala objects.
+	 * @param props
+	 *           The properties that are used to configure both the fetcher and the offset handler.
+	 * @param offsetStore
+	 *           The type of offset store to use (Kafka / ZooKeeper)
+	 * @param fetcherType
+	 *           The type of fetcher to use (new high-level API, old low-level API).
+	 */
+	public FlinkKafkaConsumer(String topic, DeserializationSchema<T> deserializer, Properties props,
+							OffsetStore offsetStore, FetcherType fetcherType) {
+		this(topic, new KeyedDeserializationSchemaWrapper<>(deserializer),
+				props, offsetStore, fetcherType);
+	}
+
 	/**
 	 * Creates a new Flink Kafka Consumer, using the given type of fetcher and offset handler.
 	 * 
@@ -245,7 +271,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 	 * 
 	 * @param topic 
 	 *           The Kafka topic to read from.
-	 * @param valueDeserializer
+	 * @param deserializer
 	 *           The deserializer to turn raw byte messages into Java/Scala objects.
 	 * @param props
 	 *           The properties that are used to configure both the fetcher and the offset handler.
@@ -254,7 +280,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 	 * @param fetcherType
 	 *           The type of fetcher to use (new high-level API, old low-level API).
 	 */
-	public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props, 
+	public FlinkKafkaConsumer(String topic, KeyedDeserializationSchema<T> deserializer, Properties props,
 								OffsetStore offsetStore, FetcherType fetcherType) {
 		this.offsetStore = checkNotNull(offsetStore);
 		this.fetcherType = checkNotNull(fetcherType);
@@ -270,7 +296,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 		
 		this.topic = checkNotNull(topic, "topic");
 		this.props = checkNotNull(props, "props");
-		this.valueDeserializer = checkNotNull(valueDeserializer, "valueDeserializer");
+		this.deserializer = checkNotNull(deserializer, "valueDeserializer");
 
 		// validate the zookeeper properties
 		if (offsetStore == OffsetStore.FLINK_ZOOKEEPER) {
@@ -300,7 +326,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 	@Override
 	public void open(Configuration parameters) throws Exception {
 		super.open(parameters);
-
+		
 		final int numConsumers = getRuntimeContext().getNumberOfParallelSubtasks();
 		final int thisComsumerIndex = getRuntimeContext().getIndexOfThisSubtask();
 		
@@ -372,8 +398,6 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 			// no restore request. Let the offset handler take care of the initial offset seeking
 			offsetHandler.seekFetcherToInitialOffsets(subscribedPartitions, fetcher);
 		}
-
-
 	}
 
 	@Override
@@ -394,7 +418,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 				LOG.info("Starting periodic offset committer, with commit interval of {}ms", commitInterval);
 			}
 
-			fetcher.run(sourceContext, valueDeserializer, lastOffsets);
+			fetcher.run(sourceContext, deserializer, lastOffsets);
 
 			if (offsetCommitter != null) {
 				offsetCommitter.close();
@@ -438,7 +462,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 				LOG.warn("Error while closing Kafka connector data fetcher", e);
 			}
 		}
-
+		
 		OffsetHandler offsetHandler = this.offsetHandler;
 		this.offsetHandler = null;
 		if (offsetHandler != null) {
@@ -449,8 +473,6 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 				LOG.warn("Error while closing Kafka connector offset handler", e);
 			}
 		}
-
-
 	}
 
 	@Override
@@ -461,7 +483,7 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 
 	@Override
 	public TypeInformation<T> getProducedType() {
-		return valueDeserializer.getProducedType();
+		return deserializer.getProducedType();
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/85e7b287/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
index 77e41e5..ab4a88a 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 
 import java.util.Properties;
 
@@ -48,4 +49,21 @@ public class FlinkKafkaConsumer082<T> extends FlinkKafkaConsumer<T> {
 	public FlinkKafkaConsumer082(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
 		super(topic, valueDeserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
 	}
+
+	/**
+	 * Creates a new Kafka 0.8.2.x streaming source consumer.
+	 *
+	 * This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value
+	 * pairs from Kafka.
+	 *
+	 * @param topic
+	 *           The name of the topic that should be consumed.
+	 * @param deserializer
+	 *           The de-/serializer used to convert between Kafka's byte messages and Flink's objects.
+	 * @param props
+	 *           The properties used to configure the Kafka consumer client, and the ZooKeeper client.
+	 */
+	public FlinkKafkaConsumer082(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
+		super(topic, deserializer, props, OffsetStore.FLINK_ZOOKEEPER, FetcherType.LEGACY_LOW_LEVEL);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/85e7b287/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 5e08464..6fe66d8 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -25,6 +25,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.util.NetUtils;
 
@@ -76,7 +78,7 @@ public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN>  {
 	 * (Serializable) SerializationSchema for turning objects used with Flink into
 	 * byte[] for Kafka.
 	 */
-	private final SerializationSchema<IN, byte[]> schema;
+	private final KeyedSerializationSchema<IN> schema;
 
 	/**
 	 * User-provided partitioner for assigning an object to a Kafka partition.
@@ -99,6 +101,7 @@ public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN>  {
 	/** Errors encountered in the async producer are stored here */
 	private transient volatile Exception asyncException;
 
+	// ------------------- Keyless serialization schema constructors ----------------------
 	/**
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
 	 * the topic.
@@ -108,10 +111,10 @@ public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN>  {
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
-	 * 			User defined serialization schema.
+	 * 			User defined (keyless) serialization schema.
 	 */
 	public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN, byte[]> serializationSchema) {
-		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), null);
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), null);
 	}
 
 	/**
@@ -121,23 +124,68 @@ public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN>  {
 	 * @param topicId
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
-	 * 			User defined serialization schema.
+	 * 			User defined (keyless) serialization schema.
 	 * @param producerConfig
 	 * 			Properties with the producer configuration.
 	 */
 	public FlinkKafkaProducer(String topicId, SerializationSchema<IN, byte[]> serializationSchema, Properties producerConfig) {
-		this(topicId, serializationSchema, producerConfig, null);
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, null);
 	}
 
 	/**
 	 * The main constructor for creating a FlinkKafkaProducer.
 	 *
 	 * @param topicId The topic to write data to
-	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions.
 	 */
 	public FlinkKafkaProducer(String topicId, SerializationSchema<IN, byte[]> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+
+	}
+
+	// ------------------- Key/Value serialization schema constructors ----------------------
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic.
+	 *
+	 * @param brokerList
+	 *			Comma separated addresses of the brokers
+	 * @param topicId
+	 * 			ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 			User defined serialization schema supporting key/value messages
+	 */
+	public FlinkKafkaProducer(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
+		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), null);
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic.
+	 *
+	 * @param topicId
+	 * 			ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 			User defined serialization schema supporting key/value messages
+	 * @param producerConfig
+	 * 			Properties with the producer configuration.
+	 */
+	public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
+		this(topicId, serializationSchema, producerConfig, null);
+	}
+
+	/**
+	 * The main constructor for creating a FlinkKafkaProducer.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions.
+	 */
+	public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) {
 		Preconditions.checkNotNull(topicId, "TopicID not set");
 		Preconditions.checkNotNull(serializationSchema, "serializationSchema not set");
 		Preconditions.checkNotNull(producerConfig, "producerConfig not set");
@@ -244,11 +292,12 @@ public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN>  {
 	public void invoke(IN next) throws Exception {
 		// propagate asynchronous errors
 		checkErroneous();
-		
-		byte[] serialized = schema.serialize(next);
-		ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicId,
+
+		byte[] serializedKey = schema.serializeKey(next);
+		byte[] serializedValue = schema.serializeValue(next);
+		ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topicId,
 				partitioner.partition(next, partitions.length),
-				null, serialized);
+				serializedKey, serializedValue);
 		
 		producer.send(record, callback);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/85e7b287/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
index 063d089..dabafa9 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Fetcher.java
@@ -18,7 +18,7 @@
 package org.apache.flink.streaming.connectors.kafka.internals;
 
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.kafka.common.TopicPartition;
 
 import java.io.IOException;
@@ -68,7 +68,8 @@ public interface Fetcher {
 	 * 
 	 * @param <T> The type of elements produced by the fetcher and emitted to the source context.
 	 */
-	<T> void run(SourceFunction.SourceContext<T> sourceContext, DeserializationSchema<T> valueDeserializer, long[] lastOffsets) throws Exception;
+	<T> void run(SourceFunction.SourceContext<T> sourceContext, KeyedDeserializationSchema<T> valueDeserializer,
+					long[] lastOffsets) throws Exception;
 	
 	/**
 	 * Set the next offset to read from for the given partition.

http://git-wip-us.apache.org/repos/asf/flink/blob/85e7b287/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
index 212ba7d..95683ce 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
@@ -30,7 +30,7 @@ import kafka.message.MessageAndOffset;
 
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.StringUtils;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -123,7 +123,7 @@ public class LegacyFetcher implements Fetcher {
 
 	@Override
 	public <T> void run(SourceFunction.SourceContext<T> sourceContext,
-						DeserializationSchema<T> valueDeserializer,
+						KeyedDeserializationSchema<T> deserializer,
 						long[] lastOffsets) throws Exception {
 		
 		if (partitionsToRead == null || partitionsToRead.size() == 0) {
@@ -195,7 +195,7 @@ public class LegacyFetcher implements Fetcher {
 			FetchPartition[] partitions = partitionsList.toArray(new FetchPartition[partitionsList.size()]);
 
 			SimpleConsumerThread<T> thread = new SimpleConsumerThread<>(this, config, topic,
-					broker, partitions, sourceContext, valueDeserializer, lastOffsets);
+					broker, partitions, sourceContext, deserializer, lastOffsets);
 
 			thread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)",
 					taskName, broker.id(), broker.host(), broker.port()));
@@ -305,7 +305,7 @@ public class LegacyFetcher implements Fetcher {
 	private static class SimpleConsumerThread<T> extends Thread {
 		
 		private final SourceFunction.SourceContext<T> sourceContext;
-		private final DeserializationSchema<T> valueDeserializer;
+		private final KeyedDeserializationSchema<T> deserializer;
 		private final long[] offsetsState;
 		
 		private final FetchPartition[] partitions;
@@ -327,7 +327,7 @@ public class LegacyFetcher implements Fetcher {
 									Node broker,
 									FetchPartition[] partitions,
 									SourceFunction.SourceContext<T> sourceContext,
-									DeserializationSchema<T> valueDeserializer,
+									KeyedDeserializationSchema<T> deserializer,
 									long[] offsetsState) {
 			this.owner = owner;
 			this.config = config;
@@ -335,7 +335,7 @@ public class LegacyFetcher implements Fetcher {
 			this.broker = broker;
 			this.partitions = partitions;
 			this.sourceContext = checkNotNull(sourceContext);
-			this.valueDeserializer = checkNotNull(valueDeserializer);
+			this.deserializer = checkNotNull(deserializer);
 			this.offsetsState = checkNotNull(offsetsState);
 		}
 
@@ -438,15 +438,26 @@ public class LegacyFetcher implements Fetcher {
 											+ " from partition " + fp.partition + " already");
 									continue;
 								}
-								
+
+								// put value into byte array
 								ByteBuffer payload = msg.message().payload();
-								byte[] valueByte = new byte[payload.remaining()];
-								payload.get(valueByte);
-								
-								final T value = valueDeserializer.deserialize(valueByte);
+								byte[] valueBytes = new byte[payload.remaining()];
+								payload.get(valueBytes);
+
+								// put key into byte array
+								byte[] keyBytes = null;
+								int keySize = msg.message().keySize();
+
+								if (keySize >= 0) { // message().hasKey() is doing the same. We save one int deserialization
+									ByteBuffer keyPayload = msg.message().key();
+									keyBytes = new byte[keySize];
+									keyPayload.get(keyBytes);
+								}
+
 								final long offset = msg.offset();
-										
-								synchronized (this.sourceContext.getCheckpointLock()) {
+								final T value = deserializer.deserialize(keyBytes, valueBytes, offset);
+
+								synchronized (sourceContext.getCheckpointLock()) {
 									sourceContext.collect(value);
 									offsetsState[partition] = offset;
 								}

http://git-wip-us.apache.org/repos/asf/flink/blob/85e7b287/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 3e8154f..2c48bea 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -51,6 +51,7 @@ import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
 import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
 import org.apache.flink.streaming.connectors.kafka.testutils.DiscardingSink;
@@ -64,6 +65,9 @@ import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
+import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.testutils.junit.RetryOnException;
 import org.apache.flink.testutils.junit.RetryRule;
@@ -82,6 +86,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -336,7 +341,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 
 				while (running && cnt < limit) {
-					ctx.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt));
+					ctx.collect(new Tuple2<>(1000L + cnt, "kafka-" + cnt));
 					cnt++;
 				}
 			}
@@ -817,13 +822,13 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 				while (running) {
 					byte[] wl = new byte[fifteenMb + rnd.nextInt(fifteenMb)];
-					ctx.collect(new Tuple2<Long, byte[]>(cnt++, wl));
+					ctx.collect(new Tuple2<>(cnt++, wl));
 
 					Thread.sleep(100);
 
 					if (cnt == 10) {
 						// signal end
-						ctx.collect(new Tuple2<Long, byte[]>(-1L, new byte[]{1}));
+						ctx.collect(new Tuple2<>(-1L, new byte[]{1}));
 						break;
 					}
 				}
@@ -913,6 +918,85 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 	}
 
+	public void runKeyValueTest() throws Exception {
+		final String topic = "keyvaluetest";
+		createTestTopic(topic, 1, 1);
+		final int ELEMENT_COUNT = 5000;
+
+
+
+		// ----------- Write some data into Kafka -------------------
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.setParallelism(1);
+		env.setNumberOfExecutionRetries(3);
+		env.getConfig().disableSysoutLogging();
+
+		DataStream<Tuple2<Long, PojoValue>> kvStream = env.addSource(new SourceFunction<Tuple2<Long, PojoValue>>() {
+			@Override
+			public void run(SourceContext<Tuple2<Long, PojoValue>> ctx) throws Exception {
+				Random rnd = new Random(1337);
+				for(long i = 0; i < ELEMENT_COUNT; i++) {
+					PojoValue pojo = new PojoValue();
+					pojo.when = new Date(rnd.nextLong());
+					pojo.lon = rnd.nextLong();
+					pojo.lat = i;
+					// make every second key null to ensure proper "null" serialization
+					Long key = (i % 2 == 0) ? null : i;
+					ctx.collect(new Tuple2<>(key, pojo));
+				}
+			}
+			@Override
+			public void cancel() {
+			}
+		});
+
+		KeyedSerializationSchema<Tuple2<Long, PojoValue>> schema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig());
+		kvStream.addSink(new FlinkKafkaProducer<>(topic, schema,
+				FlinkKafkaProducer.getPropertiesFromBrokerList(brokerConnectionStrings)));
+		env.execute("Write KV to Kafka");
+
+		// ----------- Read the data again -------------------
+
+		env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env.setParallelism(1);
+		env.setNumberOfExecutionRetries(3);
+		env.getConfig().disableSysoutLogging();
+
+
+		KeyedDeserializationSchema<Tuple2<Long, PojoValue>> readSchema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig());
+
+		DataStream<Tuple2<Long, PojoValue>> fromKafka = env.addSource(new FlinkKafkaConsumer082<>(topic, readSchema, standardProps));
+		fromKafka.flatMap(new RichFlatMapFunction<Tuple2<Long,PojoValue>, Object>() {
+			long counter = 0;
+			@Override
+			public void flatMap(Tuple2<Long, PojoValue> value, Collector<Object> out) throws Exception {
+				// the elements should be in order.
+				Assert.assertTrue("Wrong value " + value.f1.lat, value.f1.lat == counter );
+				if(value.f1.lat % 2 == 0) {
+					Assert.assertNull("key was not null", value.f0);
+				} else {
+					Assert.assertTrue("Wrong value " + value.f0, value.f0 == counter);
+				}
+				counter++;
+				if(counter == ELEMENT_COUNT) {
+					// we got the right number of elements
+					throw new SuccessException();
+				}
+			}
+		});
+
+		tryExecute(env, "Read KV from Kafka");
+	}
+
+	public static class PojoValue {
+		public Date when;
+		public long lon;
+		public long lat;
+		public PojoValue() {}
+	}
+
+
 	// ------------------------------------------------------------------------
 	//  Reading writing test data sets
 	// ------------------------------------------------------------------------
@@ -982,7 +1066,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 				int partition = getRuntimeContext().getIndexOfThisSubtask();
 
 				while (running && cnt < numElements) {
-					ctx.collect(new Tuple2<Integer, Integer>(partition, cnt));
+					ctx.collect(new Tuple2<>(partition, cnt));
 					cnt++;
 				}
 			}
@@ -1030,7 +1114,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId());
 		ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator();
 
-		List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<MessageAndMetadata<byte[], byte[]>>();
+		List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<>();
 		int read = 0;
 		while(iteratorToRead.hasNext()) {
 			read++;

http://git-wip-us.apache.org/repos/asf/flink/blob/85e7b287/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index fd635a5..3ca7c5c 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -56,6 +56,11 @@ public class KafkaITCase extends KafkaConsumerTestBase {
 		runSimpleConcurrentProducerConsumerTopology();
 	}
 
+	@Test(timeout = 60000)
+	public void testKeyValueSupport() throws Exception {
+		runKeyValueTest();
+	}
+
 	// --- canceling / failures ---
 	
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/85e7b287/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
new file mode 100644
index 0000000..6a20e44
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * The deserialization schema describes how to turn the byte key / value messages delivered by certain
+ * data sources (for example Apache Kafka) into data types (Java/Scala objects) that are
+ * processed by Flink.
+ * 
+ * @param <T> The type created by the keyed deserialization schema.
+ */
+public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+
+	/**
+	 * Deserializes the byte message.
+	 *
+	 * @param messageKey the key as a byte array (null if no key has been set)
+	 * @param message The message, as a byte array.
+	 * @param offset the offset of the message in the original source (for example the Kafka offset)
+	 * @return The deserialized message as an object.
+	 */
+	T deserialize(byte[] messageKey, byte[] message, long offset) throws IOException;
+
+	/**
+	 * Method to decide whether the element signals the end of the stream. If
+	 * true is returned the element won't be emitted.
+	 * 
+	 * @param nextElement The element to test for the end-of-stream signal.
+	 * @return True, if the element signals end of stream, false otherwise.
+	 */
+	boolean isEndOfStream(T nextElement);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/85e7b287/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
new file mode 100644
index 0000000..fc7bd1e
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.io.IOException;
+
+/**
+ * A simple wrapper for using the DeserializationSchema with the KeyedDeserializationSchema
+ * interface
+ * @param <T> The type created by the deserialization schema.
+ */
+public class KeyedDeserializationSchemaWrapper<T> implements KeyedDeserializationSchema<T> {
+
+	private static final long serialVersionUID = 2651665280744549932L;
+
+	private final DeserializationSchema<T> deserializationSchema;
+
+	public KeyedDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
+		this.deserializationSchema = deserializationSchema;
+	}
+	@Override
+	public T deserialize(byte[] messageKey, byte[] message, long offset) throws IOException {
+		return deserializationSchema.deserialize(message);
+	}
+
+	@Override
+	public boolean isEndOfStream(T nextElement) {
+		return deserializationSchema.isEndOfStream(nextElement);
+	}
+
+	@Override
+	public TypeInformation<T> getProducedType() {
+		return deserializationSchema.getProducedType();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/85e7b287/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
new file mode 100644
index 0000000..be3e87e
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.Serializable;
+
+/**
+ * The serialization schema describes how to turn a data object into a different serialized
+ * representation. Most data sinks (for example Apache Kafka) require the data to be handed
+ * to them in a specific format (for example as byte strings).
+ * 
+ * @param <T> The type to be serialized.
+ */
+public interface KeyedSerializationSchema<T> extends Serializable {
+
+	/**
+	 * Serializes the key of the incoming element to a byte array
+	 * This method might return null if no key is available.
+	 *
+	 * @param element The incoming element to be serialized
+	 * @return the key of the element as a byte array
+	 */
+	byte[] serializeKey(T element);
+
+
+	/**
+	 * Serializes the value of the incoming element to a byte array
+	 * 
+	 * @param element The incoming element to be serialized
+	 * @return the value of the element as a byte array
+	 */
+	byte[] serializeValue(T element);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/85e7b287/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
new file mode 100644
index 0000000..26809aa
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+/**
+ * A simple wrapper for using the SerializationSchema with the KeyedDeserializationSchema
+ * interface
+ * @param <T> The type to serialize
+ */
+public class KeyedSerializationSchemaWrapper<T> implements KeyedSerializationSchema<T> {
+
+	private static final long serialVersionUID = 1351665280744549933L;
+
+	private final SerializationSchema<T, byte[]> serializationSchema;
+
+	public KeyedSerializationSchemaWrapper(SerializationSchema<T, byte[]> serializationSchema) {
+		this.serializationSchema = serializationSchema;
+	}
+
+	@Override
+	public byte[] serializeKey(T element) {
+		return null;
+	}
+
+	@Override
+	public byte[] serializeValue(T element) {
+		return serializationSchema.serialize(element);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/85e7b287/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
new file mode 100644
index 0000000..1c8efd5
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+
+import java.io.IOException;
+
+/**
+ * A serialization and deserialization schema for Key Value Pairs that uses Flink's serialization stack to
+ * transform typed from and to byte arrays.
+ * 
+ * @param <K> The key type to be serialized.
+ * @param <V> The value type to be serialized.
+ */
+public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDeserializationSchema<Tuple2<K, V>>, KeyedSerializationSchema<Tuple2<K,V>> {
+
+	private static final long serialVersionUID = -5359448468131559102L;
+
+	/** The serializer for the key */
+	private final TypeSerializer<K> keySerializer;
+
+	/** The serializer for the value */
+	private final TypeSerializer<V> valueSerializer;
+
+	/** reusable output serialization buffers */
+	private transient DataOutputSerializer keyOutputSerializer;
+	private transient DataOutputSerializer valueOutputSerializer;
+
+	/** The type information, to be returned by {@link #getProducedType()}. It is
+	 * transient, because it is not serializable. Note that this means that the type information
+	 * is not available at runtime, but only prior to the first serialization / deserialization */
+	private final transient TypeInformation<Tuple2<K, V>> typeInfo;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new de-/serialization schema for the given types.
+	 *
+	 * @param keyTypeInfo The type information for the key type de-/serialized by this schema.
+	 * @param valueTypeInfo The type information for the value type de-/serialized by this schema.
+	 * @param ec The execution config, which is used to parametrize the type serializers.
+	 */
+	public TypeInformationKeyValueSerializationSchema(TypeInformation<K> keyTypeInfo, TypeInformation<V> valueTypeInfo, ExecutionConfig ec) {
+		this.typeInfo = new TupleTypeInfo<>(keyTypeInfo, valueTypeInfo);
+		this.keySerializer = keyTypeInfo.createSerializer(ec);
+		this.valueSerializer = valueTypeInfo.createSerializer(ec);
+	}
+
+	public TypeInformationKeyValueSerializationSchema(Class<K> keyClass, Class<V> valueClass, ExecutionConfig config) {
+		//noinspection unchecked
+		this( (TypeInformation<K>) TypeExtractor.createTypeInfo(keyClass), (TypeInformation<V>) TypeExtractor.createTypeInfo(valueClass), config);
+	}
+
+	// ------------------------------------------------------------------------
+
+
+	@Override
+	public Tuple2<K, V> deserialize(byte[] messageKey, byte[] message, long offset) throws IOException {
+		K key = null;
+		if(messageKey != null) {
+			key = keySerializer.deserialize(new ByteArrayInputView(messageKey));
+		}
+		V value = valueSerializer.deserialize(new ByteArrayInputView(message));
+		return new Tuple2<>(key, value);
+	}
+
+	/**
+	 * This schema never considers an element to signal end-of-stream, so this method returns always false.
+	 * @param nextElement The element to test for the end-of-stream signal.
+	 * @return Returns false.
+	 */
+	@Override
+	public boolean isEndOfStream(Tuple2<K,V> nextElement) {
+		return false;
+	}
+
+
+	@Override
+	public byte[] serializeKey(Tuple2<K, V> element) {
+		if(element.f0 == null) {
+			return null;
+		} else {
+			// key is not null. serialize it:
+			if (keyOutputSerializer == null) {
+				keyOutputSerializer = new DataOutputSerializer(16);
+			}
+			try {
+				keySerializer.serialize(element.f0, keyOutputSerializer);
+			}
+			catch (IOException e) {
+				throw new RuntimeException("Unable to serialize record", e);
+			}
+			// check if key byte array size changed
+			byte[] res = keyOutputSerializer.getByteArray();
+			if (res.length != keyOutputSerializer.length()) {
+				byte[] n = new byte[keyOutputSerializer.length()];
+				System.arraycopy(res, 0, n, 0, keyOutputSerializer.length());
+				res = n;
+			}
+			keyOutputSerializer.clear();
+			return res;
+		}
+	}
+
+	@Override
+	public byte[] serializeValue(Tuple2<K, V> element) {
+		if (valueOutputSerializer == null) {
+			valueOutputSerializer = new DataOutputSerializer(16);
+		}
+
+		try {
+			valueSerializer.serialize(element.f1, valueOutputSerializer);
+		}
+		catch (IOException e) {
+			throw new RuntimeException("Unable to serialize record", e);
+		}
+
+		byte[] res = valueOutputSerializer.getByteArray();
+		if (res.length != valueOutputSerializer.length()) {
+			byte[] n = new byte[valueOutputSerializer.length()];
+			System.arraycopy(res, 0, n, 0, valueOutputSerializer.length());
+			res = n;
+		}
+		valueOutputSerializer.clear();
+		return res;
+	}
+
+
+	@Override
+	public TypeInformation<Tuple2<K,V>> getProducedType() {
+		if (typeInfo != null) {
+			return typeInfo;
+		}
+		else {
+			throw new IllegalStateException(
+					"The type information is not available after this class has been serialized and distributed.");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/85e7b287/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
index 6ff9712..e937838 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
@@ -29,6 +29,8 @@ import java.io.IOException;
 /**
  * A serialization and deserialization schema that uses Flink's serialization stack to
  * transform typed from and to byte arrays.
+ *
+ * @see TypeInformationKeyValueSerializationSchema for a serialization schema supporting Key Value pairs.
  * 
  * @param <T> The type to be serialized.
  */