You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/19 06:40:14 UTC

[4/6] flink git commit: [FLINK-6288] [kafka] Cleanup and improvements to FlinkKafkaPartitioner custom partitioning

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 8285048..bcc8328 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -29,10 +29,14 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 import static org.apache.flink.test.util.TestUtils.tryExecute;
@@ -42,31 +46,50 @@ import static org.junit.Assert.fail;
 @SuppressWarnings("serial")
 public abstract class KafkaProducerTestBase extends KafkaTestBase {
 
-
 	/**
-	 * 
+	 * This test verifies that custom partitioning works correctly, with a default topic
+	 * and dynamic topic. The number of partitions for each topic is deliberately different.
+	 *
+	 * Test topology:
+	 *
 	 * <pre>
-	 *             +------> (sink) --+--> [KAFKA-1] --> (source) -> (map) --+
-	 *            /                  |                                       \
-	 *           /                   |                                        \
-	 * (source) ----------> (sink) --+--> [KAFKA-2] --> (source) -> (map) -----+-> (sink)
-	 *           \                   |                                        /
-	 *            \                  |                                       /
-	 *             +------> (sink) --+--> [KAFKA-3] --> (source) -> (map) --+
+	 *             +------> (sink) --+--> [DEFAULT_TOPIC-1] --> (source) -> (map) -----+
+	 *            /                  |                             |          |        |
+	 *           |                   |                             |          |  ------+--> (sink)
+	 *             +------> (sink) --+--> [DEFAULT_TOPIC-2] --> (source) -> (map) -----+
+	 *            /                  |
+	 *           |                   |
+	 * (source) ----------> (sink) --+--> [DYNAMIC_TOPIC-1] --> (source) -> (map) -----+
+	 *           |                   |                             |          |        |
+	 *            \                  |                             |          |        |
+	 *             +------> (sink) --+--> [DYNAMIC_TOPIC-2] --> (source) -> (map) -----+--> (sink)
+	 *           |                   |                             |          |        |
+	 *            \                  |                             |          |        |
+	 *             +------> (sink) --+--> [DYNAMIC_TOPIC-3] --> (source) -> (map) -----+
 	 * </pre>
 	 * 
-	 * The mapper validates that the values come consistently from the correct Kafka partition.
+	 * Each topic has an independent mapper that validates the values come consistently from
+	 * the correct Kafka partition of the topic it is responsible for.
 	 * 
-	 * The final sink validates that there are no duplicates and that all partitions are present.
+	 * Each topic also has a final sink that validates that there are no duplicates and that all
+	 * partitions are present.
 	 */
 	public void runCustomPartitioningTest() {
 		try {
 			LOG.info("Starting KafkaProducerITCase.testCustomPartitioning()");
 
-			final String topic = "customPartitioningTestTopic";
-			final int parallelism = 3;
-			
-			createTestTopic(topic, parallelism, 1);
+			final String defaultTopic = "defaultTopic";
+			final int defaultTopicPartitions = 2;
+
+			final String dynamicTopic = "dynamicTopic";
+			final int dynamicTopicPartitions = 3;
+
+			createTestTopic(defaultTopic, defaultTopicPartitions, 1);
+			createTestTopic(dynamicTopic, dynamicTopicPartitions, 1);
+
+			Map<String, Integer> expectedTopicsToNumPartitions = new HashMap<>(2);
+			expectedTopicsToNumPartitions.put(defaultTopic, defaultTopicPartitions);
+			expectedTopicsToNumPartitions.put(dynamicTopic, dynamicTopicPartitions);
 
 			TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInfoParser.parse("Tuple2<Long, String>");
 
@@ -75,13 +98,13 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 			env.getConfig().disableSysoutLogging();
 
 			TypeInformationSerializationSchema<Tuple2<Long, String>> serSchema =
-					new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
+				new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
 
 			TypeInformationSerializationSchema<Tuple2<Long, String>> deserSchema =
-					new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
+				new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
 
 			// ------ producing topology ---------
-			
+
 			// source has DOP 1 to make sure it generates no duplicates
 			DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
 
@@ -100,69 +123,44 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 				public void cancel() {
 					running = false;
 				}
-			})
-			.setParallelism(1);
+			}).setParallelism(1);
 
 			Properties props = new Properties();
 			props.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings));
 			props.putAll(secureProps);
