You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/19 06:40:12 UTC

[2/6] flink git commit: [FLINK-6288] [kafka] New custom partitioner API that correctly handles multiple Kafka sink topics

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 2adb5ec..203d814 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -68,7 +68,7 @@ import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapp
 import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
 import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
-import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner;
+import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2FlinkPartitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
@@ -1199,9 +1199,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 	/**
 	 * Test producing and consuming into multiple topics
-	 * @throws java.lang.Exception
+	 * @throws Exception
 	 */
-	public void runProduceConsumeMultipleTopics() throws java.lang.Exception {
+	public void runProduceConsumeMultipleTopics() throws Exception {
 		final int NUM_TOPICS = 5;
 		final int NUM_ELEMENTS = 20;
 
@@ -1291,6 +1291,55 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		}
 	}
 
+	private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
+			KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
+
+		private final TypeSerializer<Tuple2<Integer, Integer>> ts;
+		
+		public Tuple2WithTopicSchema(ExecutionConfig ec) {
+			ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
+		}
+
+		@Override
+		public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+			DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
+			Tuple2<Integer, Integer> t2 = ts.deserialize(in);
+			return new Tuple3<>(t2.f0, t2.f1, topic);
+		}
+
+		@Override
+		public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) {
+			return false;
+		}
+
+		@Override
+		public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
+			return TypeInfoParser.parse("Tuple3<Integer, Integer, String>");
+		}
+
+		@Override
+		public byte[] serializeKey(Tuple3<Integer, Integer, String> element) {
+			return null;
+		}
+
+		@Override
+		public byte[] serializeValue(Tuple3<Integer, Integer, String> element) {
+			ByteArrayOutputStream by = new ByteArrayOutputStream();
+			DataOutputView out = new DataOutputViewStreamWrapper(by);
+			try {
+				ts.serialize(new Tuple2<>(element.f0, element.f1), out);
+			} catch (IOException e) {
+				throw new RuntimeException("Error" ,e);
+			}
+			return by.toByteArray();
+		}
+
+		@Override
+		public String getTargetTopic(Tuple3<Integer, Integer, String> element) {
+			return element.f2;
+		}
+	}
+
 	/**
 	 * Test Flink's Kafka integration also with very big records (30MB)
 	 * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
@@ -1975,7 +2024,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			producerProperties.setProperty("retries", "0");
 			producerProperties.putAll(secureProps);
 			
-			kafkaServer.produceIntoKafka(stream, topicName, serSchema, producerProperties, new Tuple2Partitioner(parallelism))
+			kafkaServer.produceIntoKafka(stream, topicName, serSchema, producerProperties, new Tuple2FlinkPartitioner(parallelism))
 					.setParallelism(parallelism);
 
 			try {
@@ -2227,53 +2276,4 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 			this.numElementsTotal = state.get(0);
 		}
 	}
-
-	private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
-		KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
-
-		private final TypeSerializer<Tuple2<Integer, Integer>> ts;
-
-		public Tuple2WithTopicSchema(ExecutionConfig ec) {
-			ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
-		}
-
-		@Override
-		public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
-			DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
-			Tuple2<Integer, Integer> t2 = ts.deserialize(in);
-			return new Tuple3<>(t2.f0, t2.f1, topic);
-		}
-
-		@Override
-		public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) {
-			return false;
-		}
-
-		@Override
-		public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
-			return TypeInfoParser.parse("Tuple3<Integer, Integer, String>");
-		}
-
-		@Override
-		public byte[] serializeKey(Tuple3<Integer, Integer, String> element) {
-			return null;
-		}
-
-		@Override
-		public byte[] serializeValue(Tuple3<Integer, Integer, String> element) {
-			ByteArrayOutputStream by = new ByteArrayOutputStream();
-			DataOutputView out = new DataOutputViewStreamWrapper(by);
-			try {
-				ts.serialize(new Tuple2<>(element.f0, element.f1), out);
-			} catch (IOException e) {
-				throw new RuntimeException("Error" ,e);
-			}
-			return by.toByteArray();
-		}
-
-		@Override
-		public String getTargetTopic(Tuple3<Integer, Integer, String> element) {
-			return element.f2;
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 6f61392..8285048 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -27,12 +27,11 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.test.util.SuccessException;
 
-
 import java.io.Serializable;
 import java.util.Properties;
 
@@ -174,7 +173,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 
 	// ------------------------------------------------------------------------
 
-	public static class CustomPartitioner extends KafkaPartitioner<Tuple2<Long, String>> implements Serializable {
+	public static class CustomPartitioner extends FlinkKafkaPartitioner<Tuple2<Long, String>> implements Serializable {
 
 		private final int expectedPartitions;
 
@@ -184,10 +183,10 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 
 
 		@Override
-		public int partition(Tuple2<Long, String> next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
-			assertEquals(expectedPartitions, numPartitions);
+		public int partition(Tuple2<Long, String> next, byte[] serializedKey, byte[] serializedValue, String topic, int[] partitions) {
+			assertEquals(expectedPartitions, partitions.length);
 
-			return (int) (next.f0 % numPartitions);
+			return (int) (next.f0 % expectedPartitions);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index e0e8f84..0fdc82e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -17,25 +17,26 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
+import java.io.Serializable;
+import java.util.Properties;
+
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.Types;
-import org.apache.flink.types.Row;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.types.Row;
 import org.junit.Test;
 
-import java.io.Serializable;
-import java.util.Properties;
-
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotSame;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
@@ -45,6 +46,7 @@ public abstract class KafkaTableSinkTestBase {
 	protected static final String[] FIELD_NAMES = new String[] {"field1", "field2"};
 	private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] { Types.INT(), Types.STRING() };
 	private static final KafkaPartitioner<Row> PARTITIONER = new CustomPartitioner();
+	private static final FlinkKafkaPartitioner<Row> FLINK_PARTITIONER = new FlinkCustomPartitioner();
 	private static final Properties PROPERTIES = createSinkProperties();
 	@SuppressWarnings("unchecked")
 	private final FlinkKafkaProducerBase<Row> PRODUCER = new FlinkKafkaProducerBase<Row>(
@@ -72,6 +74,23 @@ public abstract class KafkaTableSinkTestBase {
 	}
 
 	@Test
+	@SuppressWarnings("unchecked")
+	public void testKafkaTableSinkWithFlinkPartitioner() throws Exception {
+		DataStream dataStream = mock(DataStream.class);
+
+		KafkaTableSink kafkaTableSink = spy(createTableSinkWithFlinkPartitioner());
+		kafkaTableSink.emitDataStream(dataStream);
+
+		verify(dataStream).addSink(eq(PRODUCER));
+
+		verify(kafkaTableSink).createKafkaProducer(
+			eq(TOPIC),
+			eq(PROPERTIES),
+			any(getSerializationSchema().getClass()),
+			eq(FLINK_PARTITIONER));
+	}
+
+	@Test
 	public void testConfiguration() {
 		KafkaTableSink kafkaTableSink = createTableSink();
 		KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
@@ -82,15 +101,33 @@ public abstract class KafkaTableSinkTestBase {
 		assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
 	}
 
+	@Test
+	public void testConfigurationWithFlinkPartitioner() {
+		KafkaTableSink kafkaTableSink = createTableSinkWithFlinkPartitioner();
+		KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
+		assertNotSame(kafkaTableSink, newKafkaTableSink);
+
+		assertArrayEquals(FIELD_NAMES, newKafkaTableSink.getFieldNames());
+		assertArrayEquals(FIELD_TYPES, newKafkaTableSink.getFieldTypes());
+		assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
+	}
+
 	protected abstract KafkaTableSink createTableSink(String topic, Properties properties,
 			KafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
 
+	protected abstract KafkaTableSink createTableSinkWithFlinkPartitioner(String topic,
+			Properties properties, FlinkKafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
+
 	protected abstract SerializationSchema<Row> getSerializationSchema();
 
 	private KafkaTableSink createTableSink() {
 		return createTableSink(TOPIC, PROPERTIES, PARTITIONER, PRODUCER);
 	}
 
+	private KafkaTableSink createTableSinkWithFlinkPartitioner() {
+		return createTableSinkWithFlinkPartitioner(TOPIC, PROPERTIES, FLINK_PARTITIONER, PRODUCER);
+	}
+
 	private static Properties createSinkProperties() {
 		Properties properties = new Properties();
 		properties.setProperty("bootstrap.servers", "localhost:12345");
@@ -103,4 +140,11 @@ public abstract class KafkaTableSinkTestBase {
 			return 0;
 		}
 	}
+
+	private static class FlinkCustomPartitioner extends FlinkKafkaPartitioner<Row> {
+		@Override
+		public int partition(Row record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+			return 0;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 9a7c96a..311a1a4 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -17,20 +17,20 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
 import kafka.server.KafkaServer;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-
 /**
  * Abstract class providing a Kafka test environment
  */
