Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/19 06:40:15 UTC

[5/6] flink git commit: [FLINK-6288] [kafka] Cleanup and improvements to FlinkKafkaPartitioner custom partitioning

[FLINK-6288] [kafka] Cleanup and improvements to FlinkKafkaPartitioner custom partitioning

This commit wraps up some general improvements to the new Kafka sink
custom partitioning API, most notably:
1. remove deprecated constructors from base classes, as they are not
user-facing.
2. modify producer IT test to test custom partitioning for dynamic
topics.
3. improve documentation and Javadocs of the new interfaces.

This closes #3901.
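
For reference, the user-facing entry point of the new API is the abstract
class FlinkKafkaPartitioner, whose partition() method receives the target
topic together with that topic's partition list. A minimal sketch of a
custom implementation (the class name and the key-hashing scheme are
illustrative, not part of this commit):

    import java.util.Arrays;

    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

    // Illustrative custom partitioner: picks a partition out of the *target
    // topic's* partition list, which is what makes dynamic-topic partitioning
    // behave correctly under the new API.
    public class KeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {
        private static final long serialVersionUID = 1L;

        @Override
        public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            int hash = (key == null) ? 0 : Arrays.hashCode(key);
            return partitions[(hash & 0x7fffffff) % partitions.length];
        }
    }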


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3fcbb08
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3fcbb08
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3fcbb08

Branch: refs/heads/master
Commit: e3fcbb087568a33aa03f58ba0dc359b1a6b02bfd
Parents: 9ed9b68
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Thu May 18 21:52:16 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri May 19 14:38:48 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md                    |   4 +-
 .../connectors/kafka/FlinkKafkaProducer010.java | 113 +++++-----
 .../connectors/kafka/FlinkKafkaProducer08.java  |  50 +++--
 .../connectors/kafka/Kafka08JsonTableSink.java  |  21 +-
 .../kafka/Kafka08JsonTableSinkTest.java         |  25 +--
 .../connectors/kafka/FlinkKafkaProducer09.java  |  52 +++--
 .../connectors/kafka/Kafka09JsonTableSink.java  |  28 +--
 .../kafka/Kafka09JsonTableSinkTest.java         |  26 +--
 .../kafka/FlinkKafkaProducerBase.java           |  89 ++++----
 .../connectors/kafka/KafkaJsonTableSink.java    |  14 --
 .../connectors/kafka/KafkaTableSink.java        |  40 +---
 .../kafka/partitioner/FixedPartitioner.java     |  77 -------
 .../partitioner/FlinkFixedPartitioner.java      |  19 +-
 .../FlinkKafkaDelegatePartitioner.java          |   5 +-
 .../partitioner/FlinkKafkaPartitioner.java      |  24 +-
 .../kafka/partitioner/KafkaPartitioner.java     |  16 +-
 .../kafka/FlinkKafkaProducerBaseTest.java       |  38 ++--
 .../connectors/kafka/KafkaConsumerTestBase.java |  98 ++++----
 .../connectors/kafka/KafkaProducerTestBase.java | 222 ++++++++++++-------
 .../kafka/KafkaTableSinkTestBase.java           |  51 +----
 .../connectors/kafka/TestFixedPartitioner.java  | 104 ---------
 .../TestFlinkKafkaDelegatePartitioner.java      | 111 ----------
 .../kafka/testutils/Tuple2Partitioner.java      |  49 ----
 23 files changed, 458 insertions(+), 818 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 60a8039..bc7e7de 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -456,9 +456,9 @@ are other constructor variants that allow providing the following:
  Please refer to the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for
  details on how to configure Kafka Producers.
  * *Custom partitioner*: To assign records to specific
- partitions, you can provide an implementation of a `KafkaPartitioner` to the
+ partitions, you can provide an implementation of a `FlinkKafkaPartitioner` to the
  constructor. This partitioner will be called for each record in the stream
- to determine which exact partition the record will be sent to.
+ to determine which exact partition of the target topic the record should be sent to.
  * *Advanced serialization schema*: Similar to the consumer,
  the producer also allows using an advanced serialization schema called `KeyedSerializationSchema`,
  which allows serializing the key and value separately. It also allows to override the target topic,
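
A sketch of wiring such a custom partitioner into the producer described
above (the broker address, topic name, and the KeyHashPartitioner sketched
under the commit message are assumptions for illustration):

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");

    DataStream<String> stream = ...; // some upstream pipeline

    stream.addSink(new FlinkKafkaProducer010<>(
            "my-topic",                          // default target topic
            new SimpleStringSchema(),            // keyless serialization schema
            props,
            new KeyHashPartitioner<String>()));  // invoked per record with the target topic's partitions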

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 7addafa..711fe07 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -38,6 +39,7 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWra
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.kafka.clients.producer.ProducerRecord;
 
+import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPartitionsByTopic;
 import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;
 
 
@@ -121,32 +123,6 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	 *  @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 *  @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 *  @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
-	 *  @deprecated Use {@link FlinkKafkaProducer010Configuration#writeToKafkaWithTimestamps(DataStream, String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
-	 */
-	@Deprecated
-	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
-																					String topicId,
-																					KeyedSerializationSchema<T> serializationSchema,
-																					Properties producerConfig,
-																					KafkaPartitioner<T> customPartitioner) {
-
-		GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
-		FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner);
-		SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
-		return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
-	}
-
-	/**
-	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
-	 * the topic.
-	 *
-	 * This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
-	 *
-	 *  @param inStream The stream to write to Kafka
-	 *  @param topicId The name of the target topic
-	 *  @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
-	 *  @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 *  @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
 	 */
 	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
 																					String topicId,
@@ -200,21 +176,6 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
-	 * @deprecated Use {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead
-	 */
-	@Deprecated
-	public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
-	}
-
-	/**
-	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
-	 * the topic.
-	 *
-	 * @param topicId The topic to write data to
-	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
-	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
 	 */
 	public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
@@ -251,32 +212,84 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) {
 		this(topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
 	}
-
+	
 	/**
 	 * Create Kafka producer
 	 *
 	 * This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above)
-	 * @deprecated Use {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
 	 */
-	@Deprecated
-	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
+	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
 		// We create a Kafka 09 producer instance here and only "override" (by intercepting) the
 		// invoke call.
 		super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner));
 	}
-	
+
+	// ----------------------------- Deprecated constructors / factory methods  ---------------------------
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * This constructor allows writing timestamps to Kafka, it follows approach (b) (see above)
+	 *
+	 *  @param inStream The stream to write to Kafka
+	 *  @param topicId The name of the target topic
+	 *  @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 *  @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 *  @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *
+	 *  @deprecated This is a deprecated method that does not correctly handle partitioning when
+	 *              producing to multiple topics. Use
+	 *              {@link FlinkKafkaProducer010#writeToKafkaWithTimestamps(DataStream, String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 */
+	@Deprecated
+	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+																					String topicId,
+																					KeyedSerializationSchema<T> serializationSchema,
+																					Properties producerConfig,
+																					KafkaPartitioner<T> customPartitioner) {
+
+		GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
+		FlinkKafkaProducer010<T> kafkaProducer =
+				new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner));
+		SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinkKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
+		return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
+	 * the topic.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+	 *
+	 * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
+	 *             producing to multiple topics. Use
+	 *             {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 */
+	@Deprecated
+	public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+	}
+
 	/**
 	 * Create Kafka producer
 	 *
 	 * This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above)
+	 *
+	 * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
+	 *             producing to multiple topics. Use
+	 *             {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
 	 */
-	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
+	@Deprecated
+	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
 		// We create a Kafka 09 producer instance here and only "override" (by intercepting) the
 		// invoke call.
-		super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner));
+		super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner)));
 	}
 
-
 	// ----------------------------- Generic element processing  ---------------------------
 
 	private void invokeInternal(T next, long elementTimestamp) throws Exception {
@@ -300,7 +313,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 		ProducerRecord<byte[], byte[]> record;
 		int[] partitions = internalProducer.topicPartitionsMap.get(targetTopic);
 		if(null == partitions) {
-			partitions = internalProducer.getPartitionsByTopic(targetTopic, internalProducer.producer);
+			partitions = getPartitionsByTopic(targetTopic, internalProducer.producer);
 			internalProducer.topicPartitionsMap.put(targetTopic, partitions);
 		}
 		if (internalProducer.flinkKafkaPartitioner == null) {
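
The FlinkKafkaDelegatePartitioner used above adapts an old-style
KafkaPartitioner to the new interface. Conceptually it looks roughly like
the following (a simplified sketch, not the exact class body). Note that it
can only honor the partition list of the single default topic, which is
precisely why the old interface is deprecated for multi-topic producing:

    public class FlinkKafkaDelegatePartitioner<T> extends FlinkKafkaPartitioner<T> {
        private final KafkaPartitioner<T> kafkaPartitioner;
        private int[] partitions; // set once, from the default topic only

        public FlinkKafkaDelegatePartitioner(KafkaPartitioner<T> kafkaPartitioner) {
            this.kafkaPartitioner = kafkaPartitioner;
        }

        public void setPartitions(int[] partitions) {
            this.partitions = partitions;
        }

        @Override
        public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            // the old interface never sees targetTopic: it always partitions
            // against the default topic's partition count
            return kafkaPartitioner.partition(record, key, value, this.partitions.length);
        }
    }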

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
index 64d3716..08dcb2f 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
@@ -76,21 +77,6 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions.
-	 * @deprecated Use {@link FlinkKafkaProducer08#FlinkKafkaProducer08(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead
-	 */
-	@Deprecated
-	public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
-
-	}
-
-	/**
-	 * The main constructor for creating a FlinkKafkaProducer.
-	 *
-	 * @param topicId The topic to write data to
-	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
-	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions.
 	 */
 	public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
@@ -136,13 +122,30 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions.
-	 * @deprecated Use {@link FlinkKafkaProducer08#FlinkKafkaProducer08(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
 	 */
-	@Deprecated
-	public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+	public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
 		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}
 
+	// ------------------- Deprecated constructors ----------------------
+
+	/**
+	 * The main constructor for creating a FlinkKafkaProducer.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *
+	 * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
+	 *             producing to multiple topics. Use
+	 *             {@link FlinkKafkaProducer08#FlinkKafkaProducer08(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 */
+	@Deprecated
+	public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+	}
+
 	/**
 	 * The main constructor for creating a FlinkKafkaProducer.
 	 *
@@ -150,11 +153,18 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions.
+	 *
+	 * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
+	 *             producing to multiple topics. Use
+	 *             {@link FlinkKafkaProducer08#FlinkKafkaProducer08(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
 	 */
-	public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
-		super(topicId, serializationSchema, producerConfig, customPartitioner);
+	@Deprecated
+	public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+		super(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner));
 	}
 
+	// ---------------------------------------------------------------------
+
 	@Override
 	protected void flush() {
 		// The Kafka 0.8 producer doesn't support flushing, we wait here

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index 5a066ec0..80bd180 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
@@ -28,7 +29,7 @@ import java.util.Properties;
  * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format.
  */
 public class Kafka08JsonTableSink extends KafkaJsonTableSink {
-
+	
 	/**
 	 * Creates {@link KafkaTableSink} for Kafka 0.8
 	 *
@@ -36,24 +37,24 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink {
 	 * @param properties properties to connect to Kafka
 	 * @param partitioner Kafka partitioner
 	 */
-	public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
+	public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
 		super(topic, properties, partitioner);
 	}
-	
+
 	/**
 	 * Creates {@link KafkaTableSink} for Kafka 0.8
 	 *
 	 * @param topic topic in Kafka to which table is written
 	 * @param properties properties to connect to Kafka
 	 * @param partitioner Kafka partitioner
+	 *
+	 * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
+	 *             producing to multiple topics. Use
+	 *             {@link #Kafka08JsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead.
 	 */
-	public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
-		super(topic, properties, partitioner);
-	}
-
-	@Override
-	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
-		return new FlinkKafkaProducer08<>(topic, serializationSchema, properties, partitioner);
+	@Deprecated
+	public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
+		super(topic, properties, new FlinkKafkaDelegatePartitioner<>(partitioner));
 	}
 
 	@Override
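
Usage of the surviving, non-deprecated table sink constructor would look
roughly like this (the topic name, properties, and the choice of
FlinkFixedPartitioner are illustrative):

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");

    KafkaTableSink sink = new Kafka08JsonTableSink(
            "result-topic", props, new FlinkFixedPartitioner<Row>());

    table.writeToSink(sink); // table is an org.apache.flink.table.api.Table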

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index 164c162..2136476 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 
@@ -28,26 +27,20 @@ import java.util.Properties;
 public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
 
 	@Override
-	protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
+	protected KafkaTableSink createTableSink(
+			String topic,
+			Properties properties,
+			FlinkKafkaPartitioner<Row> partitioner,
 			final FlinkKafkaProducerBase<Row> kafkaProducer) {
 
 		return new Kafka08JsonTableSink(topic, properties, partitioner) {
 			@Override
-			protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
-					SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
-				return kafkaProducer;
-			}
-		};
-	}
+			protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+					String topic,
+					Properties properties,
+					SerializationSchema<Row> serializationSchema,
+					FlinkKafkaPartitioner<Row> partitioner) {
 
-	@Override
-	protected KafkaTableSink createTableSinkWithFlinkPartitioner(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner,
-			final FlinkKafkaProducerBase<Row> kafkaProducer) {
-		
-		return new Kafka08JsonTableSink(topic, properties, partitioner) {
-			@Override
-			protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
-					SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
 				return kafkaProducer;
 			}
 		};

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
index 4f41c43..cbed361 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
@@ -78,22 +79,6 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
-	 * @deprecated Use {@link FlinkKafkaProducer09#FlinkKafkaProducer09(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead
-	 */
-	@Deprecated
-	public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
-
-	}
-
-	/**
-	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
-	 * the topic.
-	 *
-	 * @param topicId The topic to write data to
-	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
-	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
 	 */
 	public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
@@ -140,13 +125,31 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
-	 * @deprecated Use {@link FlinkKafkaProducer09#FlinkKafkaProducer09(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
 	 */
-	@Deprecated
-	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
 		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}
 
+	// ------------------- Deprecated constructors ----------------------
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
+	 * the topic.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+	 *
+	 * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
+	 *             producing to multiple topics. Use
+	 *             {@link FlinkKafkaProducer09#FlinkKafkaProducer09(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 */
+	@Deprecated
+	public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+	}
+
 	/**
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
@@ -155,11 +158,18 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *
+	 * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
+	 *             producing to multiple topics. Use
+	 *             {@link FlinkKafkaProducer09#FlinkKafkaProducer09(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
 	 */
-	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
-		super(topicId, serializationSchema, producerConfig, customPartitioner);
+	@Deprecated
+	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+		super(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner));
 	}
 
+	// ------------------------------------------------------------------
+
 	@Override
 	protected void flush() {
 		if (this.producer != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
index b82ebc4..a81422e 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
@@ -28,43 +29,32 @@ import java.util.Properties;
  * Kafka 0.9 {@link KafkaTableSink} that serializes data in JSON format.
  */
 public class Kafka09JsonTableSink extends KafkaJsonTableSink {
+	
 	/**
 	 * Creates {@link KafkaTableSink} for Kafka 0.9
 	 *
 	 * @param topic topic in Kafka to which table is written
 	 * @param properties properties to connect to Kafka
 	 * @param partitioner Kafka partitioner
-	 * @deprecated Use {@link Kafka09JsonTableSink#Kafka09JsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead
 	 */
-	@Deprecated
-	public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
+	public Kafka09JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
 		super(topic, properties, partitioner);
 	}
-	
+
 	/**
 	 * Creates {@link KafkaTableSink} for Kafka 0.9
 	 *
 	 * @param topic topic in Kafka to which table is written
 	 * @param properties properties to connect to Kafka
 	 * @param partitioner Kafka partitioner
-	 */
-	public Kafka09JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
-		super(topic, properties, partitioner);
-	}
-
-	/**
 	 *
-	 * @param topic               Kafka topic to produce to.
-	 * @param properties          Properties for the Kafka producer.
-	 * @param serializationSchema Serialization schema to use to create Kafka records.
-	 * @param partitioner         Partitioner to select Kafka partition.
-	 * @return The version-specific Kafka producer
-	 * @deprecated Use {@link Kafka09JsonTableSink#createKafkaProducer(String, Properties, SerializationSchema, FlinkKafkaPartitioner)} instead
+	 * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
+	 *             producing to multiple topics. Use
+	 *             {@link #Kafka09JsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead.
 	 */
 	@Deprecated
-	@Override
-	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
-		return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner);
+	public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
+		super(topic, properties, new FlinkKafkaDelegatePartitioner<>(partitioner));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index ad8f623..3afb5e4 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 
@@ -27,28 +26,21 @@ import java.util.Properties;
 
 public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
 
-	@Deprecated
 	@Override
-	protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
+	protected KafkaTableSink createTableSink(
+			String topic,
+			Properties properties,
+			FlinkKafkaPartitioner<Row> partitioner,
 			final FlinkKafkaProducerBase<Row> kafkaProducer) {
 
 		return new Kafka09JsonTableSink(topic, properties, partitioner) {
 			@Override
-			protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
-					SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
-				return kafkaProducer;
-			}
-		};
-	}
+			protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+					String topic,
+					Properties properties,
+					SerializationSchema<Row> serializationSchema,
+					FlinkKafkaPartitioner<Row> partitioner) {
 
-	@Override
-	protected KafkaTableSink createTableSinkWithFlinkPartitioner(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner,
-			final FlinkKafkaProducerBase<Row> kafkaProducer) {
-
-		return new Kafka09JsonTableSink(topic, properties, partitioner) {
-			@Override
-			protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
-					SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
 				return kafkaProducer;
 			}
 		};

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index f9a1e41..3a8228c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -40,7 +40,6 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
 import org.apache.kafka.clients.producer.Callback;
@@ -74,38 +73,38 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 	private static final long serialVersionUID = 1L;
 
 	/**
-	 * Configuration key for disabling the metrics reporting
+	 * Configuration key for disabling the metrics reporting.
 	 */
 	public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
 
 	/**
-	 * User defined properties for the Producer
+	 * User defined properties for the Producer.
 	 */
 	protected final Properties producerConfig;
 
 	/**
-	 * The name of the default topic this producer is writing data to
+	 * The name of the default topic this producer is writing data to.
 	 */
 	protected final String defaultTopicId;
 
 	/**
-	 * (Serializable) SerializationSchema for turning objects used with Flink into
+	 * (Serializable) SerializationSchema for turning objects used with Flink into
 	 * byte[] for Kafka.
 	 */
 	protected final KeyedSerializationSchema<IN> schema;
 
 	/**
-	 * User-provided partitioner for assigning an object to a Kafka partition for each topic
+	 * User-provided partitioner for assigning an object to a Kafka partition for each topic.
 	 */
-	protected final FlinkKafkaPartitioner flinkKafkaPartitioner;
+	protected final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
 
 	/**
-	 * Partitions for each topic
+	 * Partitions of each topic.
 	 */
 	protected final Map<String, int[]> topicPartitionsMap;
 
 	/**
-	 * Flag indicating whether to accept failures (and log them), or to fail on failures
+	 * Flag indicating whether to accept failures (and log them), or to fail on failures.
 	 */
 	protected boolean logFailuresOnly;
 
@@ -114,11 +113,6 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 	 */
 	protected boolean flushOnCheckpoint;
 
-	/**
-	 * Retry times of fetching kafka meta
-	 */
-	protected long kafkaMetaRetryTimes;
-
 	// -------------------------------- Runtime fields ------------------------------------------
 
 	/** KafkaProducer instance */
@@ -138,21 +132,6 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 
 	protected OperatorStateStore stateStore;
 
-
-	/**
-	 * The main constructor for creating a FlinkKafkaProducer. For customPartitioner parameter, use {@link org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner} instead
-	 *
-	 * @param defaultTopicId The default topic to write data to
-	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
-	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
-	 * @deprecated Use {@link FlinkKafkaProducerBase#FlinkKafkaProducerBase(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
-	 */
-	@Deprecated
-	public FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
-		this(defaultTopicId, serializationSchema, producerConfig, null == customPartitioner ? null : new FlinkKafkaDelegatePartitioner<>(customPartitioner));
-	}
-
 	/**
 	 * The main constructor for creating a FlinkKafkaProducer.
 	 *
@@ -236,9 +215,11 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 		producer = getKafkaProducer(this.producerConfig);
 
 		RuntimeContext ctx = getRuntimeContext();
+
 		if(null != flinkKafkaPartitioner) {
 			if(flinkKafkaPartitioner instanceof FlinkKafkaDelegatePartitioner) {
-				((FlinkKafkaDelegatePartitioner)flinkKafkaPartitioner).setPartitions(getPartitionsByTopic(this.defaultTopicId, this.producer));
+				((FlinkKafkaDelegatePartitioner) flinkKafkaPartitioner).setPartitions(
+						getPartitionsByTopic(this.defaultTopicId, this.producer));
 			}
 			flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
 		}
@@ -290,26 +271,6 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 		}
 	}
 
-	protected int[] getPartitionsByTopic(String topic, KafkaProducer<byte[], byte[]> producer) {
-		// the fetched list is immutable, so we're creating a mutable copy in order to sort it
-		List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));
-
-		// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
-		Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
-				@Override
-				public int compare(PartitionInfo o1, PartitionInfo o2) {
-				return Integer.compare(o1.partition(), o2.partition());
-			}
-			});
-
-		int[] partitions = new int[partitionsList.size()];
-		for (int i = 0; i < partitions.length; i++) {
-			partitions[i] = partitionsList.get(i).partition();
-		}
-
-		return partitions;
-	}
-
 	/**
 	 * Called when new data arrives to the sink, and forwards it to Kafka.
 	 *
@@ -330,7 +291,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 
 		int[] partitions = this.topicPartitionsMap.get(targetTopic);
 		if(null == partitions) {
-			partitions = this.getPartitionsByTopic(targetTopic, producer);
+			partitions = getPartitionsByTopic(targetTopic, producer);
 			this.topicPartitionsMap.put(targetTopic, partitions);
 		}
 
@@ -338,7 +299,11 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 		if (flinkKafkaPartitioner == null) {
 			record = new ProducerRecord<>(targetTopic, serializedKey, serializedValue);
 		} else {
-			record = new ProducerRecord<>(targetTopic, flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), serializedKey, serializedValue);
+			record = new ProducerRecord<>(
+					targetTopic,
+					flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions),
+					serializedKey,
+					serializedValue);
 		}
 		if (flushOnCheckpoint) {
 			synchronized (pendingRecordsLock) {
@@ -425,6 +390,26 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 		return props;
 	}
 
+	protected static int[] getPartitionsByTopic(String topic, KafkaProducer<byte[], byte[]> producer) {
+		// the fetched list is immutable, so we're creating a mutable copy in order to sort it
+		List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));
+
+		// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
+		Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
+			@Override
+			public int compare(PartitionInfo o1, PartitionInfo o2) {
+				return Integer.compare(o1.partition(), o2.partition());
+			}
+		});
+
+		int[] partitions = new int[partitionsList.size()];
+		for (int i = 0; i < partitions.length; i++) {
+			partitions[i] = partitionsList.get(i).partition();
+		}
+
+		return partitions;
+	}
+
 	@VisibleForTesting
 	protected long numPendingRecords() {
 		synchronized (pendingRecordsLock) {
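
The sorted, per-topic partition cache above matters for determinism: for
example, FlinkFixedPartitioner pins each parallel sink subtask to a single
partition, so every subtask must agree on the partition ordering. Its core
logic amounts to the following (simplified from the actual class):

    public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
        private int parallelInstanceId;

        @Override
        public void open(int parallelInstanceId, int parallelInstances) {
            this.parallelInstanceId = parallelInstanceId;
        }

        @Override
        public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            // deterministic only if all subtasks see the same (sorted) partition array
            return partitions[parallelInstanceId % partitions.length];
        }
    }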

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
index a0b5033..41bb329 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.types.Row;
@@ -29,19 +28,6 @@ import java.util.Properties;
  * Base class for {@link KafkaTableSink} that serializes data in JSON format
  */
 public abstract class KafkaJsonTableSink extends KafkaTableSink {
-
-	/**
-	 * Creates KafkaJsonTableSink
-	 *
-	 * @param topic topic in Kafka to which table is written
-	 * @param properties properties to connect to Kafka
-	 * @param partitioner Kafka partitioner
-	 * @deprecated Use {@link KafkaJsonTableSink#KafkaJsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead
-	 */
-	@Deprecated
-	public KafkaJsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
-		super(topic, properties, partitioner);
-	}
 	
 	/**
 	 * Creates KafkaJsonTableSink

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index 0a937d6..1c38816 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -18,13 +18,11 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.util.Preconditions;
 
@@ -34,7 +32,7 @@ import java.util.Properties;
  * A version-agnostic Kafka {@link AppendStreamTableSink}.
  *
  * <p>The version-specific Kafka consumers need to extend this class and
- * override {@link #createKafkaProducer(String, Properties, SerializationSchema, KafkaPartitioner)}}.
+ * override {@link #createKafkaProducer(String, Properties, SerializationSchema, FlinkKafkaPartitioner)}}.
  */
 public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
 
@@ -47,22 +45,6 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
 
 	/**
 	 * Creates KafkaTableSink
-	 *
-	 * @param topic                 Kafka topic to write to.
-	 * @param properties            Properties for the Kafka consumer.
-	 * @param partitioner           Partitioner to select Kafka partition for each item
-	 * @deprecated Use {@link KafkaTableSink#KafkaTableSink(String, Properties, FlinkKafkaPartitioner)} instead
-	 */
-	@Deprecated
-	public KafkaTableSink(
-			String topic,
-			Properties properties,
-			KafkaPartitioner<Row> partitioner) {
-		this(topic, properties, new FlinkKafkaDelegatePartitioner<Row>(partitioner));
-	}
-
-	/**
-	 * Creates KafkaTableSink
 	 * 
 	 * @param topic                 Kafka topic to write to.
 	 * @param properties            Properties for the Kafka consumer.
@@ -85,22 +67,6 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
 	 * @param serializationSchema Serialization schema to use to create Kafka records.
 	 * @param partitioner         Partitioner to select Kafka partition.
 	 * @return The version-specific Kafka producer
-	 * @deprecated Use {@link KafkaTableSink#createKafkaProducer(String, Properties, SerializationSchema, FlinkKafkaPartitioner)} instead
-	 */
-	@Deprecated
-	protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
-		String topic, Properties properties,
-		SerializationSchema<Row> serializationSchema,
-		KafkaPartitioner<Row> partitioner);
-
-	/**
-	 * Returns the version-specifid Kafka producer.
-	 *
-	 * @param topic               Kafka topic to produce to.
-	 * @param properties          Properties for the Kafka producer.
-	 * @param serializationSchema Serialization schema to use to create Kafka records.
-	 * @param partitioner         Partitioner to select Kafka partition.
-	 * @return The version-specific Kafka producer
 	 */
 	protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
 		String topic, Properties properties,
@@ -153,8 +119,4 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
 
 		return copy;
 	}
-
-
-
-
 }
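
With the deprecated variants gone, a version-specific table sink implements a
single factory method. A minimal sketch of what a subclass now looks like (the
class name is illustrative, the remaining abstract methods are omitted, and the
FlinkKafkaProducer09 constructor variant is the one this commit adds):

	import java.util.Properties;

	import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
	import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
	import org.apache.flink.streaming.connectors.kafka.KafkaTableSink;
	import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
	import org.apache.flink.streaming.util.serialization.SerializationSchema;
	import org.apache.flink.types.Row;

	// abstract because createSerializationSchema() and copy-related methods
	// still need version-specific implementations
	public abstract class MyKafka09TableSink extends KafkaTableSink {

		public MyKafka09TableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
			super(topic, properties, partitioner);
		}

		@Override
		protected FlinkKafkaProducerBase<Row> createKafkaProducer(
				String topic, Properties properties,
				SerializationSchema<Row> serializationSchema,
				FlinkKafkaPartitioner<Row> partitioner) {
			// hand back the version-specific producer
			return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner);
		}
	}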

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
deleted file mode 100644
index edabfe0..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.partitioner;
-
-import java.io.Serializable;
-
-/**
- * A partitioner ensuring that each internal Flink partition ends up in one Kafka partition.
- *
- * Note, one Kafka partition can contain multiple Flink partitions.
- *
- * Cases:
- * 	# More Flink partitions than kafka partitions
- * <pre>
- * 		Flink Sinks:		Kafka Partitions
- * 			1	----------------&gt;	1
- * 			2   --------------/
- * 			3   -------------/
- * 			4	------------/
- * </pre>
- * Some (or all) kafka partitions contain the output of more than one flink partition
- *
- *# Fewer Flink partitions than Kafka
- * <pre>
- * 		Flink Sinks:		Kafka Partitions
- * 			1	----------------&gt;	1
- * 			2	----------------&gt;	2
- * 										3
- * 										4
- * 										5
- * </pre>
- *
- *  Not all Kafka partitions contain data
- *  To avoid such an unbalanced partitioning, use a round-robin kafka partitioner. (note that this will
- *  cause a lot of network connections between all the Flink instances and all the Kafka brokers
- *  @deprecated Use {@link FlinkFixedPartitioner} instead.
- *
- */
-@Deprecated
-public class FixedPartitioner<T> extends KafkaPartitioner<T> implements Serializable {
-	private static final long serialVersionUID = 1627268846962918126L;
-
-	private int targetPartition = -1;
-
-	@Override
-	public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
-		if (parallelInstanceId < 0 || parallelInstances <= 0 || partitions.length == 0) {
-			throw new IllegalArgumentException();
-		}
-		
-		this.targetPartition = partitions[parallelInstanceId % partitions.length];
-	}
-
-	@Override
-	public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
-		if (targetPartition >= 0) {
-			return targetPartition;
-		} else {
-			throw new RuntimeException("The partitioner has not been initialized properly");
-		}
-	}
-}
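
Code that still referenced the removed class can switch to FlinkFixedPartitioner
and keep the same fixed subtask-to-partition assignment; only the type changes.
A hedged before/after sketch (topic, schema, props and MyEvent are placeholders):

	// before (FixedPartitioner was removed in this commit):
	//   new FlinkKafkaProducer08<>(topic, schema, props, new FixedPartitioner<MyEvent>());
	// after:
	new FlinkKafkaProducer08<>(topic, schema, props, new FlinkFixedPartitioner<MyEvent>());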

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
index d2eb7af..e47c667 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
@@ -17,6 +17,8 @@
  */
 package org.apache.flink.streaming.connectors.kafka.partitioner;
 
+import org.apache.flink.util.Preconditions;
+
 /**
  * A partitioner ensuring that each internal Flink partition ends up in one Kafka partition.
  *
@@ -44,9 +46,8 @@ package org.apache.flink.streaming.connectors.kafka.partitioner;
  * </pre>
  *
  *  Not all Kafka partitions contain data
- *  To avoid such an unbalanced partitioning, use a round-robin kafka partitioner. (note that this will
- *  cause a lot of network connections between all the Flink instances and all the Kafka brokers
- *
+ *  To avoid such an unbalanced partitioning, use a round-robin Kafka partitioner (note that this will
+ *  cause a lot of network connections between all the Flink instances and all the Kafka brokers).
  */
 public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
 
@@ -54,17 +55,17 @@ public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
 
 	@Override
 	public void open(int parallelInstanceId, int parallelInstances) {
-		if (parallelInstanceId < 0 || parallelInstances <= 0) {
-			throw new IllegalArgumentException();
-		}
+		Preconditions.checkArgument(parallelInstanceId >= 0, "Id of this subtask cannot be negative.");
+		Preconditions.checkArgument(parallelInstances > 0, "Number of subtasks must be larger than 0.");
+
 		this.parallelInstanceId = parallelInstanceId;
 	}
 	
 	@Override
 	public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
-		if(null == partitions || partitions.length == 0) {
-			throw new IllegalArgumentException();
-		}
+		Preconditions.checkArgument(
+			partitions != null && partitions.length > 0,
+			"Partition list of the target topic is empty.");
 		
 		return partitions[parallelInstanceId % partitions.length];
 	}
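
The mapping the diagrams above describe boils down to
partitions[parallelInstanceId % partitions.length]. A worked example of the
wrap-around case (the partition ids are invented):

	// 4 sink subtasks writing to a topic whose partition ids are {0, 1, 2}:
	//   subtask 0 -> partitions[0 % 3] = partition 0
	//   subtask 1 -> partitions[1 % 3] = partition 1
	//   subtask 2 -> partitions[2 % 3] = partition 2
	//   subtask 3 -> partitions[3 % 3] = partition 0   (shared with subtask 0)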

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
index 469fd1b..b7b4143 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
@@ -18,8 +18,9 @@
 package org.apache.flink.streaming.connectors.kafka.partitioner;
 
 /**
- * Delegate for KafkaPartitioner
- * @param <T>
+ * Delegate for the deprecated {@link KafkaPartitioner}.
+ * This should only be used for bridging deprecated partitioning API methods.
+ *
  * @deprecated Delegate for {@link KafkaPartitioner}, use {@link FlinkKafkaPartitioner} instead
  */
 @Deprecated
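
The bridging mentioned here is the same pattern the removed KafkaTableSink
constructor used: wrap the legacy partitioner once and let it flow through the
FlinkKafkaPartitioner code path. Roughly ('legacy' is a placeholder for any
user-provided KafkaPartitioner<Row>):

	FlinkKafkaPartitioner<Row> bridged = new FlinkKafkaDelegatePartitioner<Row>(legacy);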

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
index e074b9b..b634af7 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
@@ -20,20 +20,34 @@ package org.apache.flink.streaming.connectors.kafka.partitioner;
 import java.io.Serializable;
 
 /**
- * It contains a open() method which is called on each parallel instance.
- * Partitioners must be serializable!
+ * A {@link FlinkKafkaPartitioner} encapsulates the logic of how to partition records
+ * across the partitions of multiple Kafka topics.
  */
 public abstract class FlinkKafkaPartitioner<T> implements Serializable {
+
 	private static final long serialVersionUID = -9086719227828020494L;
 
 	/**
-	 * Initializer for the Partitioner.
-	 * @param parallelInstanceId 0-indexed id of the parallel instance in Flink
+	 * Initializer for the partitioner. This is called once on each parallel sink instance of
+	 * the Flink Kafka producer. This method should be overridden if necessary.
+	 *
+	 * @param parallelInstanceId 0-indexed id of the parallel sink instance in Flink
 	 * @param parallelInstances the total number of parallel instances
 	 */
 	public void open(int parallelInstanceId, int parallelInstances) {
 		// overwrite this method if needed.
 	}
-	
+
+	/**
+	 * Determines the id of the partition that the record should be written to.
+	 *
+	 * @param record the record value
+	 * @param key serialized key of the record
+	 * @param value serialized value of the record
+	 * @param targetTopic target topic for the record
+	 * @param partitions found partitions for the target topic
+	 *
+	 * @return the id of the target partition
+	 */
 	public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
 }
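
Since partition() now receives the target topic together with that topic's own
partition array, a partitioner can be written once and behave correctly for any
number of topics. A minimal sketch of an implementation (the key-hashing scheme
is illustrative, not part of this commit):

	import java.util.Arrays;

	import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

	public class KeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {

		private static final long serialVersionUID = 1L;

		@Override
		public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
			if (key == null) {
				// records without a serialized key all go to the first partition of this topic
				return partitions[0];
			}
			// modulo first, then abs: |x % n| < n, so Math.abs() cannot overflow here
			return partitions[Math.abs(Arrays.hashCode(key) % partitions.length)];
		}
	}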

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
index 7c82bd1..eebc619 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
@@ -22,7 +22,9 @@ import java.io.Serializable;
 /**
 * It contains an open() method which is called on each parallel instance.
  * Partitioners must be serializable!
- * @deprecated Use {@link FlinkKafkaPartitioner} instead.
+ *
+ * @deprecated This partitioner does not correctly handle partitioning in the case of
+ *             multiple topics. Please use {@link FlinkKafkaPartitioner} instead.
  */
 @Deprecated
 public abstract class KafkaPartitioner<T> implements Serializable {
@@ -34,22 +36,10 @@ public abstract class KafkaPartitioner<T> implements Serializable {
 	 * @param parallelInstanceId 0-indexed id of the parallel instance in Flink
 	 * @param parallelInstances the total number of parallel instances
 	 * @param partitions an array describing the partition IDs of the available Kafka partitions.
-	 * @deprecated Use {@link FlinkKafkaPartitioner#open(int, int)} instead.
 	 */
-	@Deprecated
 	public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
 		// overwrite this method if needed.
 	}
 
-	/**
-	 *
-	 * @param next
-	 * @param serializedKey
-	 * @param serializedValue
-	 * @param numPartitions
-	 * @return
-	 * @deprecated Use {@link FlinkKafkaPartitioner#partition(T, byte[], byte[], String, int[])} instead.
-	 */
-	@Deprecated
 	public abstract int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions);
 }
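
Migrating an existing partitioner is mostly mechanical, with one semantic catch:
the old method returned a partition id directly (implicitly assuming ids
0..numPartitions-1), while the new one should index into the actual partition id
array of the topic being written to. A hedged sketch ('someIndex' stands in for
whatever the old implementation computed):

	// old (deprecated) contract:
	//   public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
	//       return someIndex % numPartitions;
	//   }
	// new contract:
	@Override
	public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
		return partitions[someIndex % partitions.length];
	}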

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
index 1f16d8e..6b2cc02 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
@@ -22,12 +22,14 @@ import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -61,7 +63,7 @@ public class FlinkKafkaProducerBaseTest {
 		// no bootstrap servers set in props
 		Properties props = new Properties();
 		// should throw IllegalArgumentException
-		new DummyFlinkKafkaProducer<>(props, null);
+		new DummyFlinkKafkaProducer<>(props, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
 	}
 
 	/**
@@ -72,7 +74,7 @@ public class FlinkKafkaProducerBaseTest {
 		Properties props = new Properties();
 		props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345");
 		// should set missing key value deserializers
-		new DummyFlinkKafkaProducer<>(props, null);
+		new DummyFlinkKafkaProducer<>(props, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
 
 		assertTrue(props.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
 		assertTrue(props.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
@@ -83,9 +85,10 @@ public class FlinkKafkaProducerBaseTest {
 	/**
 	 * Tests that the partition list is determinate and correctly provided to the custom partitioner.
 	 */
+	@SuppressWarnings("unchecked")
 	@Test
-	public void testPartitionerOpenedWithDeterminatePartitionList() throws Exception {
-		KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class);
+	public void testPartitionerInvokedWithDeterminatePartitionList() throws Exception {
+		FlinkKafkaPartitioner<String> mockPartitioner = mock(FlinkKafkaPartitioner.class);
 
 		RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
 		when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
@@ -98,8 +101,8 @@ public class FlinkKafkaProducerBaseTest {
 		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 0, null, null, null));
 		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 2, null, null, null));
 
-		final DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
-			FakeStandardProducerConfig.get(), mockPartitioner);
+		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
+			FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), mockPartitioner);
 		producer.setRuntimeContext(mockRuntimeContext);
 
 		final KafkaProducer mockProducer = producer.getMockKafkaProducer();
@@ -107,10 +110,11 @@ public class FlinkKafkaProducerBaseTest {
 		when(mockProducer.metrics()).thenReturn(null);
 
 		producer.open(new Configuration());
+		verify(mockPartitioner, times(1)).open(0, 1);
 
-		// the out-of-order partitions list should be sorted before provided to the custom partitioner's open() method
-		int[] correctPartitionList = {0, 1, 2, 3};
-		verify(mockPartitioner).open(0, 1, correctPartitionList);
+		producer.invoke("foobar");
+		verify(mockPartitioner, times(1)).partition(
+			"foobar", null, "foobar".getBytes(), DummyFlinkKafkaProducer.DUMMY_TOPIC, new int[] {0, 1, 2, 3});
 	}
 
 	/**
@@ -119,7 +123,7 @@ public class FlinkKafkaProducerBaseTest {
 	@Test
 	public void testAsyncErrorRethrownOnInvoke() throws Throwable {
 		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
-			FakeStandardProducerConfig.get(), null);
+			FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
 
 		OneInputStreamOperatorTestHarness<String, Object> testHarness =
 			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
@@ -150,7 +154,7 @@ public class FlinkKafkaProducerBaseTest {
 	@Test
 	public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
 		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
-			FakeStandardProducerConfig.get(), null);
+			FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
 
 		OneInputStreamOperatorTestHarness<String, Object> testHarness =
 			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
@@ -186,7 +190,7 @@ public class FlinkKafkaProducerBaseTest {
 	@Test(timeout=5000)
 	public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable {
 		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
-			FakeStandardProducerConfig.get(), null);
+			FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
 		producer.setFlushOnCheckpoint(true);
 
 		final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
@@ -239,7 +243,7 @@ public class FlinkKafkaProducerBaseTest {
 	@Test(timeout=10000)
 	public void testAtLeastOnceProducer() throws Throwable {
 		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
-			FakeStandardProducerConfig.get(), null);
+			FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
 		producer.setFlushOnCheckpoint(true);
 
 		final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
@@ -299,7 +303,7 @@ public class FlinkKafkaProducerBaseTest {
 	@Test(timeout=5000)
 	public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
 		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
-			FakeStandardProducerConfig.get(), null);
+			FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
 		producer.setFlushOnCheckpoint(false);
 
 		final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
@@ -333,9 +337,9 @@ public class FlinkKafkaProducerBaseTest {
 		private boolean isFlushed;
 
 		@SuppressWarnings("unchecked")
-		DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner) {
+		DummyFlinkKafkaProducer(Properties producerConfig, KeyedSerializationSchema<T> schema, FlinkKafkaPartitioner partitioner) {
 
-			super(DUMMY_TOPIC, (KeyedSerializationSchema<T>) mock(KeyedSerializationSchema.class), producerConfig, partitioner);
+			super(DUMMY_TOPIC, schema, producerConfig, partitioner);
 
 			this.mockProducer = mock(KafkaProducer.class);
 			when(mockProducer.send(any(ProducerRecord.class), any(Callback.class))).thenAnswer(new Answer<Object>() {
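
The real serialization schema the tests now pass in is also what makes the
partition() expectation above concrete: KeyedSerializationSchemaWrapper adapts a
plain SerializationSchema, producing no key and delegating the value bytes. In
effect (a sketch matching the test's expectations, not a quote of the wrapper's
source):

	KeyedSerializationSchema<String> schema =
		new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema());
	// schema.serializeKey("foobar")   -> null
	// schema.serializeValue("foobar") -> "foobar".getBytes()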

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 203d814..ac278fb 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1291,55 +1291,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		}
 	}
 
-	private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
-			KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
-
-		private final TypeSerializer<Tuple2<Integer, Integer>> ts;
-		
-		public Tuple2WithTopicSchema(ExecutionConfig ec) {
-			ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
-		}
-
-		@Override
-		public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
-			DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
-			Tuple2<Integer, Integer> t2 = ts.deserialize(in);
-			return new Tuple3<>(t2.f0, t2.f1, topic);
-		}
-
-		@Override
-		public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) {
-			return false;
-		}
-
-		@Override
-		public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
-			return TypeInfoParser.parse("Tuple3<Integer, Integer, String>");
-		}
-
-		@Override
-		public byte[] serializeKey(Tuple3<Integer, Integer, String> element) {
-			return null;
-		}
-
-		@Override
-		public byte[] serializeValue(Tuple3<Integer, Integer, String> element) {
-			ByteArrayOutputStream by = new ByteArrayOutputStream();
-			DataOutputView out = new DataOutputViewStreamWrapper(by);
-			try {
-				ts.serialize(new Tuple2<>(element.f0, element.f1), out);
-			} catch (IOException e) {
-				throw new RuntimeException("Error" ,e);
-			}
-			return by.toByteArray();
-		}
-
-		@Override
-		public String getTargetTopic(Tuple3<Integer, Integer, String> element) {
-			return element.f2;
-		}
-	}
-
 	/**
 	 * Test Flink's Kafka integration also with very big records (30MB)
 	 * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
@@ -2276,4 +2227,53 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			this.numElementsTotal = state.get(0);
 		}
 	}
+
+	private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
+		KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
+
+		private final TypeSerializer<Tuple2<Integer, Integer>> ts;
+
+		public Tuple2WithTopicSchema(ExecutionConfig ec) {
+			ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
+		}
+
+		@Override
+		public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+			DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
+			Tuple2<Integer, Integer> t2 = ts.deserialize(in);
+			return new Tuple3<>(t2.f0, t2.f1, topic);
+		}
+
+		@Override
+		public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) {
+			return false;
+		}
+
+		@Override
+		public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
+			return TypeInfoParser.parse("Tuple3<Integer, Integer, String>");
+		}
+
+		@Override
+		public byte[] serializeKey(Tuple3<Integer, Integer, String> element) {
+			return null;
+		}
+
+		@Override
+		public byte[] serializeValue(Tuple3<Integer, Integer, String> element) {
+			ByteArrayOutputStream by = new ByteArrayOutputStream();
+			DataOutputView out = new DataOutputViewStreamWrapper(by);
+			try {
+				ts.serialize(new Tuple2<>(element.f0, element.f1), out);
+			} catch (IOException e) {
+				throw new RuntimeException("Error", e);
+			}
+			return by.toByteArray();
+		}
+
+		@Override
+		public String getTargetTopic(Tuple3<Integer, Integer, String> element) {
+			return element.f2;
+		}
+	}
 }
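
getTargetTopic() returning element.f2 is what lets the reworked producer IT test
exercise custom partitioning across dynamic topics: every record names its own
destination, and the constructor topic only acts as a fallback when
getTargetTopic() returns null. A hedged sketch of the write side (the schema is
private to the test, so this is purely illustrative; 'stream', 'props' and 'ec'
are assumed to be set up elsewhere):

	stream.addSink(new FlinkKafkaProducer08<>(
		"defaultTopic",                          // fallback when getTargetTopic() returns null
		new Tuple2WithTopicSchema(ec),           // routes each record to element.f2
		props,
		new FlinkFixedPartitioner<Tuple3<Integer, Integer, String>>()));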