-			
-			// sink partitions into 
-			kafkaServer.produceIntoKafka(stream, topic,
-					new KeyedSerializationSchemaWrapper<>(serSchema),
+
+			// sink that writes into the default topic or the dynamic topic, as routed by the serialization schema
+			kafkaServer.produceIntoKafka(stream, defaultTopic,
+					// this serialization schema will route between the default topic and dynamic topic
+					new CustomKeyedSerializationSchemaWrapper(serSchema, defaultTopic, dynamicTopic),
 					props,
-					new CustomPartitioner(parallelism)).setParallelism(parallelism);
+					new CustomPartitioner(expectedTopicsToNumPartitions))
+				.setParallelism(Math.max(defaultTopicPartitions, dynamicTopicPartitions));
 
 			// ------ consuming topology ---------
 
 			Properties consumerProps = new Properties();
 			consumerProps.putAll(standardProps);
 			consumerProps.putAll(secureProps);
-			FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topic, deserSchema, consumerProps);
-			
-			env.addSource(source).setParallelism(parallelism)
-
-					// mapper that validates partitioning and maps to partition
-					.map(new RichMapFunction<Tuple2<Long, String>, Integer>() {
-						
-						private int ourPartition = -1;
-						@Override
-						public Integer map(Tuple2<Long, String> value) {
-							int partition = value.f0.intValue() % parallelism;
-							if (ourPartition != -1) {
-								assertEquals("inconsistent partitioning", ourPartition, partition);
-							} else {
-								ourPartition = partition;
-							}
-							return partition;
-						}
-					}).setParallelism(parallelism)
-					
-					.addSink(new SinkFunction<Integer>() {
-						
-						private int[] valuesPerPartition = new int[parallelism];
-						
-						@Override
-						public void invoke(Integer value) throws Exception {
-							valuesPerPartition[value]++;
-							
-							boolean missing = false;
-							for (int i : valuesPerPartition) {
-								if (i < 100) {
-									missing = true;
-									break;
-								}
-							}
-							if (!missing) {
-								throw new SuccessException();
-							}
-						}
-					}).setParallelism(1);
-			
+
+			FlinkKafkaConsumerBase<Tuple2<Long, String>> defaultTopicSource =
+					kafkaServer.getConsumer(defaultTopic, deserSchema, consumerProps);
+			FlinkKafkaConsumerBase<Tuple2<Long, String>> dynamicTopicSource =
+					kafkaServer.getConsumer(dynamicTopic, deserSchema, consumerProps);
+
+			env.addSource(defaultTopicSource).setParallelism(defaultTopicPartitions)
+				.map(new PartitionValidatingMapper(defaultTopicPartitions)).setParallelism(defaultTopicPartitions)
+				.addSink(new PartitionValidatingSink(defaultTopicPartitions)).setParallelism(1);
+
+			env.addSource(dynamicTopicSource).setParallelism(dynamicTopicPartitions)
+				.map(new PartitionValidatingMapper(dynamicTopicPartitions)).setParallelism(dynamicTopicPartitions)
+				.addSink(new PartitionValidatingSink(dynamicTopicPartitions)).setParallelism(1);
+
 			tryExecute(env, "custom partitioning test");
 
