You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/19 06:42:41 UTC

[1/6] flink git commit: [FLINK-6608] [security, config] Relax Kerberos login contexts parsing

Repository: flink
Updated Branches:
  refs/heads/release-1.3 94111f985 -> 0963718ac


[FLINK-6608] [security, config] Relax Kerberos login contexts parsing

This closes #3928.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a43badc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a43badc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a43badc

Branch: refs/heads/release-1.3
Commit: 1a43badcd2e7893819f6aba9e7cd7688176efc58
Parents: 94111f9
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Wed May 17 16:00:37 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri May 19 14:41:48 2017 +0800

----------------------------------------------------------------------
 .../flink/runtime/security/SecurityUtils.java   |  8 ++-
 .../runtime/security/SecurityUtilsTest.java     | 62 +++++++++++++++++++-
 2 files changed, 67 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1a43badc/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
index 7a09c32..b874009 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
@@ -230,10 +230,14 @@ public class SecurityUtils {
 		}
 
 		private static List<String> parseList(String value) {
-			if(value == null) {
+			if(value == null || value.isEmpty()) {
 				return Collections.emptyList();
 			}
-			return Arrays.asList(value.split(","));
+
+			return Arrays.asList(value
+				.trim()
+				.replaceAll("(\\s*,+\\s*)+", ",")
+				.split(","));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1a43badc/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
index c5624f4..3e3808b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
@@ -18,13 +18,19 @@
 package org.apache.flink.runtime.security;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.security.modules.SecurityModule;
 import org.junit.AfterClass;
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for the {@link SecurityUtils}.
@@ -78,4 +84,58 @@ public class SecurityUtilsTest {
 		SecurityUtils.uninstall();
 		assertEquals(NoOpSecurityContext.class, SecurityUtils.getInstalledContext().getClass());
 	}
+
+	@Test
+	public void testKerberosLoginContextParsing() {
+
+		List<String> expectedLoginContexts = Arrays.asList("Foo bar", "Client");
+
+		Configuration testFlinkConf;
+		SecurityUtils.SecurityConfiguration testSecurityConf;
+
+		// ------- no whitespaces
+
+		testFlinkConf = new Configuration();
+		testFlinkConf.setString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS, "Foo bar,Client");
+		testSecurityConf = new SecurityUtils.SecurityConfiguration(
+			testFlinkConf, new org.apache.hadoop.conf.Configuration(),
+			Collections.singletonList(TestSecurityModule.class));
+		assertEquals(expectedLoginContexts, testSecurityConf.getLoginContextNames());
+
+		// ------- with whitespaces surrounding comma
+
+		testFlinkConf = new Configuration();
+		testFlinkConf.setString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS, "Foo bar , Client");
+		testSecurityConf = new SecurityUtils.SecurityConfiguration(
+			testFlinkConf, new org.apache.hadoop.conf.Configuration(),
+			Collections.singletonList(TestSecurityModule.class));
+		assertEquals(expectedLoginContexts, testSecurityConf.getLoginContextNames());
+
+		// ------- leading / trailing whitespaces at start and end of list
+
+		testFlinkConf = new Configuration();
+		testFlinkConf.setString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS, " Foo bar , Client ");
+		testSecurityConf = new SecurityUtils.SecurityConfiguration(
+			testFlinkConf, new org.apache.hadoop.conf.Configuration(),
+			Collections.singletonList(TestSecurityModule.class));
+		assertEquals(expectedLoginContexts, testSecurityConf.getLoginContextNames());
+
+		// ------- empty entries
+
+		testFlinkConf = new Configuration();
+		testFlinkConf.setString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS, "Foo bar,,Client");
+		testSecurityConf = new SecurityUtils.SecurityConfiguration(
+			testFlinkConf, new org.apache.hadoop.conf.Configuration(),
+			Collections.singletonList(TestSecurityModule.class));
+		assertEquals(expectedLoginContexts, testSecurityConf.getLoginContextNames());
+
+		// ------- empty trailing String entries with whitespaces
+
+		testFlinkConf = new Configuration();
+		testFlinkConf.setString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS, "Foo bar, ,, Client,");
+		testSecurityConf = new SecurityUtils.SecurityConfiguration(
+			testFlinkConf, new org.apache.hadoop.conf.Configuration(),
+			Collections.singletonList(TestSecurityModule.class));
+		assertEquals(expectedLoginContexts, testSecurityConf.getLoginContextNames());
+	}
 }


[2/6] flink git commit: [FLINK-6288] [kafka] New custom partitioner API that correctly handles multiple Kafka sink topics

Posted by tz...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 2adb5ec..203d814 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -68,7 +68,7 @@ import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapp
 import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
 import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
-import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner;
+import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2FlinkPartitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
@@ -1199,9 +1199,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 	/**
 	 * Test producing and consuming into multiple topics
-	 * @throws java.lang.Exception
+	 * @throws Exception
 	 */
-	public void runProduceConsumeMultipleTopics() throws java.lang.Exception {
+	public void runProduceConsumeMultipleTopics() throws Exception {
 		final int NUM_TOPICS = 5;
 		final int NUM_ELEMENTS = 20;
 
@@ -1291,6 +1291,55 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		}
 	}
 
+	private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
+			KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
+
+		private final TypeSerializer<Tuple2<Integer, Integer>> ts;
+		
+		public Tuple2WithTopicSchema(ExecutionConfig ec) {
+			ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
+		}
+
+		@Override
+		public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+			DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
+			Tuple2<Integer, Integer> t2 = ts.deserialize(in);
+			return new Tuple3<>(t2.f0, t2.f1, topic);
+		}
+
+		@Override
+		public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) {
+			return false;
+		}
+
+		@Override
+		public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
+			return TypeInfoParser.parse("Tuple3<Integer, Integer, String>");
+		}
+
+		@Override
+		public byte[] serializeKey(Tuple3<Integer, Integer, String> element) {
+			return null;
+		}
+
+		@Override
+		public byte[] serializeValue(Tuple3<Integer, Integer, String> element) {
+			ByteArrayOutputStream by = new ByteArrayOutputStream();
+			DataOutputView out = new DataOutputViewStreamWrapper(by);
+			try {
+				ts.serialize(new Tuple2<>(element.f0, element.f1), out);
+			} catch (IOException e) {
+				throw new RuntimeException("Error" ,e);
+			}
+			return by.toByteArray();
+		}
+
+		@Override
+		public String getTargetTopic(Tuple3<Integer, Integer, String> element) {
+			return element.f2;
+		}
+	}
+
 	/**
 	 * Test Flink's Kafka integration also with very big records (30MB)
 	 * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
@@ -1975,7 +2024,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			producerProperties.setProperty("retries", "0");
 			producerProperties.putAll(secureProps);
 			
-			kafkaServer.produceIntoKafka(stream, topicName, serSchema, producerProperties, new Tuple2Partitioner(parallelism))
+			kafkaServer.produceIntoKafka(stream, topicName, serSchema, producerProperties, new Tuple2FlinkPartitioner(parallelism))
 					.setParallelism(parallelism);
 
 			try {
@@ -2227,53 +2276,4 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			this.numElementsTotal = state.get(0);
 		}
 	}
-
-	private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
-		KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
-
-		private final TypeSerializer<Tuple2<Integer, Integer>> ts;
-
-		public Tuple2WithTopicSchema(ExecutionConfig ec) {
-			ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
-		}
-
-		@Override
-		public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
-			DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
-			Tuple2<Integer, Integer> t2 = ts.deserialize(in);
-			return new Tuple3<>(t2.f0, t2.f1, topic);
-		}
-
-		@Override
-		public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) {
-			return false;
-		}
-
-		@Override
-		public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
-			return TypeInfoParser.parse("Tuple3<Integer, Integer, String>");
-		}
-
-		@Override
-		public byte[] serializeKey(Tuple3<Integer, Integer, String> element) {
-			return null;
-		}
-
-		@Override
-		public byte[] serializeValue(Tuple3<Integer, Integer, String> element) {
-			ByteArrayOutputStream by = new ByteArrayOutputStream();
-			DataOutputView out = new DataOutputViewStreamWrapper(by);
-			try {
-				ts.serialize(new Tuple2<>(element.f0, element.f1), out);
-			} catch (IOException e) {
-				throw new RuntimeException("Error" ,e);
-			}
-			return by.toByteArray();
-		}
-
-		@Override
-		public String getTargetTopic(Tuple3<Integer, Integer, String> element) {
-			return element.f2;
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 6f61392..8285048 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -27,12 +27,11 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.test.util.SuccessException;
 
-
 import java.io.Serializable;
 import java.util.Properties;
 
@@ -174,7 +173,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 
 	// ------------------------------------------------------------------------
 
-	public static class CustomPartitioner extends KafkaPartitioner<Tuple2<Long, String>> implements Serializable {
+	public static class CustomPartitioner extends FlinkKafkaPartitioner<Tuple2<Long, String>> implements Serializable {
 
 		private final int expectedPartitions;
 
@@ -184,10 +183,10 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 
 
 		@Override
-		public int partition(Tuple2<Long, String> next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
-			assertEquals(expectedPartitions, numPartitions);
+		public int partition(Tuple2<Long, String> next, byte[] serializedKey, byte[] serializedValue, String topic, int[] partitions) {
+			assertEquals(expectedPartitions, partitions.length);
 
-			return (int) (next.f0 % numPartitions);
+			return (int) (next.f0 % expectedPartitions);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index e0e8f84..0fdc82e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -17,25 +17,26 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
+import java.io.Serializable;
+import java.util.Properties;
+
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.Types;
-import org.apache.flink.types.Row;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.types.Row;
 import org.junit.Test;
 
-import java.io.Serializable;
-import java.util.Properties;
-
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotSame;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
@@ -45,6 +46,7 @@ public abstract class KafkaTableSinkTestBase {
 	protected static final String[] FIELD_NAMES = new String[] {"field1", "field2"};
 	private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] { Types.INT(), Types.STRING() };
 	private static final KafkaPartitioner<Row> PARTITIONER = new CustomPartitioner();
+	private static final FlinkKafkaPartitioner<Row> FLINK_PARTITIONER = new FlinkCustomPartitioner();
 	private static final Properties PROPERTIES = createSinkProperties();
 	@SuppressWarnings("unchecked")
 	private final FlinkKafkaProducerBase<Row> PRODUCER = new FlinkKafkaProducerBase<Row>(
@@ -72,6 +74,23 @@ public abstract class KafkaTableSinkTestBase {
 	}
 
 	@Test
+	@SuppressWarnings("unchecked")
+	public void testKafkaTableSinkWithFlinkPartitioner() throws Exception {
+		DataStream dataStream = mock(DataStream.class);
+
+		KafkaTableSink kafkaTableSink = spy(createTableSinkWithFlinkPartitioner());
+		kafkaTableSink.emitDataStream(dataStream);
+
+		verify(dataStream).addSink(eq(PRODUCER));
+
+		verify(kafkaTableSink).createKafkaProducer(
+			eq(TOPIC),
+			eq(PROPERTIES),
+			any(getSerializationSchema().getClass()),
+			eq(FLINK_PARTITIONER));
+	}
+
+	@Test
 	public void testConfiguration() {
 		KafkaTableSink kafkaTableSink = createTableSink();
 		KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
@@ -82,15 +101,33 @@ public abstract class KafkaTableSinkTestBase {
 		assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
 	}
 
+	@Test
+	public void testConfigurationWithFlinkPartitioner() {
+		KafkaTableSink kafkaTableSink = createTableSinkWithFlinkPartitioner();
+		KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
+		assertNotSame(kafkaTableSink, newKafkaTableSink);
+
+		assertArrayEquals(FIELD_NAMES, newKafkaTableSink.getFieldNames());
+		assertArrayEquals(FIELD_TYPES, newKafkaTableSink.getFieldTypes());
+		assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
+	}
+
 	protected abstract KafkaTableSink createTableSink(String topic, Properties properties,
 			KafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
 
+	protected abstract KafkaTableSink createTableSinkWithFlinkPartitioner(String topic,
+			Properties properties, FlinkKafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
+
 	protected abstract SerializationSchema<Row> getSerializationSchema();
 
 	private KafkaTableSink createTableSink() {
 		return createTableSink(TOPIC, PROPERTIES, PARTITIONER, PRODUCER);
 	}
 
+	private KafkaTableSink createTableSinkWithFlinkPartitioner() {
+		return createTableSinkWithFlinkPartitioner(TOPIC, PROPERTIES, FLINK_PARTITIONER, PRODUCER);
+	}
+
 	private static Properties createSinkProperties() {
 		Properties properties = new Properties();
 		properties.setProperty("bootstrap.servers", "localhost:12345");
@@ -103,4 +140,11 @@ public abstract class KafkaTableSinkTestBase {
 			return 0;
 		}
 	}
+
+	private static class FlinkCustomPartitioner extends FlinkKafkaPartitioner<Row> {
+		@Override
+		public int partition(Row record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+			return 0;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 9a7c96a..311a1a4 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -17,20 +17,20 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
 import kafka.server.KafkaServer;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-
 /**
  * Abstract class providing a Kafka test environment
  */
@@ -81,11 +81,11 @@ public abstract class KafkaTestEnvironment {
 
 	public abstract <T> StreamSink<T> getProducerSink(String topic,
 			KeyedSerializationSchema<T> serSchema, Properties props,
-			KafkaPartitioner<T> partitioner);
+			FlinkKafkaPartitioner<T> partitioner);
 
 	public abstract <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic,
 														KeyedSerializationSchema<T> serSchema, Properties props,
-														KafkaPartitioner<T> partitioner);
+														FlinkKafkaPartitioner<T> partitioner);
 
 	// -- offset handlers
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java
new file mode 100644
index 0000000..fa84199
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestFlinkFixedPartitioner {
+
+
+	/**
+	 * <pre>
+	 *   		Flink Sinks:		Kafka Partitions
+	 * 			1	---------------->	1
+	 * 			2   --------------/
+	 * 			3   -------------/
+	 * 			4	------------/
+	 * </pre>
+	 */
+	@Test
+	public void testMoreFlinkThanBrokers() {
+		FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
+
+		int[] partitions = new int[]{0};
+
+		part.open(0, 4);
+		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+		part.open(1, 4);
+		Assert.assertEquals(0, part.partition("abc2", null, null, null, partitions));
+
+		part.open(2, 4);
+		Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions));
+		Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions)); // a repeated call must return the same partition
+
+		part.open(3, 4);
+		Assert.assertEquals(0, part.partition("abc4", null, null, null, partitions));
+	}
+
+	/**
+	 *
+	 * <pre>
+	 * 		Flink Sinks:		Kafka Partitions
+	 * 			1	---------------->	1
+	 * 			2	---------------->	2
+	 * 									3
+	 * 									4
+	 * 									5
+	 *
+	 * </pre>
+	 */
+	@Test
+	public void testFewerPartitions() {
+		FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
+
+		int[] partitions = new int[]{0, 1, 2, 3, 4};
+		part.open(0, 2);
+		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+		part.open(1, 2);
+		Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
+		Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
+	}
+
+	/*
+	 * 		Flink Sinks:		Kafka Partitions
+	 * 			1	------------>--->	1
+	 * 			2	-----------/----> 	2
+	 * 			3	----------/
+	 */
+	@Test
+	public void testMixedCase() {
+		FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
+		int[] partitions = new int[]{0,1};
+
+		part.open(0, 3);
+		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+		part.open(1, 3);
+		Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
+
+		part.open(2, 3);
+		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
new file mode 100644
index 0000000..c6be71c
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for FlinkKafkaDelegatePartitioner wrapping a FixedPartitioner (note: testMoreFlinkThanBrokers instantiates FlinkFixedPartitioner directly).
+ */
+public class TestFlinkKafkaDelegatePartitioner {
+
+	/**
+	 * <pre>
+	 *   		Flink Sinks:		Kafka Partitions
+	 * 			1	---------------->	1
+	 * 			2   --------------/
+	 * 			3   -------------/
+	 * 			4	------------/
+	 * </pre>
+	 */
+	@Test
+	public void testMoreFlinkThanBrokers() {
+		FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
+
+		int[] partitions = new int[]{0};
+
+		part.open(0, 4);
+		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+		part.open(1, 4);
+		Assert.assertEquals(0, part.partition("abc2", null, null, null, partitions));
+
+		part.open(2, 4);
+		Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions));
+		Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions)); // a repeated call must return the same partition
+
+		part.open(3, 4);
+		Assert.assertEquals(0, part.partition("abc4", null, null, null, partitions));
+	}
+
+	/**
+	 *
+	 * <pre>
+	 * 		Flink Sinks:		Kafka Partitions
+	 * 			1	---------------->	1
+	 * 			2	---------------->	2
+	 * 									3
+	 * 									4
+	 * 									5
+	 *
+	 * </pre>
+	 */
+	@Test
+	public void testFewerPartitions() {
+		FlinkKafkaDelegatePartitioner<String> part = new FlinkKafkaDelegatePartitioner<>(new FixedPartitioner<String>());
+
+		int[] partitions = new int[]{0, 1, 2, 3, 4};
+		part.setPartitions(partitions);
+		
+		part.open(0, 2);
+		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+		part.open(1, 2);
+		Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
+		Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
+	}
+
+	/*
+	 * 		Flink Sinks:		Kafka Partitions
+	 * 			1	------------>--->	1
+	 * 			2	-----------/----> 	2
+	 * 			3	----------/
+	 */
+	@Test
+	public void testMixedCase() {
+		FlinkKafkaDelegatePartitioner<String> part = new FlinkKafkaDelegatePartitioner<>(new FixedPartitioner<String>());
+		int[] partitions = new int[]{0,1};
+		part.setPartitions(partitions);
+
+		part.open(0, 3);
+		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+		part.open(1, 3);
+		Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
+
+		part.open(2, 3);
+		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index c383eb5..c0fb836 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -18,6 +18,10 @@
 
 package org.apache.flink.streaming.connectors.kafka.testutils;
 
+import java.util.Collection;
+import java.util.Properties;
+import java.util.Random;
+
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -31,18 +35,14 @@ import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
 import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 
-import java.util.Collection;
-import java.util.Properties;
-import java.util.Random;
-
 @SuppressWarnings("serial")
 public class DataGenerators {
 
@@ -107,10 +107,10 @@ public class DataGenerators {
 		testServer.produceIntoKafka(stream, topic,
 				new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig())),
 				props,
-				new KafkaPartitioner<Integer>() {
+				new FlinkKafkaPartitioner<Integer>() {
 					@Override
-					public int partition(Integer next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
-						return next % numPartitions;
+					public int partition(Integer next, byte[] serializedKey, byte[] serializedValue, String topic, int[] partitions) {
+						return next % partitions.length;
 					}
 				});
 
@@ -149,7 +149,7 @@ public class DataGenerators {
 						topic,
 						new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
 						producerProperties,
-						new FixedPartitioner<String>());
+						new FlinkFixedPartitioner<String>());
 
 				OneInputStreamOperatorTestHarness<String, Object> testHarness =
 						new OneInputStreamOperatorTestHarness<>(sink);

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java
new file mode 100644
index 0000000..e7fff52
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+
+/**
+ * Special partitioner that uses the first field of a 2-tuple as the partition,
+ * and that expects a specific number of partitions.
+ */
+public class Tuple2FlinkPartitioner extends FlinkKafkaPartitioner<Tuple2<Integer, Integer>> {
+	private static final long serialVersionUID = -3589898230375281549L;
+
+	private final int expectedPartitions;
+
+	public Tuple2FlinkPartitioner(int expectedPartitions) {
+		this.expectedPartitions = expectedPartitions;
+	}
+	
+	@Override
+	public int partition(Tuple2<Integer, Integer> next, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+		if (partitions.length != expectedPartitions) {
+			throw new IllegalArgumentException("Expected " + expectedPartitions + " partitions");
+		}
+		
+		return next.f0;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
index c9e9ac1..43e1aa7 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
@@ -18,15 +18,16 @@
 
 package org.apache.flink.streaming.connectors.kafka.testutils;
 
+import java.io.Serializable;
+
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 
-import java.io.Serializable;
-
 /**
  * Special partitioner that uses the first field of a 2-tuple as the partition,
- * and that expects a specific number of partitions.
+ * and that expects a specific number of partitions. Use {@link Tuple2FlinkPartitioner} instead.
  */
+@Deprecated
 public class Tuple2Partitioner extends KafkaPartitioner<Tuple2<Integer, Integer>> implements Serializable {
 
 	private static final long serialVersionUID = 1L;
@@ -45,4 +46,4 @@ public class Tuple2Partitioner extends KafkaPartitioner<Tuple2<Integer, Integer>
 
 		return next.f0;
 	}
-}
\ No newline at end of file
+}


[3/6] flink git commit: [FLINK-6288] [kafka] New custom partitioner API that correctly handles multiple Kafka sink topics

Posted by tz...@apache.org.
[FLINK-6288] [kafka] New custom partitioner API that correctly handles multiple Kafka sink topics


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/58c4eed5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/58c4eed5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/58c4eed5

Branch: refs/heads/release-1.3
Commit: 58c4eed5accb0912472735bb00c148f6344a5679
Parents: 1a43bad
Author: zjureel <zj...@gmail.com>
Authored: Mon May 15 17:41:47 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri May 19 14:41:54 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer010.java |  82 ++++++++++--
 .../connectors/kafka/Kafka010ITCase.java        |   6 +-
 .../kafka/KafkaTestEnvironmentImpl.java         |   8 +-
 .../connectors/kafka/FlinkKafkaProducer.java    |   9 +-
 .../connectors/kafka/FlinkKafkaProducer08.java  |  40 +++++-
 .../connectors/kafka/Kafka08JsonTableSink.java  |  17 +++
 .../kafka/Kafka08JsonTableSinkTest.java         |  14 ++
 .../connectors/kafka/KafkaProducerTest.java     |   5 +-
 .../kafka/KafkaTestEnvironmentImpl.java         |   6 +-
 .../connectors/kafka/FlinkKafkaProducer09.java  |  42 +++++-
 .../connectors/kafka/Kafka09JsonTableSink.java  |  29 +++++
 .../kafka/Kafka09JsonTableSinkTest.java         |  15 +++
 .../connectors/kafka/KafkaProducerTest.java     |   6 +-
 .../kafka/KafkaTestEnvironmentImpl.java         |  10 +-
 .../kafka/FlinkKafkaProducerBase.java           | 127 ++++++++++++-------
 .../connectors/kafka/KafkaJsonTableSink.java    |  16 ++-
 .../connectors/kafka/KafkaTableSink.java        |  35 ++++-
 .../kafka/partitioner/FixedPartitioner.java     |   3 +-
 .../partitioner/FlinkFixedPartitioner.java      |  71 +++++++++++
 .../FlinkKafkaDelegatePartitioner.java          |  47 +++++++
 .../partitioner/FlinkKafkaPartitioner.java      |  39 ++++++
 .../kafka/partitioner/KafkaPartitioner.java     |  14 ++
 .../connectors/kafka/KafkaConsumerTestBase.java | 106 ++++++++--------
 .../connectors/kafka/KafkaProducerTestBase.java |  11 +-
 .../kafka/KafkaTableSinkTestBase.java           |  56 +++++++-
 .../connectors/kafka/KafkaTestEnvironment.java  |  14 +-
 .../kafka/TestFlinkFixedPartitioner.java        | 104 +++++++++++++++
 .../TestFlinkKafkaDelegatePartitioner.java      | 111 ++++++++++++++++
 .../kafka/testutils/DataGenerators.java         |  20 +--
 .../kafka/testutils/Tuple2FlinkPartitioner.java |  45 +++++++
 .../kafka/testutils/Tuple2Partitioner.java      |   9 +-
 31 files changed, 937 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index cc0194b..7addafa 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import java.util.Properties;
+
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -27,7 +29,8 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
@@ -35,8 +38,6 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWra
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.kafka.clients.producer.ProducerRecord;
 
-import java.util.Properties;
-
 import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;
 
 
@@ -87,7 +88,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 																					String topicId,
 																					KeyedSerializationSchema<T> serializationSchema,
 																					Properties producerConfig) {
-		return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FixedPartitioner<T>());
+		return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
 	}
 
 
@@ -106,7 +107,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 																					String topicId,
 																					SerializationSchema<T> serializationSchema,
 																					Properties producerConfig) {
-		return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<T>());
+		return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<T>());
 	}
 
 	/**
@@ -120,7 +121,9 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	 *  @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 *  @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 *  @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *  @deprecated Use {@link FlinkKafkaProducer010#writeToKafkaWithTimestamps(DataStream, String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
 																					String topicId,
 																					KeyedSerializationSchema<T> serializationSchema,
@@ -133,6 +136,30 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 		return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
 	}
 
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * This method allows writing timestamps to Kafka; it follows approach (b) (see above).
+	 *
+	 *  @param inStream The stream to write to Kafka
+	 *  @param topicId The name of the target topic
+	 *  @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 *  @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 *  @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 */
+	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+																					String topicId,
+																					KeyedSerializationSchema<T> serializationSchema,
+																					Properties producerConfig,
+																					FlinkKafkaPartitioner<T> customPartitioner) {
+
+		GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
+		FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner);
+		SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
+		return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
+	}
+
 	// ---------------------- Regular constructors w/o timestamp support  ------------------
 
 	/**
@@ -147,7 +174,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	 * 			User defined (keyless) serialization schema.
 	 */
 	public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema) {
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<T>());
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<T>());
 	}
 
 	/**
@@ -162,7 +189,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	 * 			Properties with the producer configuration.
 	 */
 	public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig) {
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<T>());
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<T>());
 	}
 
 	/**
@@ -173,11 +200,26 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+	 * @deprecated Use {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
 	}
 
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+	 */
+	public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+	}
+
 	// ------------------- Key/Value serialization schema constructors ----------------------
 
 	/**
@@ -192,7 +234,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	 * 			User defined serialization schema supporting key/value messages
 	 */
 	public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<T> serializationSchema) {
-		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<T>());
+		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<T>());
 	}
 
 	/**
@@ -207,19 +249,32 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	 * 			Properties with the producer configuration.
 	 */
 	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) {
-		this(topicId, serializationSchema, producerConfig, new FixedPartitioner<T>());
+		this(topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
 	}
 
 	/**
 	 * Create Kafka producer
 	 *
 	 * This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above)
+	 * @deprecated Use {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
 		// We create a Kafka 09 producer instance here and only "override" (by intercepting) the
 		// invoke call.
 		super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner));
 	}
+	
+	/**
+	 * Create Kafka producer
+	 *
+	 * This constructor does not allow writing timestamps to Kafka; it follows approach (a) (see above).
+	 */
+	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
+		// We create a Kafka 09 producer instance here and only "override" (by intercepting) the
+		// invoke call.
+		super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner));
+	}
 
 
 	// ----------------------------- Generic element processing  ---------------------------
@@ -243,10 +298,15 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 		}
 
 		ProducerRecord<byte[], byte[]> record;
-		if (internalProducer.partitioner == null) {
+		int[] partitions = internalProducer.topicPartitionsMap.get(targetTopic);
+		if(null == partitions) {
+			partitions = internalProducer.getPartitionsByTopic(targetTopic, internalProducer.producer);
+			internalProducer.topicPartitionsMap.put(targetTopic, partitions);
+		}
+		if (internalProducer.flinkKafkaPartitioner == null) {
 			record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
 		} else {
-			record = new ProducerRecord<>(targetTopic, internalProducer.partitioner.partition(next, serializedKey, serializedValue, internalProducer.partitions.length), timestamp, serializedKey, serializedValue);
+			record = new ProducerRecord<>(targetTopic, internalProducer.flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), timestamp, serializedKey, serializedValue);
 		}
 		if (internalProducer.flushOnCheckpoint) {
 			synchronized (internalProducer.pendingRecordsLock) {

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index 39b2b8f..add623e 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -33,7 +33,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
@@ -208,11 +208,11 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
 		});
 
 		final TypeInformationSerializationSchema<Long> longSer = new TypeInformationSerializationSchema<>(TypeInfoParser.<Long>parse("Long"), env.getConfig());
-		FlinkKafkaProducer010.FlinkKafkaProducer010Configuration prod = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new KeyedSerializationSchemaWrapper<>(longSer), standardProps, new KafkaPartitioner<Long>() {
+		FlinkKafkaProducer010.FlinkKafkaProducer010Configuration prod = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new KeyedSerializationSchemaWrapper<>(longSer), standardProps, new FlinkKafkaPartitioner<Long>() {
 			private static final long serialVersionUID = -6730989584364230617L;
 
 			@Override
-			public int partition(Long next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
+			public int partition(Long next, byte[] key, byte[] value, String targetTopic, int[] partitions) {
 				return (int)(next % 3);
 			}
 		});

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index d27e53a..c88c858 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -30,8 +30,8 @@ import org.apache.curator.test.TestingServer;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
@@ -46,9 +46,9 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.net.BindException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.HashMap;
 import java.util.Properties;
 import java.util.UUID;
 
@@ -116,7 +116,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+	public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
 		FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
 		prod.setFlushOnCheckpoint(true);
 		return new StreamSink<>(prod);
@@ -124,7 +124,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
 
 	@Override
-	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
 		FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
 		prod.setFlushOnCheckpoint(true);
 		return stream.addSink(prod);

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index a7b89f8..98dac3e 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
@@ -37,7 +38,7 @@ public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN>  {
 	 */
 	@Deprecated
 	public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
-		super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), null);
+		super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), (FlinkKafkaPartitioner<IN>)null);
 	}
 
 	/**
@@ -45,7 +46,7 @@ public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN>  {
 	 */
 	@Deprecated
 	public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
-		super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, null);
+		super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, (FlinkKafkaPartitioner<IN>)null);
 	}
 
 	/**
@@ -62,7 +63,7 @@ public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN>  {
 	 */
 	@Deprecated
 	public FlinkKafkaProducer(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
-		super(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), null);
+		super(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), (FlinkKafkaPartitioner<IN>)null);
 	}
 
 	/**
@@ -70,7 +71,7 @@ public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN>  {
 	 */
 	@Deprecated
 	public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
-		super(topicId, serializationSchema, producerConfig, null);
+		super(topicId, serializationSchema, producerConfig, (FlinkKafkaPartitioner<IN>)null);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
index 65de5fc..64d3716 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
@@ -17,7 +17,8 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
@@ -50,7 +51,7 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	 * 			User defined (keyless) serialization schema.
 	 */
 	public FlinkKafkaProducer08(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
 	}
 
 	/**
@@ -65,7 +66,7 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	 * 			Properties with the producer configuration.
 	 */
 	public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<IN>());
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<IN>());
 	}
 
 	/**
@@ -75,12 +76,27 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions.
+	 * @deprecated Use {@link FlinkKafkaProducer08#FlinkKafkaProducer08(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
 
 	}
 
+	/**
+	 * The main constructor for creating a FlinkKafkaProducer.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 */
+	public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+
+	}
+
 	// ------------------- Key/Value serialization schema constructors ----------------------
 
 	/**
@@ -95,7 +111,7 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	 * 			User defined serialization schema supporting key/value messages
 	 */
 	public FlinkKafkaProducer08(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
-		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
 	}
 
 	/**
@@ -110,7 +126,7 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	 * 			Properties with the producer configuration.
 	 */
 	public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
-		this(topicId, serializationSchema, producerConfig, new FixedPartitioner<IN>());
+		this(topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<IN>());
 	}
 
 	/**
@@ -120,11 +136,25 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions.
+	 * @deprecated Use {@link FlinkKafkaProducer08#FlinkKafkaProducer08(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
 		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}
 
+	/**
+	 * The main constructor for creating a FlinkKafkaProducer.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 */
+	public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+		super(topicId, serializationSchema, producerConfig, customPartitioner);
+	}
+
 	@Override
 	protected void flush() {
 		// The Kafka 0.8 producer doesn't support flushing, we wait here

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index 839388f..5a066ec0 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
@@ -38,6 +39,17 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink {
 	public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
 		super(topic, properties, partitioner);
 	}
+	
+	/**
+	 * Creates {@link KafkaTableSink} for Kafka 0.8
+	 *
+	 * @param topic topic in Kafka to which table is written
+	 * @param properties properties to connect to Kafka
+	 * @param partitioner Kafka partitioner
+	 */
+	public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
+		super(topic, properties, partitioner);
+	}
 
 	@Override
 	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
@@ -45,6 +57,11 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink {
 	}
 
 	@Override
+	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+		return new FlinkKafkaProducer08<>(topic, serializationSchema, properties, partitioner);
+	}
+
+	@Override
 	protected Kafka08JsonTableSink createCopy() {
 		return new Kafka08JsonTableSink(topic, properties, partitioner);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index 0ac452e..164c162 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
@@ -40,6 +41,19 @@ public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
 	}
 
 	@Override
+	protected KafkaTableSink createTableSinkWithFlinkPartitioner(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner,
+			final FlinkKafkaProducerBase<Row> kafkaProducer) {
+		
+		return new Kafka08JsonTableSink(topic, properties, partitioner) {
+			@Override
+			protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
+					SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+				return kafkaProducer;
+			}
+		};
+	}
+
+	@Override
 	@SuppressWarnings("unchecked")
 	protected SerializationSchema<Row> getSerializationSchema() {
 		return new JsonRowSerializationSchema(FIELD_NAMES);

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
index 65d7596..c7da5af 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
@@ -83,7 +84,7 @@ public class KafkaProducerTest extends TestLogger {
 			// (1) producer that propagates errors
 
 			FlinkKafkaProducer08<String> producerPropagating = new FlinkKafkaProducer08<>(
-					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
+					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), (FlinkKafkaPartitioner)null);
 
 			OneInputStreamOperatorTestHarness<String, Object> testHarness =
 					new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producerPropagating));
@@ -106,7 +107,7 @@ public class KafkaProducerTest extends TestLogger {
 			// (2) producer that only logs errors
 
 			FlinkKafkaProducer08<String> producerLogging = new FlinkKafkaProducer08<>(
-					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
+					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), (FlinkKafkaPartitioner)null);
 			producerLogging.setLogFailuresOnly(true);
 
 			testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging));

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 643ee8e..2419b53 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -36,8 +36,8 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
 import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
@@ -109,7 +109,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			String topic,
 			KeyedSerializationSchema<T> serSchema,
 			Properties props,
-			KafkaPartitioner<T> partitioner) {
+			FlinkKafkaPartitioner<T> partitioner) {
 		FlinkKafkaProducer08<T> prod = new FlinkKafkaProducer08<>(
 				topic,
 				serSchema,
@@ -120,7 +120,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
 		FlinkKafkaProducer08<T> prod = new FlinkKafkaProducer08<>(topic, serSchema, props, partitioner);
 		prod.setFlushOnCheckpoint(true);
 		return stream.addSink(prod);

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
index 2a3e39d..4f41c43 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
@@ -17,7 +17,8 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
@@ -51,7 +52,7 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	 * 			User defined (keyless) serialization schema.
 	 */
 	public FlinkKafkaProducer09(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
 	}
 
 	/**
@@ -66,7 +67,7 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	 * 			Properties with the producer configuration.
 	 */
 	public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<IN>());
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<IN>());
 	}
 
 	/**
@@ -77,12 +78,28 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+	 * @deprecated Use {@link FlinkKafkaProducer09#FlinkKafkaProducer09(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
 
 	}
 
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+	 */
+	public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+
+	}
+
 	// ------------------- Key/Value serialization schema constructors ----------------------
 
 	/**
@@ -97,7 +114,7 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	 * 			User defined serialization schema supporting key/value messages
 	 */
 	public FlinkKafkaProducer09(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
-		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
 	}
 
 	/**
@@ -112,7 +129,7 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	 * 			Properties with the producer configuration.
 	 */
 	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
-		this(topicId, serializationSchema, producerConfig, new FixedPartitioner<IN>());
+		this(topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<IN>());
 	}
 
 	/**
@@ -123,11 +140,26 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 * @deprecated Use {@link FlinkKafkaProducer09#FlinkKafkaProducer09(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
 		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}
 
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 */
+	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+		super(topicId, serializationSchema, producerConfig, customPartitioner);
+	}
+
 	@Override
 	protected void flush() {
 		if (this.producer != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
index edbebd0..b82ebc4 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
@@ -33,17 +34,45 @@ public class Kafka09JsonTableSink extends KafkaJsonTableSink {
 	 * @param topic topic in Kafka to which table is written
 	 * @param properties properties to connect to Kafka
 	 * @param partitioner Kafka partitioner
+	 * @deprecated Use {@link Kafka09JsonTableSink#Kafka09JsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
 		super(topic, properties, partitioner);
 	}
+	
+	/**
+	 * Creates {@link KafkaTableSink} for Kafka 0.9
+	 *
+	 * @param topic topic in Kafka to which table is written
+	 * @param properties properties to connect to Kafka
+	 * @param partitioner Kafka partitioner
+	 */
+	public Kafka09JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
+		super(topic, properties, partitioner);
+	}
 
+	/**
+	 *
+	 * @param topic               Kafka topic to produce to.
+	 * @param properties          Properties for the Kafka producer.
+	 * @param serializationSchema Serialization schema to use to create Kafka records.
+	 * @param partitioner         Partitioner to select Kafka partition.
+	 * @return The version-specific Kafka producer
+	 * @deprecated Use {@link Kafka09JsonTableSink#createKafkaProducer(String, Properties, SerializationSchema, FlinkKafkaPartitioner)} instead
+	 */
+	@Deprecated
 	@Override
 	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
 		return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner);
 	}
 
 	@Override
+	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+		return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner);
+	}
+
+	@Override
 	protected Kafka09JsonTableSink createCopy() {
 		return new Kafka09JsonTableSink(topic, properties, partitioner);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index df84a0f..ad8f623 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
@@ -26,6 +27,7 @@ import java.util.Properties;
 
 public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
 
+	@Deprecated
 	@Override
 	protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
 			final FlinkKafkaProducerBase<Row> kafkaProducer) {
@@ -40,6 +42,19 @@ public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
 	}
 
 	@Override
+	protected KafkaTableSink createTableSinkWithFlinkPartitioner(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner,
+			final FlinkKafkaProducerBase<Row> kafkaProducer) {
+
+		return new Kafka09JsonTableSink(topic, properties, partitioner) {
+			@Override
+			protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
+					SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+				return kafkaProducer;
+			}
+		};
+	}
+
+	@Override
 	@SuppressWarnings("unchecked")
 	protected SerializationSchema<Row> getSerializationSchema() {
 		return new JsonRowSerializationSchema(FIELD_NAMES);

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
index 18b2aec..e9a4947 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
@@ -83,9 +84,8 @@ public class KafkaProducerTest extends TestLogger {
 			whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);
 			
 			// (1) producer that propagates errors
-
 			FlinkKafkaProducer09<String> producerPropagating = new FlinkKafkaProducer09<>(
-					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
+					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), (FlinkKafkaPartitioner<String>)null);
 
 			OneInputStreamOperatorTestHarness<String, Object> testHarness =
 					new OneInputStreamOperatorTestHarness<>(new StreamSink(producerPropagating));
@@ -106,7 +106,7 @@ public class KafkaProducerTest extends TestLogger {
 			// (2) producer that only logs errors
 
 			FlinkKafkaProducer09<String> producerLogging = new FlinkKafkaProducer09<>(
-					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
+					"mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), (FlinkKafkaPartitioner<String>)null);
 			producerLogging.setLogFailuresOnly(true);
 
 			testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging));

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index c9ef6da..84fdbf8 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -19,8 +19,8 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import kafka.admin.AdminUtils;
-import kafka.common.KafkaException;
 import kafka.api.PartitionMetadata;
+import kafka.common.KafkaException;
 import kafka.network.SocketServer;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
@@ -32,8 +32,8 @@ import org.apache.curator.test.TestingServer;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
@@ -48,9 +48,9 @@ import scala.collection.Seq;
 import java.io.File;
 import java.net.BindException;
 import java.util.ArrayList;
-import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
@@ -106,14 +106,14 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 			String topic,
 			KeyedSerializationSchema<T> serSchema,
 			Properties props,
-			KafkaPartitioner<T> partitioner) {
+			FlinkKafkaPartitioner<T> partitioner) {
 		FlinkKafkaProducer09<T> prod = new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
 		prod.setFlushOnCheckpoint(true);
 		return new StreamSink<>(prod);
 	}
 
 	@Override
-	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
 		FlinkKafkaProducer09<T> prod = new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
 		prod.setFlushOnCheckpoint(true);
 		return stream.addSink(prod);

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index 6a7b17f..f9a1e41 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -17,6 +17,14 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.OperatorStateStore;
@@ -30,6 +38,8 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
@@ -45,13 +55,6 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Collections;
-import java.util.Comparator;
-
 import static java.util.Objects.requireNonNull;
 
 
@@ -76,12 +79,6 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 	public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
 
 	/**
-	 * Array with the partition ids of the given defaultTopicId
-	 * The size of this array is the number of partitions
-	 */
-	protected int[] partitions;
-
-	/**
 	 * User defined properties for the Producer
 	 */
 	protected final Properties producerConfig;
@@ -98,9 +95,14 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 	protected final KeyedSerializationSchema<IN> schema;
 
 	/**
-	 * User-provided partitioner for assigning an object to a Kafka partition.
+	 * User-provided partitioner for assigning an object to a Kafka partition for each topic (may be null).
+	 */
+	protected final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
+
+	/**
+	 * Partitions for each topic
 	 */
-	protected final KafkaPartitioner<IN> partitioner;
+	protected final Map<String, int[]> topicPartitionsMap;
 
 	/**
 	 * Flag indicating whether to accept failures (and log them), or to fail on failures
@@ -111,7 +113,12 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 	 * If true, the producer will wait until all outstanding records have been send to the broker.
 	 */
 	protected boolean flushOnCheckpoint;
-	
+
+	/**
+	 * Retry times of fetching kafka meta
+	 */
+	protected long kafkaMetaRetryTimes;
+
 	// -------------------------------- Runtime fields ------------------------------------------
 
 	/** KafkaProducer instance */
@@ -133,14 +140,28 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 
 
 	/**
-	 * The main constructor for creating a FlinkKafkaProducer.
+	 * The main constructor for creating a FlinkKafkaProducer. For customPartitioner parameter, use {@link org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner} instead
 	 *
 	 * @param defaultTopicId The default topic to write data to
 	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
+	 * @deprecated Use {@link FlinkKafkaProducerBase#FlinkKafkaProducerBase(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	public FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+		this(defaultTopicId, serializationSchema, producerConfig, null == customPartitioner ? null : new FlinkKafkaDelegatePartitioner<>(customPartitioner));
+	}
+
+	/**
+	 * The main constructor for creating a FlinkKafkaProducer.
+	 *
+	 * @param defaultTopicId The default topic to write data to
+	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
+	 */
+	public FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
 		requireNonNull(defaultTopicId, "TopicID not set");
 		requireNonNull(serializationSchema, "serializationSchema not set");
 		requireNonNull(producerConfig, "producerConfig not set");
@@ -150,6 +171,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 		this.defaultTopicId = defaultTopicId;
 		this.schema = serializationSchema;
 		this.producerConfig = producerConfig;
+		this.flinkKafkaPartitioner = customPartitioner;
 
 		// set the producer configuration properties for kafka record key value serializers.
 		if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
@@ -169,7 +191,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 			throw new IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be supplied in the producer config properties.");
 		}
 
-		this.partitioner = customPartitioner;
+		this.topicPartitionsMap = new HashMap<>();
 	}
 
 	// ---------------------------------- Properties --------------------------
@@ -177,9 +199,9 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 	/**
 	 * Defines whether the producer should fail on errors, or only log them.
 	 * If this is set to true, then exceptions will be only logged, if set to false,
-	 * exceptions will be eventually thrown and cause the streaming program to 
+	 * exceptions will be eventually thrown and cause the streaming program to
 	 * fail (and enter recovery).
-	 * 
+	 *
 	 * @param logFailuresOnly The flag to indicate logging-only on exceptions.
 	 */
 	public void setLogFailuresOnly(boolean logFailuresOnly) {
@@ -205,7 +227,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 	}
 
 	// ----------------------------------- Utilities --------------------------
-	
+
 	/**
 	 * Initializes the connection to Kafka.
 	 */
@@ -214,27 +236,14 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 		producer = getKafkaProducer(this.producerConfig);
 
 		RuntimeContext ctx = getRuntimeContext();
-		if (partitioner != null) {
-			// the fetched list is immutable, so we're creating a mutable copy in order to sort it
-			List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(defaultTopicId));
-
-			// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
-			Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
-				@Override
-				public int compare(PartitionInfo o1, PartitionInfo o2) {
-					return Integer.compare(o1.partition(), o2.partition());
-				}
-			});
-
-			partitions = new int[partitionsList.size()];
-			for (int i = 0; i < partitions.length; i++) {
-				partitions[i] = partitionsList.get(i).partition();
+		if(null != flinkKafkaPartitioner) {
+			if(flinkKafkaPartitioner instanceof FlinkKafkaDelegatePartitioner) {
+				((FlinkKafkaDelegatePartitioner)flinkKafkaPartitioner).setPartitions(getPartitionsByTopic(this.defaultTopicId, this.producer));
 			}
-
-			partitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), partitions);
+			flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
 		}
 
-		LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into topic {}", 
+		LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into default topic {}",
 				ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), defaultTopicId);
 
 		// register Kafka metrics to Flink accumulators
@@ -281,6 +290,26 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 		}
 	}
 
+	/** Returns the partition ids of the given topic, sorted ascending, as reported by {@code producer}. */
+	protected int[] getPartitionsByTopic(String topic, KafkaProducer<byte[], byte[]> producer) {
+		// the fetched list is immutable, so copy it into a mutable list before sorting
+		List<PartitionInfo> sortedInfos = new ArrayList<>(producer.partitionsFor(topic));
+		// order by partition id so that all subtasks see the partitions in the same order
+		Collections.sort(sortedInfos, new Comparator<PartitionInfo>() {
+			@Override
+			public int compare(PartitionInfo left, PartitionInfo right) {
+				return Integer.compare(left.partition(), right.partition());
+			}
+		});
+
+		int[] partitionIds = new int[sortedInfos.size()];
+		int idx = 0;
+		for (PartitionInfo info : sortedInfos) {
+			partitionIds[idx++] = info.partition();
+		}
+		return partitionIds;
+	}
+
 	/**
 	 * Called when new data arrives to the sink, and forwards it to Kafka.
 	 *
@@ -299,11 +328,17 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 			targetTopic = defaultTopicId;
 		}
 
+		int[] partitions = this.topicPartitionsMap.get(targetTopic);
+		if(null == partitions) {
+			partitions = this.getPartitionsByTopic(targetTopic, producer);
+			this.topicPartitionsMap.put(targetTopic, partitions);
+		}
+
 		ProducerRecord<byte[], byte[]> record;
-		if (partitioner == null) {
+		if (flinkKafkaPartitioner == null) {
 			record = new ProducerRecord<>(targetTopic, serializedKey, serializedValue);
 		} else {
-			record = new ProducerRecord<>(targetTopic, partitioner.partition(next, serializedKey, serializedValue, partitions.length), serializedKey, serializedValue);
+			record = new ProducerRecord<>(targetTopic, flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), serializedKey, serializedValue);
 		}
 		if (flushOnCheckpoint) {
 			synchronized (pendingRecordsLock) {
@@ -319,7 +354,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 		if (producer != null) {
 			producer.close();
 		}
-		
+
 		// make sure we propagate pending errors
 		checkErroneous();
 	}
@@ -376,15 +411,15 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 			throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
 		}
 	}
-	
+
 	public static Properties getPropertiesFromBrokerList(String brokerList) {
 		String[] elements = brokerList.split(",");
-		
+
 		// validate the broker addresses
 		for (String broker: elements) {
 			NetUtils.getCorrectHostnamePort(broker);
 		}
-		
+
 		Properties props = new Properties();
 		props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
 		return props;

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
index 27c4de7..a0b5033 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
@@ -17,10 +17,11 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.types.Row;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.types.Row;
 
 import java.util.Properties;
 
@@ -35,10 +36,23 @@ public abstract class KafkaJsonTableSink extends KafkaTableSink {
 	 * @param topic topic in Kafka to which table is written
 	 * @param properties properties to connect to Kafka
 	 * @param partitioner Kafka partitioner
+	 * @deprecated Use {@link KafkaJsonTableSink#KafkaJsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	public KafkaJsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
 		super(topic, properties, partitioner);
 	}
+	
+	/**
+	 * Creates a KafkaJsonTableSink; all arguments are passed on to the {@link KafkaTableSink} constructor.
+	 *
+	 * @param topic topic in Kafka to which table is written
+	 * @param properties properties to connect to Kafka
+	 * @param partitioner Kafka partitioner
+	 */
+	public KafkaJsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
+		super(topic, properties, partitioner);
+	}
 
 	@Override
 	protected SerializationSchema<Row> createSerializationSchema(String[] fieldNames) {

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index a8a2fd0..0a937d6 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -18,6 +18,8 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
@@ -39,7 +41,7 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
 	protected final String topic;
 	protected final Properties properties;
 	protected SerializationSchema<Row> serializationSchema;
-	protected final KafkaPartitioner<Row> partitioner;
+	protected final FlinkKafkaPartitioner<Row> partitioner;
 	protected String[] fieldNames;
 	protected TypeInformation[] fieldTypes;
 
@@ -49,12 +51,27 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
 	 * @param topic                 Kafka topic to write to.
 	 * @param properties            Properties for the Kafka consumer.
 	 * @param partitioner           Partitioner to select Kafka partition for each item
+	 * @deprecated Use {@link KafkaTableSink#KafkaTableSink(String, Properties, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	public KafkaTableSink(
 			String topic,
 			Properties properties,
 			KafkaPartitioner<Row> partitioner) {
+		this(topic, properties, new FlinkKafkaDelegatePartitioner<Row>(Preconditions.checkNotNull(partitioner, "partitioner"))); // reject null here: a delegate wrapping null would pass the main constructor's null check
+	}
 
+	/**
+	 * Creates KafkaTableSink
+	 * 
+	 * @param topic                 Kafka topic to write to.
+	 * @param properties            Properties for the Kafka producer.
+	 * @param partitioner           Partitioner to select Kafka partition for each item
+	 */
+	public KafkaTableSink(
+			String topic,
+			Properties properties,
+			FlinkKafkaPartitioner<Row> partitioner) {
 		this.topic = Preconditions.checkNotNull(topic, "topic");
 		this.properties = Preconditions.checkNotNull(properties, "properties");
 		this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner");
@@ -68,13 +85,29 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
 	 * @param serializationSchema Serialization schema to use to create Kafka records.
 	 * @param partitioner         Partitioner to select Kafka partition.
 	 * @return The version-specific Kafka producer
+	 * @deprecated Use {@link KafkaTableSink#createKafkaProducer(String, Properties, SerializationSchema, FlinkKafkaPartitioner)} instead
 	 */
+	@Deprecated
 	protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
 		String topic, Properties properties,
 		SerializationSchema<Row> serializationSchema,
 		KafkaPartitioner<Row> partitioner);
 
 	/**
+	 * Returns the version-specific Kafka producer.
+	 *
+	 * @param topic               Kafka topic to produce to.
+	 * @param properties          Properties for the Kafka producer.
+	 * @param serializationSchema Serialization schema to use to create Kafka records.
+	 * @param partitioner         Partitioner to select Kafka partition.
+	 * @return The version-specific Kafka producer
+	 */
+	protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
+		String topic, Properties properties,
+		SerializationSchema<Row> serializationSchema,
+		FlinkKafkaPartitioner<Row> partitioner);
+
+	/**
 	 * Create serialization schema for converting table rows into bytes.
 	 *
 	 * @param fieldNames Field names in table rows.

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
index 9b848e0..edabfe0 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
@@ -48,9 +48,10 @@ import java.io.Serializable;
  *  Not all Kafka partitions contain data
  *  To avoid such an unbalanced partitioning, use a round-robin kafka partitioner. (note that this will
  *  cause a lot of network connections between all the Flink instances and all the Kafka brokers
- *
+ *  @deprecated Use {@link FlinkFixedPartitioner} instead.
  *
  */
+@Deprecated
 public class FixedPartitioner<T> extends KafkaPartitioner<T> implements Serializable {
 	private static final long serialVersionUID = 1627268846962918126L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
new file mode 100644
index 0000000..d2eb7af
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+/**
+ * A partitioner ensuring that each internal Flink partition ends up in one Kafka partition.
+ *
+ * <p>Note, one Kafka partition can contain multiple Flink partitions.
+ *
+ * <p>Cases:
+ * 	# More Flink partitions than kafka partitions
+ * <pre>
+ * 		Flink Sinks:		Kafka Partitions
+ * 			1	----------------&gt;	1
+ * 			2   --------------/
+ * 			3   -------------/
+ * 			4	------------/
+ * </pre>
+ * Some (or all) kafka partitions contain the output of more than one flink partition
+ *
+ * 	# Fewer Flink partitions than Kafka
+ * <pre>
+ * 		Flink Sinks:		Kafka Partitions
+ * 			1	----------------&gt;	1
+ * 			2	----------------&gt;	2
+ * 										3
+ * 										4
+ * 										5
+ * </pre>
+ *
+ * <p>Not all Kafka partitions contain data.
+ * To avoid such an unbalanced partitioning, use a round-robin kafka partitioner (note that this will
+ * cause a lot of network connections between all the Flink instances and all the Kafka brokers).
+ *
+ */
+public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
+	private int parallelInstanceId;
+
+	/** Stores the index of this parallel instance; rejects a negative index or a non-positive parallelism. */
+	@Override
+	public void open(int parallelInstanceId, int parallelInstances) {
+		if (parallelInstanceId < 0 || parallelInstances <= 0) {
+			throw new IllegalArgumentException("Invalid subtask index " + parallelInstanceId + " or parallelism " + parallelInstances);
+		}
+		this.parallelInstanceId = parallelInstanceId;
+	}
+
+	/** Returns {@code partitions[parallelInstanceId % partitions.length]}; fails if no partitions are given. */
+	@Override
+	public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+		if (partitions == null || partitions.length == 0) {
+			throw new IllegalArgumentException("No partitions available for topic " + targetTopic);
+		}
+		return partitions[parallelInstanceId % partitions.length];
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
new file mode 100644
index 0000000..469fd1b
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+/**
+ * Adapter that lets a legacy {@link KafkaPartitioner} be used where a {@link FlinkKafkaPartitioner} is expected.
+ * @param <T> type of the records to partition
+ * @deprecated Delegate for {@link KafkaPartitioner}, use {@link FlinkKafkaPartitioner} instead
+ */
+@Deprecated
+public class FlinkKafkaDelegatePartitioner<T> extends FlinkKafkaPartitioner<T> {
+	private final KafkaPartitioner<T> kafkaPartitioner; // the wrapped legacy partitioner
+	private int[] partitions; // set via setPartitions(); passed to the delegate's open(), and its length to partition()
+
+	public FlinkKafkaDelegatePartitioner(KafkaPartitioner<T> kafkaPartitioner) {
+		this.kafkaPartitioner = kafkaPartitioner;
+	}
+
+	public void setPartitions(int[] partitions) {
+		this.partitions = partitions;
+	}
+
+	@Override
+	public void open(int parallelInstanceId, int parallelInstances) {
+		this.kafkaPartitioner.open(parallelInstanceId, parallelInstances, partitions);
+	}
+
+	@Override
+	public int partition(T next, byte[] serializedKey, byte[] serializedValue, String targetTopic, int[] partitions) {
+		return this.kafkaPartitioner.partition(next, serializedKey, serializedValue, this.partitions.length); // uses the stored partitions, not the targetTopic/partitions arguments
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
new file mode 100644
index 0000000..e074b9b
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+import java.io.Serializable;
+
+/**
+ * Base class for partitioners that select the Kafka partition a record is written to.
+ * It contains an open() method which is called on each parallel instance. Partitioners must be serializable!
+ */
+public abstract class FlinkKafkaPartitioner<T> implements Serializable {
+	private static final long serialVersionUID = -9086719227828020494L;
+
+	/**
+	 * Initializer for the Partitioner. Does nothing by default; overwrite this method if needed.
+	 * @param parallelInstanceId 0-indexed id of the parallel instance in Flink
+	 * @param parallelInstances the total number of parallel instances
+	 */
+	public void open(int parallelInstanceId, int parallelInstances) {
+	}
+
+	/** Returns the id of the partition of {@code targetTopic} to write {@code record} to; {@code partitions} holds the partition ids of that topic. */
+	public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
index 37e2ef6..7c82bd1 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
@@ -22,7 +22,9 @@ import java.io.Serializable;
 /**
  * It contains a open() method which is called on each parallel instance.
  * Partitioners must be serializable!
+ * @deprecated Use {@link FlinkKafkaPartitioner} instead.
  */
+@Deprecated
 public abstract class KafkaPartitioner<T> implements Serializable {
 
 	private static final long serialVersionUID = -1974260817778593473L;
@@ -32,10 +34,22 @@ public abstract class KafkaPartitioner<T> implements Serializable {
 	 * @param parallelInstanceId 0-indexed id of the parallel instance in Flink
 	 * @param parallelInstances the total number of parallel instances
 	 * @param partitions an array describing the partition IDs of the available Kafka partitions.
+	 * @deprecated Use {@link FlinkKafkaPartitioner#open(int, int)} instead.
 	 */
+	@Deprecated
 	public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
 		// overwrite this method if needed.
 	}
 
+	/**
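+	 * Determines the Kafka partition that the given record should be written to.
+	 * @param next the record to partition
+	 * @param serializedKey the serialized key of the record
+	 * @param serializedValue the serialized value of the record
+	 * @param numPartitions the number of available Kafka partitions
+	 * @return the partition that the record should be written to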
+	 * @deprecated Use {@link FlinkKafkaPartitioner#partition(Object, byte[], byte[], String, int[])} instead.
+	 */
+	@Deprecated
 	public abstract int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions);
 }


[6/6] flink git commit: [hotfix] [kafka] Remove unused operator state store field in FlinkKafkaProducerBase

Posted by tz...@apache.org.
[hotfix] [kafka] Remove unused operator state store field in FlinkKafkaProducerBase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0963718a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0963718a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0963718a

Branch: refs/heads/release-1.3
Commit: 0963718acf7137a481d7d1c28140e04ad613e71d
Parents: d3b5870
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri May 19 12:11:02 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri May 19 14:42:12 2017 +0800

----------------------------------------------------------------------
 .../streaming/connectors/kafka/FlinkKafkaProducerBase.java      | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0963718a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index 3a8228c..46d7d47 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -27,7 +27,6 @@ import java.util.Properties;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricGroup;
@@ -130,8 +129,6 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 	/** Number of unacknowledged records. */
 	protected long pendingRecords;
 
-	protected OperatorStateStore stateStore;
-
 	/**
 	 * The main constructor for creating a FlinkKafkaProducer.
 	 *
@@ -344,7 +341,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 
 	@Override
 	public void initializeState(FunctionInitializationContext context) throws Exception {
-		this.stateStore = context.getOperatorStateStore();
+		// nothing to do
 	}
 
 	@Override


[4/6] flink git commit: [FLINK-6288] [kafka] Cleanup and improvements to FlinkKafkaPartitioner custom partitioning

Posted by tz...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 8285048..bcc8328 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -29,10 +29,14 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 import static org.apache.flink.test.util.TestUtils.tryExecute;
@@ -42,31 +46,50 @@ import static org.junit.Assert.fail;
 @SuppressWarnings("serial")
 public abstract class KafkaProducerTestBase extends KafkaTestBase {
 
-
 	/**
-	 * 
+	 * This test verifies that custom partitioning works correctly, with a default topic
+	 * and dynamic topic. The number of partitions for each topic is deliberately different.
+	 *
+	 * Test topology:
+	 *
 	 * <pre>
-	 *             +------> (sink) --+--> [KAFKA-1] --> (source) -> (map) --+
-	 *            /                  |                                       \
-	 *           /                   |                                        \
-	 * (source) ----------> (sink) --+--> [KAFKA-2] --> (source) -> (map) -----+-> (sink)
-	 *           \                   |                                        /
-	 *            \                  |                                       /
-	 *             +------> (sink) --+--> [KAFKA-3] --> (source) -> (map) --+
+	 *             +------> (sink) --+--> [DEFAULT_TOPIC-1] --> (source) -> (map) -----+
+	 *            /                  |                             |          |        |
+	 *           |                   |                             |          |  ------+--> (sink)
+	 *             +------> (sink) --+--> [DEFAULT_TOPIC-2] --> (source) -> (map) -----+
+	 *            /                  |
+	 *           |                   |
+	 * (source) ----------> (sink) --+--> [DYNAMIC_TOPIC-1] --> (source) -> (map) -----+
+	 *           |                   |                             |          |        |
+	 *            \                  |                             |          |        |
+	 *             +------> (sink) --+--> [DYNAMIC_TOPIC-2] --> (source) -> (map) -----+--> (sink)
+	 *           |                   |                             |          |        |
+	 *            \                  |                             |          |        |
+	 *             +------> (sink) --+--> [DYNAMIC_TOPIC-3] --> (source) -> (map) -----+
 	 * </pre>
 	 * 
-	 * The mapper validates that the values come consistently from the correct Kafka partition.
+	 * Each topic has an independent mapper that validates the values come consistently from
+	 * the correct Kafka partition of the topic it is responsible for.
 	 * 
-	 * The final sink validates that there are no duplicates and that all partitions are present.
+	 * Each topic also has a final sink that validates that there are no duplicates and that all
+	 * partitions are present.
 	 */
 	public void runCustomPartitioningTest() {
 		try {
 			LOG.info("Starting KafkaProducerITCase.testCustomPartitioning()");
 
-			final String topic = "customPartitioningTestTopic";
-			final int parallelism = 3;
-			
-			createTestTopic(topic, parallelism, 1);
+			final String defaultTopic = "defaultTopic";
+			final int defaultTopicPartitions = 2;
+
+			final String dynamicTopic = "dynamicTopic";
+			final int dynamicTopicPartitions = 3;
+
+			createTestTopic(defaultTopic, defaultTopicPartitions, 1);
+			createTestTopic(dynamicTopic, dynamicTopicPartitions, 1);
+
+			Map<String, Integer> expectedTopicsToNumPartitions = new HashMap<>(2);
+			expectedTopicsToNumPartitions.put(defaultTopic, defaultTopicPartitions);
+			expectedTopicsToNumPartitions.put(dynamicTopic, dynamicTopicPartitions);
 
 			TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInfoParser.parse("Tuple2<Long, String>");
 
@@ -75,13 +98,13 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 			env.getConfig().disableSysoutLogging();
 
 			TypeInformationSerializationSchema<Tuple2<Long, String>> serSchema =
-					new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
+				new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
 
 			TypeInformationSerializationSchema<Tuple2<Long, String>> deserSchema =
-					new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
+				new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
 
 			// ------ producing topology ---------
-			
+
 			// source has DOP 1 to make sure it generates no duplicates
 			DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
 
@@ -100,69 +123,44 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 				public void cancel() {
 					running = false;
 				}
-			})
-			.setParallelism(1);
+			}).setParallelism(1);
 
 			Properties props = new Properties();
 			props.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings));
 			props.putAll(secureProps);
-			
-			// sink partitions into 
-			kafkaServer.produceIntoKafka(stream, topic,
-					new KeyedSerializationSchemaWrapper<>(serSchema),
+
+			// sink that partitions records into the default topic and the dynamic topic
+			kafkaServer.produceIntoKafka(stream, defaultTopic,
+					// this serialization schema will route between the default topic and dynamic topic
+					new CustomKeyedSerializationSchemaWrapper(serSchema, defaultTopic, dynamicTopic),
 					props,
-					new CustomPartitioner(parallelism)).setParallelism(parallelism);
+					new CustomPartitioner(expectedTopicsToNumPartitions))
+				.setParallelism(Math.max(defaultTopicPartitions, dynamicTopicPartitions));
 
 			// ------ consuming topology ---------
 
 			Properties consumerProps = new Properties();
 			consumerProps.putAll(standardProps);
 			consumerProps.putAll(secureProps);
-			FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topic, deserSchema, consumerProps);
-			
-			env.addSource(source).setParallelism(parallelism)
-
-					// mapper that validates partitioning and maps to partition
-					.map(new RichMapFunction<Tuple2<Long, String>, Integer>() {
-						
-						private int ourPartition = -1;
-						@Override
-						public Integer map(Tuple2<Long, String> value) {
-							int partition = value.f0.intValue() % parallelism;
-							if (ourPartition != -1) {
-								assertEquals("inconsistent partitioning", ourPartition, partition);
-							} else {
-								ourPartition = partition;
-							}
-							return partition;
-						}
-					}).setParallelism(parallelism)
-					
-					.addSink(new SinkFunction<Integer>() {
-						
-						private int[] valuesPerPartition = new int[parallelism];
-						
-						@Override
-						public void invoke(Integer value) throws Exception {
-							valuesPerPartition[value]++;
-							
-							boolean missing = false;
-							for (int i : valuesPerPartition) {
-								if (i < 100) {
-									missing = true;
-									break;
-								}
-							}
-							if (!missing) {
-								throw new SuccessException();
-							}
-						}
-					}).setParallelism(1);
-			
+
+			FlinkKafkaConsumerBase<Tuple2<Long, String>> defaultTopicSource =
+					kafkaServer.getConsumer(defaultTopic, deserSchema, consumerProps);
+			FlinkKafkaConsumerBase<Tuple2<Long, String>> dynamicTopicSource =
+					kafkaServer.getConsumer(dynamicTopic, deserSchema, consumerProps);
+
+			env.addSource(defaultTopicSource).setParallelism(defaultTopicPartitions)
+				.map(new PartitionValidatingMapper(defaultTopicPartitions)).setParallelism(defaultTopicPartitions)
+				.addSink(new PartitionValidatingSink(defaultTopicPartitions)).setParallelism(1);
+
+			env.addSource(dynamicTopicSource).setParallelism(dynamicTopicPartitions)
+				.map(new PartitionValidatingMapper(dynamicTopicPartitions)).setParallelism(dynamicTopicPartitions)
+				.addSink(new PartitionValidatingSink(dynamicTopicPartitions)).setParallelism(1);
+
 			tryExecute(env, "custom partitioning test");
 
-			deleteTestTopic(topic);
-			
+			deleteTestTopic(defaultTopic);
+			deleteTestTopic(dynamicTopic);
+
 			LOG.info("Finished KafkaProducerITCase.testCustomPartitioning()");
 		}
 		catch (Exception e) {
@@ -175,18 +173,94 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 
 	public static class CustomPartitioner extends FlinkKafkaPartitioner<Tuple2<Long, String>> implements Serializable {
 
-		private final int expectedPartitions;
+		private final Map<String, Integer> expectedTopicsToNumPartitions;
 
-		public CustomPartitioner(int expectedPartitions) {
-			this.expectedPartitions = expectedPartitions;
+		public CustomPartitioner(Map<String, Integer> expectedTopicsToNumPartitions) {
+			this.expectedTopicsToNumPartitions = expectedTopicsToNumPartitions;
 		}
 
-
 		@Override
 		public int partition(Tuple2<Long, String> next, byte[] serializedKey, byte[] serializedValue, String topic, int[] partitions) {
-			assertEquals(expectedPartitions, partitions.length);
+			assertEquals(expectedTopicsToNumPartitions.get(topic).intValue(), partitions.length);
+
+			return (int) (next.f0 % partitions.length);
+		}
+	}
+
+	/**
+	 * A {@link KeyedSerializationSchemaWrapper} that supports routing serialized records to different target topics.
+	 */
+	public static class CustomKeyedSerializationSchemaWrapper extends KeyedSerializationSchemaWrapper<Tuple2<Long, String>> {
+
+		private final String defaultTopic;
+		private final String dynamicTopic;
+
+		public CustomKeyedSerializationSchemaWrapper(
+				SerializationSchema<Tuple2<Long, String>> serializationSchema,
+				String defaultTopic,
+				String dynamicTopic) {
+
+			super(serializationSchema);
+
+			this.defaultTopic = Preconditions.checkNotNull(defaultTopic);
+			this.dynamicTopic = Preconditions.checkNotNull(dynamicTopic);
+		}
 
-			return (int) (next.f0 % expectedPartitions);
+		@Override
+		public String getTargetTopic(Tuple2<Long, String> element) {
+			return (element.f0 % 2 == 0) ? defaultTopic : dynamicTopic;
+		}
+	}
+
+	/**
+	 * Mapper that validates partitioning and maps to partition.
+	 */
+	public static class PartitionValidatingMapper extends RichMapFunction<Tuple2<Long, String>, Integer> {
+
+		private final int numPartitions;
+
+		private int ourPartition = -1;
+
+		public PartitionValidatingMapper(int numPartitions) {
+			this.numPartitions = numPartitions;
+		}
+
+		@Override
+		public Integer map(Tuple2<Long, String> value) throws Exception {
+			int partition = value.f0.intValue() % numPartitions;
+			if (ourPartition != -1) {
+				assertEquals("inconsistent partitioning", ourPartition, partition);
+			} else {
+				ourPartition = partition;
+			}
+			return partition;
+		}
+	}
+
+	/**
+	 * Sink that validates records received from each partition and checks that there are no duplicates.
+	 */
+	public static class PartitionValidatingSink implements SinkFunction<Integer> {
+		private final int[] valuesPerPartition;
+
+		public PartitionValidatingSink(int numPartitions) {
+			this.valuesPerPartition = new int[numPartitions];
+		}
+
+		@Override
+		public void invoke(Integer value) throws Exception {
+			valuesPerPartition[value]++;
+
+			boolean missing = false;
+			for (int i : valuesPerPartition) {
+				if (i < 100) {
+					missing = true;
+					break;
+				}
+			}
+			if (!missing) {
+				throw new SuccessException();
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index 0fdc82e..d4fe9cc 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -17,14 +17,12 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
-import java.io.Serializable;
 import java.util.Properties;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.table.api.Types;
@@ -45,8 +43,7 @@ public abstract class KafkaTableSinkTestBase {
 	private static final String TOPIC = "testTopic";
 	protected static final String[] FIELD_NAMES = new String[] {"field1", "field2"};
 	private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] { Types.INT(), Types.STRING() };
-	private static final KafkaPartitioner<Row> PARTITIONER = new CustomPartitioner();
-	private static final FlinkKafkaPartitioner<Row> FLINK_PARTITIONER = new FlinkCustomPartitioner();
+	private static final FlinkKafkaPartitioner<Row> PARTITIONER = new CustomPartitioner();
 	private static final Properties PROPERTIES = createSinkProperties();
 	@SuppressWarnings("unchecked")
 	private final FlinkKafkaProducerBase<Row> PRODUCER = new FlinkKafkaProducerBase<Row>(
@@ -74,23 +71,6 @@ public abstract class KafkaTableSinkTestBase {
 	}
 
 	@Test
-	@SuppressWarnings("unchecked")
-	public void testKafkaTableSinkWithFlinkPartitioner() throws Exception {
-		DataStream dataStream = mock(DataStream.class);
-
-		KafkaTableSink kafkaTableSink = spy(createTableSinkWithFlinkPartitioner());
-		kafkaTableSink.emitDataStream(dataStream);
-
-		verify(dataStream).addSink(eq(PRODUCER));
-
-		verify(kafkaTableSink).createKafkaProducer(
-			eq(TOPIC),
-			eq(PROPERTIES),
-			any(getSerializationSchema().getClass()),
-			eq(FLINK_PARTITIONER));
-	}
-
-	@Test
 	public void testConfiguration() {
 		KafkaTableSink kafkaTableSink = createTableSink();
 		KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
@@ -101,22 +81,8 @@ public abstract class KafkaTableSinkTestBase {
 		assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
 	}
 
-	@Test
-	public void testConfigurationWithFlinkPartitioner() {
-		KafkaTableSink kafkaTableSink = createTableSinkWithFlinkPartitioner();
-		KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
-		assertNotSame(kafkaTableSink, newKafkaTableSink);
-
-		assertArrayEquals(FIELD_NAMES, newKafkaTableSink.getFieldNames());
-		assertArrayEquals(FIELD_TYPES, newKafkaTableSink.getFieldTypes());
-		assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
-	}
-
 	protected abstract KafkaTableSink createTableSink(String topic, Properties properties,
-			KafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
-
-	protected abstract KafkaTableSink createTableSinkWithFlinkPartitioner(String topic,
-			Properties properties, FlinkKafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
+			FlinkKafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
 
 	protected abstract SerializationSchema<Row> getSerializationSchema();
 
@@ -124,24 +90,13 @@ public abstract class KafkaTableSinkTestBase {
 		return createTableSink(TOPIC, PROPERTIES, PARTITIONER, PRODUCER);
 	}
 
-	private KafkaTableSink createTableSinkWithFlinkPartitioner() {
-		return createTableSinkWithFlinkPartitioner(TOPIC, PROPERTIES, FLINK_PARTITIONER, PRODUCER);
-	}
-
 	private static Properties createSinkProperties() {
 		Properties properties = new Properties();
 		properties.setProperty("bootstrap.servers", "localhost:12345");
 		return properties;
 	}
 
-	private static class CustomPartitioner extends KafkaPartitioner<Row> implements Serializable {
-		@Override
-		public int partition(Row next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
-			return 0;
-		}
-	}
-
-	private static class FlinkCustomPartitioner extends FlinkKafkaPartitioner<Row> {
+	private static class CustomPartitioner extends FlinkKafkaPartitioner<Row> {
 		@Override
 		public int partition(Row record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
 			return 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
deleted file mode 100644
index 5dab05a..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
-
-public class TestFixedPartitioner {
-
-
-	/**
-	 * <pre>
-	 *   		Flink Sinks:		Kafka Partitions
-	 * 			1	---------------->	1
-	 * 			2   --------------/
-	 * 			3   -------------/
-	 * 			4	------------/
-	 * </pre>
-	 */
-	@Test
-	public void testMoreFlinkThanBrokers() {
-		FixedPartitioner<String> part = new FixedPartitioner<>();
-
-		int[] partitions = new int[]{0};
-
-		part.open(0, 4, partitions);
-		Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
-
-		part.open(1, 4, partitions);
-		Assert.assertEquals(0, part.partition("abc2", null, null, partitions.length));
-
-		part.open(2, 4, partitions);
-		Assert.assertEquals(0, part.partition("abc3", null, null, partitions.length));
-		Assert.assertEquals(0, part.partition("abc3", null, null, partitions.length)); // check if it is changing ;)
-
-		part.open(3, 4, partitions);
-		Assert.assertEquals(0, part.partition("abc4", null, null, partitions.length));
-	}
-
-	/**
-	 *
-	 * <pre>
-	 * 		Flink Sinks:		Kafka Partitions
-	 * 			1	---------------->	1
-	 * 			2	---------------->	2
-	 * 									3
-	 * 									4
-	 * 									5
-	 *
-	 * </pre>
-	 */
-	@Test
-	public void testFewerPartitions() {
-		FixedPartitioner<String> part = new FixedPartitioner<>();
-
-		int[] partitions = new int[]{0, 1, 2, 3, 4};
-		part.open(0, 2, partitions);
-		Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
-		Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
-
-		part.open(1, 2, partitions);
-		Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length));
-		Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length));
-	}
-
-	/*
-	 * 		Flink Sinks:		Kafka Partitions
-	 * 			1	------------>--->	1
-	 * 			2	-----------/----> 	2
-	 * 			3	----------/
-	 */
-	@Test
-	public void testMixedCase() {
-		FixedPartitioner<String> part = new FixedPartitioner<>();
-		int[] partitions = new int[]{0,1};
-
-		part.open(0, 3, partitions);
-		Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
-
-		part.open(1, 3, partitions);
-		Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length));
-
-		part.open(2, 3, partitions);
-		Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
deleted file mode 100644
index c6be71c..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test FlinkKafkaDelegatePartitioner using FixedPartitioner
- */
-public class TestFlinkKafkaDelegatePartitioner {
-
-	/**
-	 * <pre>
-	 *   		Flink Sinks:		Kafka Partitions
-	 * 			1	---------------->	1
-	 * 			2   --------------/
-	 * 			3   -------------/
-	 * 			4	------------/
-	 * </pre>
-	 */
-	@Test
-	public void testMoreFlinkThanBrokers() {
-		FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
-
-		int[] partitions = new int[]{0};
-
-		part.open(0, 4);
-		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
-
-		part.open(1, 4);
-		Assert.assertEquals(0, part.partition("abc2", null, null, null, partitions));
-
-		part.open(2, 4);
-		Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions));
-		Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions)); // check if it is changing ;)
-
-		part.open(3, 4);
-		Assert.assertEquals(0, part.partition("abc4", null, null, null, partitions));
-	}
-
-	/**
-	 *
-	 * <pre>
-	 * 		Flink Sinks:		Kafka Partitions
-	 * 			1	---------------->	1
-	 * 			2	---------------->	2
-	 * 									3
-	 * 									4
-	 * 									5
-	 *
-	 * </pre>
-	 */
-	@Test
-	public void testFewerPartitions() {
-		FlinkKafkaDelegatePartitioner<String> part = new FlinkKafkaDelegatePartitioner<>(new FixedPartitioner<String>());
-
-		int[] partitions = new int[]{0, 1, 2, 3, 4};
-		part.setPartitions(partitions);
-		
-		part.open(0, 2);
-		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
-		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
-
-		part.open(1, 2);
-		Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
-		Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
-	}
-
-	/*
-	 * 		Flink Sinks:		Kafka Partitions
-	 * 			1	------------>--->	1
-	 * 			2	-----------/----> 	2
-	 * 			3	----------/
-	 */
-	@Test
-	public void testMixedCase() {
-		FlinkKafkaDelegatePartitioner<String> part = new FlinkKafkaDelegatePartitioner<>(new FixedPartitioner<String>());
-		int[] partitions = new int[]{0,1};
-		part.setPartitions(partitions);
-
-		part.open(0, 3);
-		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
-
-		part.open(1, 3);
-		Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
-
-		part.open(2, 3);
-		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
deleted file mode 100644
index 43e1aa7..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-
-/**
- * Special partitioner that uses the first field of a 2-tuple as the partition,
- * and that expects a specific number of partitions. Use Tuple2FlinkPartitioner instead
- */
-@Deprecated
-public class Tuple2Partitioner extends KafkaPartitioner<Tuple2<Integer, Integer>> implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	private final int expectedPartitions;
-
-	public Tuple2Partitioner(int expectedPartitions) {
-		this.expectedPartitions = expectedPartitions;
-	}
-
-	@Override
-	public int partition(Tuple2<Integer, Integer> next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
-		if (numPartitions != expectedPartitions) {
-			throw new IllegalArgumentException("Expected " + expectedPartitions + " partitions");
-		}
-
-		return next.f0;
-	}
-}


[5/6] flink git commit: [FLINK-6288] [kafka] Cleanup and improvements to FlinkKafkaPartitioner custom partitioning

Posted by tz...@apache.org.
[FLINK-6288] [kafka] Cleanup and improvements to FlinkKafkaPartitioner custom partitioning

This commit wraps up some general improvements to the new Kafka sink
custom partitioning API, most notably:
1. remove deprecated constructors from base classes, as they are not
user-facing.
2. modify producer IT test to test custom partitioning for dynamic
topics.
3. improve documentation and Javadocs of the new interfaces.

This closes #3901.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d3b58709
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d3b58709
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d3b58709

Branch: refs/heads/release-1.3
Commit: d3b587096b1fd694625429aa32557e70ed84955d
Parents: 58c4eed
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Thu May 18 21:52:16 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri May 19 14:42:04 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md                    |   4 +-
 .../connectors/kafka/FlinkKafkaProducer010.java | 113 +++++-----
 .../connectors/kafka/FlinkKafkaProducer08.java  |  50 +++--
 .../connectors/kafka/Kafka08JsonTableSink.java  |  21 +-
 .../kafka/Kafka08JsonTableSinkTest.java         |  25 +--
 .../connectors/kafka/FlinkKafkaProducer09.java  |  52 +++--
 .../connectors/kafka/Kafka09JsonTableSink.java  |  28 +--
 .../kafka/Kafka09JsonTableSinkTest.java         |  26 +--
 .../kafka/FlinkKafkaProducerBase.java           |  89 ++++----
 .../connectors/kafka/KafkaJsonTableSink.java    |  14 --
 .../connectors/kafka/KafkaTableSink.java        |  40 +---
 .../kafka/partitioner/FixedPartitioner.java     |  77 -------
 .../partitioner/FlinkFixedPartitioner.java      |  19 +-
 .../FlinkKafkaDelegatePartitioner.java          |   5 +-
 .../partitioner/FlinkKafkaPartitioner.java      |  24 +-
 .../kafka/partitioner/KafkaPartitioner.java     |  16 +-
 .../kafka/FlinkKafkaProducerBaseTest.java       |  38 ++--
 .../connectors/kafka/KafkaConsumerTestBase.java |  98 ++++----
 .../connectors/kafka/KafkaProducerTestBase.java | 222 ++++++++++++-------
 .../kafka/KafkaTableSinkTestBase.java           |  51 +----
 .../connectors/kafka/TestFixedPartitioner.java  | 104 ---------
 .../TestFlinkKafkaDelegatePartitioner.java      | 111 ----------
 .../kafka/testutils/Tuple2Partitioner.java      |  49 ----
 23 files changed, 458 insertions(+), 818 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 60a8039..bc7e7de 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -456,9 +456,9 @@ are other constructor variants that allow providing the following:
  Please refer to the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for
  details on how to configure Kafka Producers.
  * *Custom partitioner*: To assign records to specific
- partitions, you can provide an implementation of a `KafkaPartitioner` to the
+ partitions, you can provide an implementation of a `FlinkKafkaPartitioner` to the
  constructor. This partitioner will be called for each record in the stream
- to determine which exact partition the record will be sent to.
+ to determine which exact partition of the target topic the record should be sent to.
  * *Advanced serialization schema*: Similar to the consumer,
  the producer also allows using an advanced serialization schema called `KeyedSerializationSchema`,
  which allows serializing the key and value separately. It also allows to override the target topic,

http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 7addafa..711fe07 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -38,6 +39,7 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWra
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.kafka.clients.producer.ProducerRecord;
 
+import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPartitionsByTopic;
 import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;
 
 
@@ -121,32 +123,6 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	 *  @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 *  @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 *  @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
-	 *  @deprecated Use {@link FlinkKafkaProducer010Configuration#writeToKafkaWithTimestamps(DataStream, String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
-	 */
-	@Deprecated
-	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
-																					String topicId,
-																					KeyedSerializationSchema<T> serializationSchema,
-																					Properties producerConfig,
-																					KafkaPartitioner<T> customPartitioner) {
-
-		GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
-		FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner);
-		SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
-		return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
-	}
-
-	/**
-	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
-	 * the topic.
-	 *
-	 * This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
-	 *
-	 *  @param inStream The stream to write to Kafka
-	 *  @param topicId The name of the target topic
-	 *  @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
-	 *  @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 *  @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
 	 */
 	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
 																					String topicId,
@@ -200,21 +176,6 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
-	 * @deprecated Use {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead
-	 */
-	@Deprecated
-	public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
-	}
-
-	/**
-	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
-	 * the topic.
-	 *
-	 * @param topicId The topic to write data to
-	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
-	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
 	 */
 	public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
@@ -251,32 +212,84 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) {
 		this(topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
 	}
-
+	
 	/**
 	 * Create Kafka producer
 	 *
 	 * This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above)
-	 * @deprecated Use {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
 	 */
-	@Deprecated
-	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
+	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
 		// We create a Kafka 09 producer instance here and only "override" (by intercepting) the
 		// invoke call.
 		super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner));
 	}
-	
+
+	// ----------------------------- Deprecated constructors / factory methods  ---------------------------
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
+	 *
+	 *  @param inStream The stream to write to Kafka
+	 *  @param topicId The name of the target topic
+	 *  @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 *  @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 *  @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *
+	 *  @deprecated This is a deprecated factory method that does not correctly handle partitioning when
+	 *              producing to multiple topics. Use
+	 *              {@link FlinkKafkaProducer010#writeToKafkaWithTimestamps(DataStream, String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 */
+	@Deprecated
+	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+																					String topicId,
+																					KeyedSerializationSchema<T> serializationSchema,
+																					Properties producerConfig,
+																					KafkaPartitioner<T> customPartitioner) {
+
+		GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
+		FlinkKafkaProducer010<T> kafkaProducer =
+				new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner));
+		SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
+		return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
+	 * the topic.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+	 *
+	 * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
+	 *             producing to multiple topics. Use
+	 *             {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 */
+	@Deprecated
+	public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+	}
+
 	/**
 	 * Create Kafka producer
 	 *
 	 * This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above)
+	 *
+	 * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
+	 *             producing to multiple topics. Use
+	 *             {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
 	 */
-	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
+	@Deprecated
+	public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
 		// We create a Kafka 09 producer instance here and only "override" (by intercepting) the
 		// invoke call.
-		super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner));
+		super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner)));
 	}
 
-
 	// ----------------------------- Generic element processing  ---------------------------
 
 	private void invokeInternal(T next, long elementTimestamp) throws Exception {
@@ -300,7 +313,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 		ProducerRecord<byte[], byte[]> record;
 		int[] partitions = internalProducer.topicPartitionsMap.get(targetTopic);
 		if(null == partitions) {
-			partitions = internalProducer.getPartitionsByTopic(targetTopic, internalProducer.producer);
+			partitions = getPartitionsByTopic(targetTopic, internalProducer.producer);
 			internalProducer.topicPartitionsMap.put(targetTopic, partitions);
 		}
 		if (internalProducer.flinkKafkaPartitioner == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
index 64d3716..08dcb2f 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
@@ -76,21 +77,6 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions.
-	 * @deprecated Use {@link FlinkKafkaProducer08#FlinkKafkaProducer08(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead
-	 */
-	@Deprecated
-	public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
-
-	}
-
-	/**
-	 * The main constructor for creating a FlinkKafkaProducer.
-	 *
-	 * @param topicId The topic to write data to
-	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
-	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions.
 	 */
 	public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
@@ -136,13 +122,30 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions.
-	 * @deprecated Use {@link FlinkKafkaProducer08#FlinkKafkaProducer08(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
 	 */
-	@Deprecated
-	public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+	public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
 		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}
 
+	// ------------------- Deprecated constructors ----------------------
+
+	/**
+	 * The main constructor for creating a FlinkKafkaProducer.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *
+	 * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
+	 *             producing to multiple topics. Use
+	 *             {@link FlinkKafkaProducer08#FlinkKafkaProducer08(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 */
+	@Deprecated
+	public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+	}
+
 	/**
 	 * The main constructor for creating a FlinkKafkaProducer.
 	 *
@@ -150,11 +153,18 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions.
+	 *
+	 * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
+	 *             producing to multiple topics. Use
+	 *             {@link FlinkKafkaProducer08#FlinkKafkaProducer08(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
 	 */
-	public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
-		super(topicId, serializationSchema, producerConfig, customPartitioner);
+	@Deprecated
+	public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+		super(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner));
 	}
 
+	// ---------------------------------------------------------------------
+
 	@Override
 	protected void flush() {
 		// The Kafka 0.8 producer doesn't support flushing, we wait here

http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index 5a066ec0..80bd180 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
@@ -28,7 +29,7 @@ import java.util.Properties;
  * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format.
  */
 public class Kafka08JsonTableSink extends KafkaJsonTableSink {
-
+	
 	/**
 	 * Creates {@link KafkaTableSink} for Kafka 0.8
 	 *
@@ -36,24 +37,24 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink {
 	 * @param properties properties to connect to Kafka
 	 * @param partitioner Kafka partitioner
 	 */
-	public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
+	public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
 		super(topic, properties, partitioner);
 	}
-	
+
 	/**
 	 * Creates {@link KafkaTableSink} for Kafka 0.8
 	 *
 	 * @param topic topic in Kafka to which table is written
 	 * @param properties properties to connect to Kafka
 	 * @param partitioner Kafka partitioner
+	 *
+	 * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
+	 *             producing to multiple topics. Use
+	 *             {@link #Kafka08JsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead.
 	 */
-	public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
-		super(topic, properties, partitioner);
-	}
-
-	@Override
-	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
-		return new FlinkKafkaProducer08<>(topic, serializationSchema, properties, partitioner);
+	@Deprecated
+	public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
+		super(topic, properties, new FlinkKafkaDelegatePartitioner<>(partitioner));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index 164c162..2136476 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 
@@ -28,26 +27,20 @@ import java.util.Properties;
 public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
 
 	@Override
-	protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
+	protected KafkaTableSink createTableSink(
+			String topic,
+			Properties properties,
+			FlinkKafkaPartitioner<Row> partitioner,
 			final FlinkKafkaProducerBase<Row> kafkaProducer) {
 
 		return new Kafka08JsonTableSink(topic, properties, partitioner) {
 			@Override
-			protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
-					SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
-				return kafkaProducer;
-			}
-		};
-	}
+			protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+					String topic,
+					Properties properties,
+					SerializationSchema<Row> serializationSchema,
+					FlinkKafkaPartitioner<Row> partitioner) {
 
-	@Override
-	protected KafkaTableSink createTableSinkWithFlinkPartitioner(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner,
-			final FlinkKafkaProducerBase<Row> kafkaProducer) {
-		
-		return new Kafka08JsonTableSink(topic, properties, partitioner) {
-			@Override
-			protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
-					SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
 				return kafkaProducer;
 			}
 		};

http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
index 4f41c43..cbed361 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
@@ -78,22 +79,6 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
-	 * @deprecated Use {@link FlinkKafkaProducer09#FlinkKafkaProducer09(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead
-	 */
-	@Deprecated
-	public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
-
-	}
-
-	/**
-	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
-	 * the topic.
-	 *
-	 * @param topicId The topic to write data to
-	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
-	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
 	 */
 	public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
 		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
@@ -140,13 +125,31 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
-	 * @deprecated Use {@link FlinkKafkaProducer09#FlinkKafkaProducer09(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
 	 */
-	@Deprecated
-	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
 		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}
 
+	// ------------------- Deprecated constructors ----------------------
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
+	 * the topic.
+	 *
+	 * @param topicId The topic to write data to
+	 * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+	 *
+	 * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
+	 *             producing to multiple topics. Use
+	 *             {@link #FlinkKafkaProducer09(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+	 */
+	@Deprecated
+	public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+	}
+
 	/**
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 	 * the topic.
@@ -155,11 +158,18 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
 	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
 	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *
+	 * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
+	 *             producing to multiple topics. Use
+	 *             {@link #FlinkKafkaProducer09(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
 	 */
-	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
-		super(topicId, serializationSchema, producerConfig, customPartitioner);
+	@Deprecated
+	public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+		super(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner));
 	}
 
+	// ------------------------------------------------------------------
+
 	@Override
 	protected void flush() {
 		if (this.producer != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
index b82ebc4..a81422e 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
@@ -28,43 +29,32 @@ import java.util.Properties;
  * Kafka 0.9 {@link KafkaTableSink} that serializes data in JSON format.
  */
 public class Kafka09JsonTableSink extends KafkaJsonTableSink {
+	
 	/**
 	 * Creates {@link KafkaTableSink} for Kafka 0.9
 	 *
 	 * @param topic topic in Kafka to which table is written
 	 * @param properties properties to connect to Kafka
 	 * @param partitioner Kafka partitioner
-	 * @deprecated Use {@link Kafka09JsonTableSink#Kafka09JsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead
 	 */
-	@Deprecated
-	public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
+	public Kafka09JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
 		super(topic, properties, partitioner);
 	}
-	
+
 	/**
 	 * Creates {@link KafkaTableSink} for Kafka 0.9
 	 *
 	 * @param topic topic in Kafka to which table is written
 	 * @param properties properties to connect to Kafka
 	 * @param partitioner Kafka partitioner
-	 */
-	public Kafka09JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
-		super(topic, properties, partitioner);
-	}
-
-	/**
 	 *
-	 * @param topic               Kafka topic to produce to.
-	 * @param properties          Properties for the Kafka producer.
-	 * @param serializationSchema Serialization schema to use to create Kafka records.
-	 * @param partitioner         Partitioner to select Kafka partition.
-	 * @return The version-specific Kafka producer
-	 * @deprecated Use {@link Kafka09JsonTableSink#createKafkaProducer(String, Properties, SerializationSchema, FlinkKafkaPartitioner)} instead
+	 * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
+	 *             producing to multiple topics. Use
+	 *             {@link #Kafka09JsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead.
 	 */
 	@Deprecated
-	@Override
-	protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
-		return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner);
+	public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
+		super(topic, properties, new FlinkKafkaDelegatePartitioner<>(partitioner));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index ad8f623..3afb5e4 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 
@@ -27,28 +26,21 @@ import java.util.Properties;
 
 public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
 
-	@Deprecated
 	@Override
-	protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
+	protected KafkaTableSink createTableSink(
+			String topic,
+			Properties properties,
+			FlinkKafkaPartitioner<Row> partitioner,
 			final FlinkKafkaProducerBase<Row> kafkaProducer) {
 
 		return new Kafka09JsonTableSink(topic, properties, partitioner) {
 			@Override
-			protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
-					SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
-				return kafkaProducer;
-			}
-		};
-	}
+			protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+					String topic,
+					Properties properties,
+					SerializationSchema<Row> serializationSchema,
+					FlinkKafkaPartitioner<Row> partitioner) {
 
-	@Override
-	protected KafkaTableSink createTableSinkWithFlinkPartitioner(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner,
-			final FlinkKafkaProducerBase<Row> kafkaProducer) {
-
-		return new Kafka09JsonTableSink(topic, properties, partitioner) {
-			@Override
-			protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
-					SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
 				return kafkaProducer;
 			}
 		};

http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index f9a1e41..3a8228c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -40,7 +40,6 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
 import org.apache.kafka.clients.producer.Callback;
@@ -74,38 +73,38 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 	private static final long serialVersionUID = 1L;
 
 	/**
-	 * Configuration key for disabling the metrics reporting
+	 * Configuration key for disabling the metrics reporting.
 	 */
 	public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
 
 	/**
-	 * User defined properties for the Producer
+	 * User defined properties for the Producer.
 	 */
 	protected final Properties producerConfig;
 
 	/**
-	 * The name of the default topic this producer is writing data to
+	 * The name of the default topic this producer is writing data to.
 	 */
 	protected final String defaultTopicId;
 
 	/**
-	 * (Serializable) SerializationSchema for turning objects used with Flink into
+	 * (Serializable) SerializationSchema for turning objects used with Flink into
 	 * byte[] for Kafka.
 	 */
 	protected final KeyedSerializationSchema<IN> schema;
 
 	/**
-	 * User-provided partitioner for assigning an object to a Kafka partition for each topic
+	 * User-provided partitioner for assigning an object to a Kafka partition for each topic.
 	 */
-	protected final FlinkKafkaPartitioner flinkKafkaPartitioner;
+	protected final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
 
 	/**
-	 * Partitions for each topic
+	 * Partitions of each topic.
 	 */
 	protected final Map<String, int[]> topicPartitionsMap;
 
 	/**
-	 * Flag indicating whether to accept failures (and log them), or to fail on failures
+	 * Flag indicating whether to accept failures (and log them), or to fail on failures.
 	 */
 	protected boolean logFailuresOnly;
 
@@ -114,11 +113,6 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 	 */
 	protected boolean flushOnCheckpoint;
 
-	/**
-	 * Retry times of fetching kafka meta
-	 */
-	protected long kafkaMetaRetryTimes;
-
 	// -------------------------------- Runtime fields ------------------------------------------
 
 	/** KafkaProducer instance */
@@ -138,21 +132,6 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 
 	protected OperatorStateStore stateStore;
 
-
-	/**
-	 * The main constructor for creating a FlinkKafkaProducer. For customPartitioner parameter, use {@link org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner} instead
-	 *
-	 * @param defaultTopicId The default topic to write data to
-	 * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
-	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
-	 * @deprecated Use {@link FlinkKafkaProducerBase#FlinkKafkaProducerBase(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
-	 */
-	@Deprecated
-	public FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
-		this(defaultTopicId, serializationSchema, producerConfig, null == customPartitioner ? null : new FlinkKafkaDelegatePartitioner<>(customPartitioner));
-	}
-
 	/**
 	 * The main constructor for creating a FlinkKafkaProducer.
 	 *
@@ -236,9 +215,11 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 		producer = getKafkaProducer(this.producerConfig);
 
 		RuntimeContext ctx = getRuntimeContext();
+
 		if(null != flinkKafkaPartitioner) {
 			if(flinkKafkaPartitioner instanceof FlinkKafkaDelegatePartitioner) {
-				((FlinkKafkaDelegatePartitioner)flinkKafkaPartitioner).setPartitions(getPartitionsByTopic(this.defaultTopicId, this.producer));
+				((FlinkKafkaDelegatePartitioner) flinkKafkaPartitioner).setPartitions(
+						getPartitionsByTopic(this.defaultTopicId, this.producer));
 			}
 			flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
 		}
@@ -290,26 +271,6 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 		}
 	}
 
-	protected int[] getPartitionsByTopic(String topic, KafkaProducer<byte[], byte[]> producer) {
-		// the fetched list is immutable, so we're creating a mutable copy in order to sort it
-		List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));
-
-		// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
-		Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
-				@Override
-				public int compare(PartitionInfo o1, PartitionInfo o2) {
-				return Integer.compare(o1.partition(), o2.partition());
-			}
-			});
-
-		int[] partitions = new int[partitionsList.size()];
-		for (int i = 0; i < partitions.length; i++) {
-			partitions[i] = partitionsList.get(i).partition();
-		}
-
-		return partitions;
-	}
-
 	/**
 	 * Called when new data arrives to the sink, and forwards it to Kafka.
 	 *
@@ -330,7 +291,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 
 		int[] partitions = this.topicPartitionsMap.get(targetTopic);
 		if(null == partitions) {
-			partitions = this.getPartitionsByTopic(targetTopic, producer);
+			partitions = getPartitionsByTopic(targetTopic, producer);
 			this.topicPartitionsMap.put(targetTopic, partitions);
 		}
 
@@ -338,7 +299,11 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 		if (flinkKafkaPartitioner == null) {
 			record = new ProducerRecord<>(targetTopic, serializedKey, serializedValue);
 		} else {
-			record = new ProducerRecord<>(targetTopic, flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), serializedKey, serializedValue);
+			record = new ProducerRecord<>(
+					targetTopic,
+					flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions),
+					serializedKey,
+					serializedValue);
 		}
 		if (flushOnCheckpoint) {
 			synchronized (pendingRecordsLock) {
@@ -425,6 +390,26 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 		return props;
 	}
 
+	protected static int[] getPartitionsByTopic(String topic, KafkaProducer<byte[], byte[]> producer) {
+		// the fetched list is immutable, so we're creating a mutable copy in order to sort it
+		List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));
+
+		// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
+		Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
+			@Override
+			public int compare(PartitionInfo o1, PartitionInfo o2) {
+				return Integer.compare(o1.partition(), o2.partition());
+			}
+		});
+
+		int[] partitions = new int[partitionsList.size()];
+		for (int i = 0; i < partitions.length; i++) {
+			partitions[i] = partitionsList.get(i).partition();
+		}
+
+		return partitions;
+	}
+
 	@VisibleForTesting
 	protected long numPendingRecords() {
 		synchronized (pendingRecordsLock) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
index a0b5033..41bb329 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.types.Row;
@@ -29,19 +28,6 @@ import java.util.Properties;
  * Base class for {@link KafkaTableSink} that serializes data in JSON format
  */
 public abstract class KafkaJsonTableSink extends KafkaTableSink {
-
-	/**
-	 * Creates KafkaJsonTableSink
-	 *
-	 * @param topic topic in Kafka to which table is written
-	 * @param properties properties to connect to Kafka
-	 * @param partitioner Kafka partitioner
-	 * @deprecated Use {@link KafkaJsonTableSink#KafkaJsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead
-	 */
-	@Deprecated
-	public KafkaJsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
-		super(topic, properties, partitioner);
-	}
 	
 	/**
 	 * Creates KafkaJsonTableSink

http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index 0a937d6..1c38816 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -18,13 +18,11 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.util.Preconditions;
 
@@ -34,7 +32,7 @@ import java.util.Properties;
  * A version-agnostic Kafka {@link AppendStreamTableSink}.
  *
  * <p>The version-specific Kafka consumers need to extend this class and
- * override {@link #createKafkaProducer(String, Properties, SerializationSchema, KafkaPartitioner)}}.
+ * override {@link #createKafkaProducer(String, Properties, SerializationSchema, FlinkKafkaPartitioner)}.
  */
 public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
 
@@ -47,22 +45,6 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
 
 	/**
 	 * Creates KafkaTableSink
-	 *
-	 * @param topic                 Kafka topic to write to.
-	 * @param properties            Properties for the Kafka consumer.
-	 * @param partitioner           Partitioner to select Kafka partition for each item
-	 * @deprecated Use {@link KafkaTableSink#KafkaTableSink(String, Properties, FlinkKafkaPartitioner)} instead
-	 */
-	@Deprecated
-	public KafkaTableSink(
-			String topic,
-			Properties properties,
-			KafkaPartitioner<Row> partitioner) {
-		this(topic, properties, new FlinkKafkaDelegatePartitioner<Row>(partitioner));
-	}
-
-	/**
-	 * Creates KafkaTableSink
 	 * 
 	 * @param topic                 Kafka topic to write to.
 	 * @param properties            Properties for the Kafka consumer.
@@ -85,22 +67,6 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
 	 * @param serializationSchema Serialization schema to use to create Kafka records.
 	 * @param partitioner         Partitioner to select Kafka partition.
 	 * @return The version-specific Kafka producer
-	 * @deprecated Use {@link KafkaTableSink#createKafkaProducer(String, Properties, SerializationSchema, FlinkKafkaPartitioner)} instead
-	 */
-	@Deprecated
-	protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
-		String topic, Properties properties,
-		SerializationSchema<Row> serializationSchema,
-		KafkaPartitioner<Row> partitioner);
-
-	/**
-	 * Returns the version-specifid Kafka producer.
-	 *
-	 * @param topic               Kafka topic to produce to.
-	 * @param properties          Properties for the Kafka producer.
-	 * @param serializationSchema Serialization schema to use to create Kafka records.
-	 * @param partitioner         Partitioner to select Kafka partition.
-	 * @return The version-specific Kafka producer
 	 */
 	protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
 		String topic, Properties properties,
@@ -153,8 +119,4 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
 
 		return copy;
 	}
-
-
-
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
deleted file mode 100644
index edabfe0..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.partitioner;
-
-import java.io.Serializable;
-
-/**
- * A partitioner ensuring that each internal Flink partition ends up in one Kafka partition.
- *
- * Note, one Kafka partition can contain multiple Flink partitions.
- *
- * Cases:
- * 	# More Flink partitions than kafka partitions
- * <pre>
- * 		Flink Sinks:		Kafka Partitions
- * 			1	----------------&gt;	1
- * 			2   --------------/
- * 			3   -------------/
- * 			4	------------/
- * </pre>
- * Some (or all) kafka partitions contain the output of more than one flink partition
- *
- *# Fewer Flink partitions than Kafka
- * <pre>
- * 		Flink Sinks:		Kafka Partitions
- * 			1	----------------&gt;	1
- * 			2	----------------&gt;	2
- * 										3
- * 										4
- * 										5
- * </pre>
- *
- *  Not all Kafka partitions contain data
- *  To avoid such an unbalanced partitioning, use a round-robin kafka partitioner. (note that this will
- *  cause a lot of network connections between all the Flink instances and all the Kafka brokers
- *  @deprecated Use {@link FlinkFixedPartitioner} instead.
- *
- */
-@Deprecated
-public class FixedPartitioner<T> extends KafkaPartitioner<T> implements Serializable {
-	private static final long serialVersionUID = 1627268846962918126L;
-
-	private int targetPartition = -1;
-
-	@Override
-	public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
-		if (parallelInstanceId < 0 || parallelInstances <= 0 || partitions.length == 0) {
-			throw new IllegalArgumentException();
-		}
-		
-		this.targetPartition = partitions[parallelInstanceId % partitions.length];
-	}
-
-	@Override
-	public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
-		if (targetPartition >= 0) {
-			return targetPartition;
-		} else {
-			throw new RuntimeException("The partitioner has not been initialized properly");
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
index d2eb7af..e47c667 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
@@ -17,6 +17,8 @@
  */
 package org.apache.flink.streaming.connectors.kafka.partitioner;
 
+import org.apache.flink.util.Preconditions;
+
 /**
  * A partitioner ensuring that each internal Flink partition ends up in one Kafka partition.
  *
@@ -44,9 +46,8 @@ package org.apache.flink.streaming.connectors.kafka.partitioner;
  * </pre>
  *
  *  Not all Kafka partitions contain data
- *  To avoid such an unbalanced partitioning, use a round-robin kafka partitioner. (note that this will
- *  cause a lot of network connections between all the Flink instances and all the Kafka brokers
- *
+ *  To avoid such an unbalanced partitioning, use a round-robin kafka partitioner (note that this will
+ *  cause a lot of network connections between all the Flink instances and all the Kafka brokers).
  */
 public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
 
@@ -54,17 +55,17 @@ public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
 
 	@Override
 	public void open(int parallelInstanceId, int parallelInstances) {
-		if (parallelInstanceId < 0 || parallelInstances <= 0) {
-			throw new IllegalArgumentException();
-		}
+		Preconditions.checkArgument(parallelInstanceId >= 0, "Id of this subtask cannot be negative.");
+		Preconditions.checkArgument(parallelInstances > 0, "Number of subtasks must be larger than 0.");
+
 		this.parallelInstanceId = parallelInstanceId;
 	}
 	
 	@Override
 	public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
-		if(null == partitions || partitions.length == 0) {
-			throw new IllegalArgumentException();
-		}
+		Preconditions.checkArgument(
+			partitions != null && partitions.length > 0,
+			"Partitions of the target topic is empty.");
 		
 		return partitions[parallelInstanceId % partitions.length];
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
index 469fd1b..b7b4143 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
@@ -18,8 +18,9 @@
 package org.apache.flink.streaming.connectors.kafka.partitioner;
 
 /**
- * Delegate for KafkaPartitioner
- * @param <T>
+ * Delegate for the deprecated {@link KafkaPartitioner}.
+ * This should only be used for bridging deprecated partitioning API methods.
+ *
  * @deprecated Delegate for {@link KafkaPartitioner}, use {@link FlinkKafkaPartitioner} instead
  */
 @Deprecated

http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
index e074b9b..b634af7 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
@@ -20,20 +20,34 @@ package org.apache.flink.streaming.connectors.kafka.partitioner;
 import java.io.Serializable;
 
 /**
- * It contains a open() method which is called on each parallel instance.
- * Partitioners must be serializable!
+ * A {@link FlinkKafkaPartitioner} wraps logic on how to partition records
+ * across partitions of multiple Kafka topics.
  */
 public abstract class FlinkKafkaPartitioner<T> implements Serializable {
+
 	private static final long serialVersionUID = -9086719227828020494L;
 
 	/**
-	 * Initializer for the Partitioner.
-	 * @param parallelInstanceId 0-indexed id of the parallel instance in Flink
+	 * Initializer for the partitioner. This is called once on each parallel sink instance of
+	 * the Flink Kafka producer. This method should be overridden if necessary.
+	 *
+	 * @param parallelInstanceId 0-indexed id of the parallel sink instance in Flink
 	 * @param parallelInstances the total number of parallel instances
 	 */
 	public void open(int parallelInstanceId, int parallelInstances) {
 		// overwrite this method if needed.
 	}
-	
+
+	/**
+	 * Determine the id of the partition that the record should be written to.
+	 *
+	 * @param record the record value
+	 * @param key serialized key of the record
+	 * @param value serialized value of the record
+	 * @param targetTopic target topic for the record
+	 * @param partitions found partitions for the target topic
+	 *
+	 * @return the id of the target partition
+	 */
 	public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
index 7c82bd1..eebc619 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
@@ -22,7 +22,9 @@ import java.io.Serializable;
 /**
  * It contains a open() method which is called on each parallel instance.
  * Partitioners must be serializable!
- * @deprecated Use {@link FlinkKafkaPartitioner} instead.
+ *
+ * @deprecated This partitioner does not handle partitioning properly in the case of
+ *             multiple topics, and has been deprecated. Please use {@link FlinkKafkaPartitioner} instead.
  */
 @Deprecated
 public abstract class KafkaPartitioner<T> implements Serializable {
@@ -34,22 +36,10 @@ public abstract class KafkaPartitioner<T> implements Serializable {
 	 * @param parallelInstanceId 0-indexed id of the parallel instance in Flink
 	 * @param parallelInstances the total number of parallel instances
 	 * @param partitions an array describing the partition IDs of the available Kafka partitions.
-	 * @deprecated Use {@link FlinkKafkaPartitioner#open(int, int)} instead.
 	 */
-	@Deprecated
 	public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
 		// overwrite this method if needed.
 	}
 
-	/**
-	 *
-	 * @param next
-	 * @param serializedKey
-	 * @param serializedValue
-	 * @param numPartitions
-	 * @return
-	 * @deprecated Use {@link FlinkKafkaPartitioner#partition(T, byte[], byte[], String, int[])} instead.
-	 */
-	@Deprecated
 	public abstract int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
index 1f16d8e..6b2cc02 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
@@ -22,12 +22,14 @@ import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -61,7 +63,7 @@ public class FlinkKafkaProducerBaseTest {
 		// no bootstrap servers set in props
 		Properties props = new Properties();
 		// should throw IllegalArgumentException
-		new DummyFlinkKafkaProducer<>(props, null);
+		new DummyFlinkKafkaProducer<>(props, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
 	}
 
 	/**
@@ -72,7 +74,7 @@ public class FlinkKafkaProducerBaseTest {
 		Properties props = new Properties();
 		props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345");
 		// should set missing key value deserializers
-		new DummyFlinkKafkaProducer<>(props, null);
+		new DummyFlinkKafkaProducer<>(props, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
 
 		assertTrue(props.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
 		assertTrue(props.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
@@ -83,9 +85,10 @@ public class FlinkKafkaProducerBaseTest {
 	/**
 	 * Tests that partitions list is determinate and correctly provided to custom partitioner
 	 */
+	@SuppressWarnings("unchecked")
 	@Test
-	public void testPartitionerOpenedWithDeterminatePartitionList() throws Exception {
-		KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class);
+	public void testPartitionerInvokedWithDeterminatePartitionList() throws Exception {
+		FlinkKafkaPartitioner<String> mockPartitioner = mock(FlinkKafkaPartitioner.class);
 
 		RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
 		when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
@@ -98,8 +101,8 @@ public class FlinkKafkaProducerBaseTest {
 		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 0, null, null, null));
 		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 2, null, null, null));
 
-		final DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
-			FakeStandardProducerConfig.get(), mockPartitioner);
+		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
+			FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), mockPartitioner);
 		producer.setRuntimeContext(mockRuntimeContext);
 
 		final KafkaProducer mockProducer = producer.getMockKafkaProducer();
@@ -107,10 +110,11 @@ public class FlinkKafkaProducerBaseTest {
 		when(mockProducer.metrics()).thenReturn(null);
 
 		producer.open(new Configuration());
+		verify(mockPartitioner, times(1)).open(0, 1);
 
-		// the out-of-order partitions list should be sorted before provided to the custom partitioner's open() method
-		int[] correctPartitionList = {0, 1, 2, 3};
-		verify(mockPartitioner).open(0, 1, correctPartitionList);
+		producer.invoke("foobar");
+		verify(mockPartitioner, times(1)).partition(
+			"foobar", null, "foobar".getBytes(), DummyFlinkKafkaProducer.DUMMY_TOPIC, new int[] {0, 1, 2, 3});
 	}
 
 	/**
@@ -119,7 +123,7 @@ public class FlinkKafkaProducerBaseTest {
 	@Test
 	public void testAsyncErrorRethrownOnInvoke() throws Throwable {
 		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
-			FakeStandardProducerConfig.get(), null);
+			FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
 
 		OneInputStreamOperatorTestHarness<String, Object> testHarness =
 			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
@@ -150,7 +154,7 @@ public class FlinkKafkaProducerBaseTest {
 	@Test
 	public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
 		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
-			FakeStandardProducerConfig.get(), null);
+			FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
 
 		OneInputStreamOperatorTestHarness<String, Object> testHarness =
 			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
@@ -186,7 +190,7 @@ public class FlinkKafkaProducerBaseTest {
 	@Test(timeout=5000)
 	public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable {
 		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
-			FakeStandardProducerConfig.get(), null);
+			FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
 		producer.setFlushOnCheckpoint(true);
 
 		final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
@@ -239,7 +243,7 @@ public class FlinkKafkaProducerBaseTest {
 	@Test(timeout=10000)
 	public void testAtLeastOnceProducer() throws Throwable {
 		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
-			FakeStandardProducerConfig.get(), null);
+			FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
 		producer.setFlushOnCheckpoint(true);
 
 		final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
@@ -299,7 +303,7 @@ public class FlinkKafkaProducerBaseTest {
 	@Test(timeout=5000)
 	public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
 		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
-			FakeStandardProducerConfig.get(), null);
+			FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
 		producer.setFlushOnCheckpoint(false);
 
 		final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
@@ -333,9 +337,9 @@ public class FlinkKafkaProducerBaseTest {
 		private boolean isFlushed;
 
 		@SuppressWarnings("unchecked")
-		DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner) {
+		DummyFlinkKafkaProducer(Properties producerConfig, KeyedSerializationSchema<T> schema, FlinkKafkaPartitioner partitioner) {
 
-			super(DUMMY_TOPIC, (KeyedSerializationSchema<T>) mock(KeyedSerializationSchema.class), producerConfig, partitioner);
+			super(DUMMY_TOPIC, schema, producerConfig, partitioner);
 
 			this.mockProducer = mock(KafkaProducer.class);
 			when(mockProducer.send(any(ProducerRecord.class), any(Callback.class))).thenAnswer(new Answer<Object>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 203d814..ac278fb 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1291,55 +1291,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		}
 	}
 
-	private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
-			KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
-
-		private final TypeSerializer<Tuple2<Integer, Integer>> ts;
-		
-		public Tuple2WithTopicSchema(ExecutionConfig ec) {
-			ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
-		}
-
-		@Override
-		public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
-			DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
-			Tuple2<Integer, Integer> t2 = ts.deserialize(in);
-			return new Tuple3<>(t2.f0, t2.f1, topic);
-		}
-
-		@Override
-		public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) {
-			return false;
-		}
-
-		@Override
-		public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
-			return TypeInfoParser.parse("Tuple3<Integer, Integer, String>");
-		}
-
-		@Override
-		public byte[] serializeKey(Tuple3<Integer, Integer, String> element) {
-			return null;
-		}
-
-		@Override
-		public byte[] serializeValue(Tuple3<Integer, Integer, String> element) {
-			ByteArrayOutputStream by = new ByteArrayOutputStream();
-			DataOutputView out = new DataOutputViewStreamWrapper(by);
-			try {
-				ts.serialize(new Tuple2<>(element.f0, element.f1), out);
-			} catch (IOException e) {
-				throw new RuntimeException("Error" ,e);
-			}
-			return by.toByteArray();
-		}
-
-		@Override
-		public String getTargetTopic(Tuple3<Integer, Integer, String> element) {
-			return element.f2;
-		}
-	}
-
 	/**
 	 * Test Flink's Kafka integration also with very big records (30MB)
 	 * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
@@ -2276,4 +2227,53 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			this.numElementsTotal = state.get(0);
 		}
 	}
+
+	private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
+		KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
+
+		private final TypeSerializer<Tuple2<Integer, Integer>> ts;
+
+		public Tuple2WithTopicSchema(ExecutionConfig ec) {
+			ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
+		}
+
+		@Override
+		public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+			DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
+			Tuple2<Integer, Integer> t2 = ts.deserialize(in);
+			return new Tuple3<>(t2.f0, t2.f1, topic);
+		}
+
+		@Override
+		public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) {
+			return false;
+		}
+
+		@Override
+		public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
+			return TypeInfoParser.parse("Tuple3<Integer, Integer, String>");
+		}
+
+		@Override
+		public byte[] serializeKey(Tuple3<Integer, Integer, String> element) {
+			return null;
+		}
+
+		@Override
+		public byte[] serializeValue(Tuple3<Integer, Integer, String> element) {
+			ByteArrayOutputStream by = new ByteArrayOutputStream();
+			DataOutputView out = new DataOutputViewStreamWrapper(by);
+			try {
+				ts.serialize(new Tuple2<>(element.f0, element.f1), out);
+			} catch (IOException e) {
+				throw new RuntimeException("Error" ,e);
+			}
+			return by.toByteArray();
+		}
+
+		@Override
+		public String getTargetTopic(Tuple3<Integer, Integer, String> element) {
+			return element.f2;
+		}
+	}
 }