@@ -81,11 +81,11 @@ public abstract class KafkaTestEnvironment {
 
 	public abstract <T> StreamSink<T> getProducerSink(String topic,
 			KeyedSerializationSchema<T> serSchema, Properties props,
-			KafkaPartitioner<T> partitioner);
+			FlinkKafkaPartitioner<T> partitioner);
 
 	public abstract <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic,
 														KeyedSerializationSchema<T> serSchema, Properties props,
-														KafkaPartitioner<T> partitioner);
+														FlinkKafkaPartitioner<T> partitioner);
 
 	// -- offset handlers
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java
new file mode 100644
index 0000000..fa84199
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestFlinkFixedPartitioner {
+
+
+	/**
+	 * <pre>
+	 *   		Flink Sinks:		Kafka Partitions
+	 * 			1	---------------->	1
+	 * 			2   --------------/
+	 * 			3   -------------/
+	 * 			4	------------/
+	 * </pre>
+	 */
+	@Test
+	public void testMoreFlinkThanBrokers() {
+		FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
+
+		int[] partitions = new int[]{0};
+
+		part.open(0, 4);
+		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+		part.open(1, 4);
+		Assert.assertEquals(0, part.partition("abc2", null, null, null, partitions));
+
+		part.open(2, 4);
+		Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions));
+		Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions)); // check if it is changing ;)
+
+		part.open(3, 4);
+		Assert.assertEquals(0, part.partition("abc4", null, null, null, partitions));
+	}
+
+	/**
+	 *
+	 * <pre>
+	 * 		Flink Sinks:		Kafka Partitions
+	 * 			1	---------------->	1
+	 * 			2	---------------->	2
+	 * 									3
+	 * 									4
+	 * 									5
+	 *
+	 * </pre>
+	 */
+	@Test
+	public void testFewerPartitions() {
+		FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
+
+		int[] partitions = new int[]{0, 1, 2, 3, 4};
+		part.open(0, 2);
+		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+		part.open(1, 2);
+		Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
+		Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
+	}
+
+	/*
+	 * 		Flink Sinks:		Kafka Partitions
+	 * 			1	------------>--->	1
+	 * 			2	-----------/----> 	2
+	 * 			3	----------/
+	 */
+	@Test
+	public void testMixedCase() {
+		FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
+		int[] partitions = new int[]{0,1};
+
+		part.open(0, 3);
+		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+		part.open(1, 3);
+		Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
+
+		part.open(2, 3);
+		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
new file mode 100644
index 0000000..c6be71c
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test FlinkKafkaDelegatePartitioner using FixedPartitioner
+ */
+public class TestFlinkKafkaDelegatePartitioner {
+
+	/**
+	 * <pre>
+	 *   		Flink Sinks:		Kafka Partitions
+	 * 			1	---------------->	1
+	 * 			2   --------------/
+	 * 			3   -------------/
+	 * 			4	------------/
+	 * </pre>
+	 */
+	@Test
+	public void testMoreFlinkThanBrokers() {
+		FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
+
+		int[] partitions = new int[]{0};
+
+		part.open(0, 4);
+		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+		part.open(1, 4);
+		Assert.assertEquals(0, part.partition("abc2", null, null, null, partitions));
+
+		part.open(2, 4);
+		Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions));
+		Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions)); // check if it is changing ;)
+
+		part.open(3, 4);
+		Assert.assertEquals(0, part.partition("abc4", null, null, null, partitions));
+	}
+
+	/**
+	 *
+	 * <pre>
+	 * 		Flink Sinks:		Kafka Partitions
+	 * 			1	---------------->	1
+	 * 			2	---------------->	2
+	 * 									3
+	 * 									4
+	 * 									5
+	 *
+	 * </pre>
+	 */
+	@Test
+	public void testFewerPartitions() {
+		FlinkKafkaDelegatePartitioner<String> part = new FlinkKafkaDelegatePartitioner<>(new FixedPartitioner<String>());
+
+		int[] partitions = new int[]{0, 1, 2, 3, 4};
+		part.setPartitions(partitions);
+		
+		part.open(0, 2);
+		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+		part.open(1, 2);
+		Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
+		Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
+	}
+
+	/*
+	 * 		Flink Sinks:		Kafka Partitions
+	 * 			1	------------>--->	1
+	 * 			2	-----------/----> 	2
+	 * 			3	----------/
+	 */
+	@Test
+	public void testMixedCase() {
+		FlinkKafkaDelegatePartitioner<String> part = new FlinkKafkaDelegatePartitioner<>(new FixedPartitioner<String>());
+		int[] partitions = new int[]{0,1};
+		part.setPartitions(partitions);
+
+		part.open(0, 3);
+		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+		part.open(1, 3);
+		Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
+
+		part.open(2, 3);
+		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index c383eb5..c0fb836 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -18,6 +18,10 @@
 
 package org.apache.flink.streaming.connectors.kafka.testutils;
 
+import java.util.Collection;
+import java.util.Properties;
+import java.util.Random;
+
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -31,18 +35,14 @@ import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
 import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 
-import java.util.Collection;
-import java.util.Properties;
-import java.util.Random;
-
 @SuppressWarnings("serial")
 public class DataGenerators {
 
@@ -107,10 +107,10 @@ public class DataGenerators {
 		testServer.produceIntoKafka(stream, topic,
 				new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig())),
 				props,
-				new KafkaPartitioner<Integer>() {
+				new FlinkKafkaPartitioner<Integer>() {
 					@Override
-					public int partition(Integer next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
-						return next % numPartitions;
+					public int partition(Integer next, byte[] serializedKey, byte[] serializedValue, String topic, int[] partitions) {
+						return next % partitions.length;
 					}
 				});
 
@@ -149,7 +149,7 @@ public class DataGenerators {
 						topic,
 						new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
 						producerProperties,
-						new FixedPartitioner<String>());
+						new FlinkFixedPartitioner<String>());
 
 				OneInputStreamOperatorTestHarness<String, Object> testHarness =
 						new OneInputStreamOperatorTestHarness<>(sink);

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java
new file mode 100644
index 0000000..e7fff52
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+
+/**
+ * Special partitioner that uses the first field of a 2-tuple as the partition,
+ * and that expects a specific number of partitions.
+ */
+public class Tuple2FlinkPartitioner extends FlinkKafkaPartitioner<Tuple2<Integer, Integer>> {
+	private static final long serialVersionUID = -3589898230375281549L;
+
+	private final int expectedPartitions;
+
+	public Tuple2FlinkPartitioner(int expectedPartitions) {
+		this.expectedPartitions = expectedPartitions;
+	}
+	
+	@Override
+	public int partition(Tuple2<Integer, Integer> next, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+		if (partitions.length != expectedPartitions) {
+			throw new IllegalArgumentException("Expected " + expectedPartitions + " partitions");
+		}
+		
+		return next.f0;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
index c9e9ac1..43e1aa7 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
@@ -18,15 +18,16 @@
 
 package org.apache.flink.streaming.connectors.kafka.testutils;
 
+import java.io.Serializable;
+
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 
-import java.io.Serializable;
-
 /**
  * Special partitioner that uses the first field of a 2-tuple as the partition,
- * and that expects a specific number of partitions.
+ * and that expects a specific number of partitions. Use Tuple2FlinkPartitioner instead
  */
+@Deprecated
 public class Tuple2Partitioner extends KafkaPartitioner<Tuple2<Integer, Integer>> implements Serializable {
 
 	private static final long serialVersionUID = 1L;
@@ -45,4 +46,4 @@ public class Tuple2Partitioner extends KafkaPartitioner<Tuple2<Integer, Integer>
 
 		return next.f0;
 	}
-}
\ No newline at end of file
+}