-			deleteTestTopic(topic);
-			
+			deleteTestTopic(defaultTopic);
+			deleteTestTopic(dynamicTopic);
+
 			LOG.info("Finished KafkaProducerITCase.testCustomPartitioning()");
 		}
 		catch (Exception e) {
@@ -175,18 +173,94 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
 
 	public static class CustomPartitioner extends FlinkKafkaPartitioner<Tuple2<Long, String>> implements Serializable {
 
-		private final int expectedPartitions;
+		private final Map<String, Integer> expectedTopicsToNumPartitions;
 
-		public CustomPartitioner(int expectedPartitions) {
-			this.expectedPartitions = expectedPartitions;
+		public CustomPartitioner(Map<String, Integer> expectedTopicsToNumPartitions) {
+			this.expectedTopicsToNumPartitions = expectedTopicsToNumPartitions;
 		}
 
-
 		@Override
 		public int partition(Tuple2<Long, String> next, byte[] serializedKey, byte[] serializedValue, String topic, int[] partitions) {
-			assertEquals(expectedPartitions, partitions.length);
+			assertEquals(expectedTopicsToNumPartitions.get(topic).intValue(), partitions.length);
+
+			return (int) (next.f0 % partitions.length);
+		}
+	}
+
+	/**
+	 * A {@link KeyedSerializationSchemaWrapper} that supports routing serialized records to different target topics.
+	 */
+	public static class CustomKeyedSerializationSchemaWrapper extends KeyedSerializationSchemaWrapper<Tuple2<Long, String>> {
+
+		private final String defaultTopic;
+		private final String dynamicTopic;
+
+		public CustomKeyedSerializationSchemaWrapper(
+				SerializationSchema<Tuple2<Long, String>> serializationSchema,
+				String defaultTopic,
+				String dynamicTopic) {
+
+			super(serializationSchema);
+
+			this.defaultTopic = Preconditions.checkNotNull(defaultTopic);
+			this.dynamicTopic = Preconditions.checkNotNull(dynamicTopic);
+		}
 
-			return (int) (next.f0 % expectedPartitions);
+		@Override
+		public String getTargetTopic(Tuple2<Long, String> element) {
+			return (element.f0 % 2 == 0) ? defaultTopic : dynamicTopic;
+		}
+	}
+
+	/**
+	 * Mapper that validates partitioning and maps to partition.
+	 */
+	public static class PartitionValidatingMapper extends RichMapFunction<Tuple2<Long, String>, Integer> {
+
+		private final int numPartitions;
+
+		private int ourPartition = -1;
+
+		public PartitionValidatingMapper(int numPartitions) {
+			this.numPartitions = numPartitions;
+		}
+
+		@Override
+		public Integer map(Tuple2<Long, String> value) throws Exception {
+			int partition = value.f0.intValue() % numPartitions;
+			if (ourPartition != -1) {
+				assertEquals("inconsistent partitioning", ourPartition, partition);
+			} else {
+				ourPartition = partition;
+			}
+			return partition;
+		}
+	}
+
+	/**
+	 * Sink that counts the records received from each partition and throws a {@link SuccessException} once every partition has delivered at least 100 records.
+	 */
+	public static class PartitionValidatingSink implements SinkFunction<Integer> {
+		private final int[] valuesPerPartition;
+
+		public PartitionValidatingSink(int numPartitions) {
+			this.valuesPerPartition = new int[numPartitions];
+		}
+
+		@Override
+		public void invoke(Integer value) throws Exception {
+			valuesPerPartition[value]++;
+
+			boolean missing = false;
+			for (int i : valuesPerPartition) {
+				if (i < 100) {
+					missing = true;
+					break;
+				}
+			}
+			if (!missing) {
+				throw new SuccessException();
+			}
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index 0fdc82e..d4fe9cc 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -17,14 +17,12 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
-import java.io.Serializable;
 import java.util.Properties;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.table.api.Types;
@@ -45,8 +43,7 @@ public abstract class KafkaTableSinkTestBase {
 	private static final String TOPIC = "testTopic";
 	protected static final String[] FIELD_NAMES = new String[] {"field1", "field2"};
 	private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] { Types.INT(), Types.STRING() };
-	private static final KafkaPartitioner<Row> PARTITIONER = new CustomPartitioner();
-	private static final FlinkKafkaPartitioner<Row> FLINK_PARTITIONER = new FlinkCustomPartitioner();
+	private static final FlinkKafkaPartitioner<Row> PARTITIONER = new CustomPartitioner();
 	private static final Properties PROPERTIES = createSinkProperties();
 	@SuppressWarnings("unchecked")
 	private final FlinkKafkaProducerBase<Row> PRODUCER = new FlinkKafkaProducerBase<Row>(
@@ -74,23 +71,6 @@ public abstract class KafkaTableSinkTestBase {
 	}
 
 	@Test
-	@SuppressWarnings("unchecked")
-	public void testKafkaTableSinkWithFlinkPartitioner() throws Exception {
-		DataStream dataStream = mock(DataStream.class);
-
-		KafkaTableSink kafkaTableSink = spy(createTableSinkWithFlinkPartitioner());
-		kafkaTableSink.emitDataStream(dataStream);
-
-		verify(dataStream).addSink(eq(PRODUCER));
-
-		verify(kafkaTableSink).createKafkaProducer(
-			eq(TOPIC),
-			eq(PROPERTIES),
-			any(getSerializationSchema().getClass()),
-			eq(FLINK_PARTITIONER));
-	}
-
-	@Test
 	public void testConfiguration() {
 		KafkaTableSink kafkaTableSink = createTableSink();
 		KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
@@ -101,22 +81,8 @@ public abstract class KafkaTableSinkTestBase {
 		assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
 	}
 
-	@Test
-	public void testConfigurationWithFlinkPartitioner() {
-		KafkaTableSink kafkaTableSink = createTableSinkWithFlinkPartitioner();
-		KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
-		assertNotSame(kafkaTableSink, newKafkaTableSink);
-
-		assertArrayEquals(FIELD_NAMES, newKafkaTableSink.getFieldNames());
-		assertArrayEquals(FIELD_TYPES, newKafkaTableSink.getFieldTypes());
-		assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
-	}
-
 	protected abstract KafkaTableSink createTableSink(String topic, Properties properties,
-			KafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
-
-	protected abstract KafkaTableSink createTableSinkWithFlinkPartitioner(String topic,
-			Properties properties, FlinkKafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
+			FlinkKafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
 
 	protected abstract SerializationSchema<Row> getSerializationSchema();
 
@@ -124,24 +90,13 @@ public abstract class KafkaTableSinkTestBase {
 		return createTableSink(TOPIC, PROPERTIES, PARTITIONER, PRODUCER);
 	}
 
-	private KafkaTableSink createTableSinkWithFlinkPartitioner() {
-		return createTableSinkWithFlinkPartitioner(TOPIC, PROPERTIES, FLINK_PARTITIONER, PRODUCER);
-	}
-
 	private static Properties createSinkProperties() {
 		Properties properties = new Properties();
 		properties.setProperty("bootstrap.servers", "localhost:12345");
 		return properties;
 	}
 
-	private static class CustomPartitioner extends KafkaPartitioner<Row> implements Serializable {
-		@Override
-		public int partition(Row next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
-			return 0;
-		}
-	}
-
-	private static class FlinkCustomPartitioner extends FlinkKafkaPartitioner<Row> {
+	private static class CustomPartitioner extends FlinkKafkaPartitioner<Row> {
 		@Override
 		public int partition(Row record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
 			return 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
deleted file mode 100644
index 5dab05a..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
-
-public class TestFixedPartitioner {
-
-
-	/**
-	 * <pre>
-	 *   		Flink Sinks:		Kafka Partitions
-	 * 			1	---------------->	1
-	 * 			2   --------------/
-	 * 			3   -------------/
-	 * 			4	------------/
-	 * </pre>
-	 */
-	@Test
-	public void testMoreFlinkThanBrokers() {
-		FixedPartitioner<String> part = new FixedPartitioner<>();
-
-		int[] partitions = new int[]{0};
-
-		part.open(0, 4, partitions);
-		Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
-
-		part.open(1, 4, partitions);
-		Assert.assertEquals(0, part.partition("abc2", null, null, partitions.length));
-
-		part.open(2, 4, partitions);
-		Assert.assertEquals(0, part.partition("abc3", null, null, partitions.length));
-		Assert.assertEquals(0, part.partition("abc3", null, null, partitions.length)); // check if it is changing ;)
-
-		part.open(3, 4, partitions);
-		Assert.assertEquals(0, part.partition("abc4", null, null, partitions.length));
-	}
-
-	/**
-	 *
-	 * <pre>
-	 * 		Flink Sinks:		Kafka Partitions
-	 * 			1	---------------->	1
-	 * 			2	---------------->	2
-	 * 									3
-	 * 									4
-	 * 									5
-	 *
-	 * </pre>
-	 */
-	@Test
-	public void testFewerPartitions() {
-		FixedPartitioner<String> part = new FixedPartitioner<>();
-
-		int[] partitions = new int[]{0, 1, 2, 3, 4};
-		part.open(0, 2, partitions);
-		Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
-		Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
-
-		part.open(1, 2, partitions);
-		Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length));
-		Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length));
-	}
-
-	/*
-	 * 		Flink Sinks:		Kafka Partitions
-	 * 			1	------------>--->	1
-	 * 			2	-----------/----> 	2
-	 * 			3	----------/
-	 */
-	@Test
-	public void testMixedCase() {
-		FixedPartitioner<String> part = new FixedPartitioner<>();
-		int[] partitions = new int[]{0,1};
-
-		part.open(0, 3, partitions);
-		Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
-
-		part.open(1, 3, partitions);
-		Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length));
-
-		part.open(2, 3, partitions);
-		Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
deleted file mode 100644
index c6be71c..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test FlinkKafkaDelegatePartitioner using FixedPartitioner
- */
-public class TestFlinkKafkaDelegatePartitioner {
-
-	/**
-	 * <pre>
-	 *   		Flink Sinks:		Kafka Partitions
-	 * 			1	---------------->	1
-	 * 			2   --------------/
-	 * 			3   -------------/
-	 * 			4	------------/
-	 * </pre>
-	 */
-	@Test
-	public void testMoreFlinkThanBrokers() {
-		FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
-
-		int[] partitions = new int[]{0};
-
-		part.open(0, 4);
-		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
-
-		part.open(1, 4);
-		Assert.assertEquals(0, part.partition("abc2", null, null, null, partitions));
-
-		part.open(2, 4);
-		Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions));
-		Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions)); // check if it is changing ;)
-
-		part.open(3, 4);
-		Assert.assertEquals(0, part.partition("abc4", null, null, null, partitions));
-	}
-
-	/**
-	 *
-	 * <pre>
-	 * 		Flink Sinks:		Kafka Partitions
-	 * 			1	---------------->	1
-	 * 			2	---------------->	2
-	 * 									3
-	 * 									4
-	 * 									5
-	 *
-	 * </pre>
-	 */
-	@Test
-	public void testFewerPartitions() {
-		FlinkKafkaDelegatePartitioner<String> part = new FlinkKafkaDelegatePartitioner<>(new FixedPartitioner<String>());
-
-		int[] partitions = new int[]{0, 1, 2, 3, 4};
-		part.setPartitions(partitions);
-		
-		part.open(0, 2);
-		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
-		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
-
-		part.open(1, 2);
-		Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
-		Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
-	}
-
-	/*
-	 * 		Flink Sinks:		Kafka Partitions
-	 * 			1	------------>--->	1
-	 * 			2	-----------/----> 	2
-	 * 			3	----------/
-	 */
-	@Test
-	public void testMixedCase() {
-		FlinkKafkaDelegatePartitioner<String> part = new FlinkKafkaDelegatePartitioner<>(new FixedPartitioner<String>());
-		int[] partitions = new int[]{0,1};
-		part.setPartitions(partitions);
-
-		part.open(0, 3);
-		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
-
-		part.open(1, 3);
-		Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
-
-		part.open(2, 3);
-		Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
deleted file mode 100644
index 43e1aa7..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-
-/**
- * Special partitioner that uses the first field of a 2-tuple as the partition,
- * and that expects a specific number of partitions. Use Tuple2FlinkPartitioner instead
- */
-@Deprecated
-public class Tuple2Partitioner extends KafkaPartitioner<Tuple2<Integer, Integer>> implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	private final int expectedPartitions;
-
-	public Tuple2Partitioner(int expectedPartitions) {
-		this.expectedPartitions = expectedPartitions;
-	}
-
-	@Override
-	public int partition(Tuple2<Integer, Integer> next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
-		if (numPartitions != expectedPartitions) {
-			throw new IllegalArgumentException("Expected " + expectedPartitions + " partitions");
-		}
-
-		return next.f0;
-	}
-}