Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/19 06:40:14 UTC
[4/6] flink git commit: [FLINK-6288] [kafka] Cleanup and improvements
to FlinkKafkaPartitioner custom partitioning
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 8285048..bcc8328 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -29,10 +29,14 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.Preconditions;
import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
import static org.apache.flink.test.util.TestUtils.tryExecute;
@@ -42,31 +46,50 @@ import static org.junit.Assert.fail;
@SuppressWarnings("serial")
public abstract class KafkaProducerTestBase extends KafkaTestBase {
-
/**
- *
+ * This test verifies that custom partitioning works correctly, with a default topic
+ * and a dynamic topic. The number of partitions for each topic is deliberately different.
+ *
+ * Test topology:
+ *
* <pre>
- *             +------> (sink) --+--> [KAFKA-1] --> (source) -> (map) --+
- *            /                  |                                      \
- *           /                   |                                       \
- * (source) ----------> (sink) --+--> [KAFKA-2] --> (source) -> (map) -----+-> (sink)
- *           \                   |                                       /
- *            \                  |                                      /
- *             +------> (sink) --+--> [KAFKA-3] --> (source) -> (map) --+
+ *              +------> (sink) --+--> [DEFAULT_TOPIC-1] --> (source) -> (map) -----+
+ *              |                 |                                                 |
+ *              |                 |                                                 +--> (sink)
+ *              +------> (sink) --+--> [DEFAULT_TOPIC-2] --> (source) -> (map) -----+
+ *              |
+ *              |
+ * (source) ----+------> (sink) --+--> [DYNAMIC_TOPIC-1] --> (source) -> (map) -----+
+ *              |                 |                                                  |
+ *              |                 |                                                  |
+ *              +------> (sink) --+--> [DYNAMIC_TOPIC-2] --> (source) -> (map) -----+--> (sink)
+ *              |                 |                                                  |
+ *              |                 |                                                  |
+ *              +------> (sink) --+--> [DYNAMIC_TOPIC-3] --> (source) -> (map) -----+
* </pre>
*
- * The mapper validates that the values come consistently from the correct Kafka partition.
+ * Each topic has an independent mapper that validates the values come consistently from
+ * the correct Kafka partition of the topic it is responsible for.
*
- * The final sink validates that there are no duplicates and that all partitions are present.
+ * Each topic also has a final sink that validates that there are no duplicates and that all
+ * partitions are present.
*/
public void runCustomPartitioningTest() {
try {
LOG.info("Starting KafkaProducerITCase.testCustomPartitioning()");
- final String topic = "customPartitioningTestTopic";
- final int parallelism = 3;
-
- createTestTopic(topic, parallelism, 1);
+ final String defaultTopic = "defaultTopic";
+ final int defaultTopicPartitions = 2;
+
+ final String dynamicTopic = "dynamicTopic";
+ final int dynamicTopicPartitions = 3;
+
+ createTestTopic(defaultTopic, defaultTopicPartitions, 1);
+ createTestTopic(dynamicTopic, dynamicTopicPartitions, 1);
+
+ Map<String, Integer> expectedTopicsToNumPartitions = new HashMap<>(2);
+ expectedTopicsToNumPartitions.put(defaultTopic, defaultTopicPartitions);
+ expectedTopicsToNumPartitions.put(dynamicTopic, dynamicTopicPartitions);
TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInfoParser.parse("Tuple2<Long, String>");
@@ -75,13 +98,13 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
env.getConfig().disableSysoutLogging();
TypeInformationSerializationSchema<Tuple2<Long, String>> serSchema =
- new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
+ new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
TypeInformationSerializationSchema<Tuple2<Long, String>> deserSchema =
- new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
+ new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
// ------ producing topology ---------
-
+
// source has DOP 1 to make sure it generates no duplicates
DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
@@ -100,69 +123,44 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
public void cancel() {
running = false;
}
- })
- .setParallelism(1);
+ }).setParallelism(1);
Properties props = new Properties();
props.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings));
props.putAll(secureProps);
-
- // sink partitions into
- kafkaServer.produceIntoKafka(stream, topic,
- new KeyedSerializationSchemaWrapper<>(serSchema),
+
+ // sink partitions into Kafka
+ kafkaServer.produceIntoKafka(stream, defaultTopic,
+ // this serialization schema will route between the default topic and dynamic topic
+ new CustomKeyedSerializationSchemaWrapper(serSchema, defaultTopic, dynamicTopic),
props,
- new CustomPartitioner(parallelism)).setParallelism(parallelism);
+ new CustomPartitioner(expectedTopicsToNumPartitions))
+ .setParallelism(Math.max(defaultTopicPartitions, dynamicTopicPartitions));
// ------ consuming topology ---------
Properties consumerProps = new Properties();
consumerProps.putAll(standardProps);
consumerProps.putAll(secureProps);
- FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topic, deserSchema, consumerProps);
-
- env.addSource(source).setParallelism(parallelism)
-
- // mapper that validates partitioning and maps to partition
- .map(new RichMapFunction<Tuple2<Long, String>, Integer>() {
-
- private int ourPartition = -1;
- @Override
- public Integer map(Tuple2<Long, String> value) {
- int partition = value.f0.intValue() % parallelism;
- if (ourPartition != -1) {
- assertEquals("inconsistent partitioning", ourPartition, partition);
- } else {
- ourPartition = partition;
- }
- return partition;
- }
- }).setParallelism(parallelism)
-
- .addSink(new SinkFunction<Integer>() {
-
- private int[] valuesPerPartition = new int[parallelism];
-
- @Override
- public void invoke(Integer value) throws Exception {
- valuesPerPartition[value]++;
-
- boolean missing = false;
- for (int i : valuesPerPartition) {
- if (i < 100) {
- missing = true;
- break;
- }
- }
- if (!missing) {
- throw new SuccessException();
- }
- }
- }).setParallelism(1);
-
+
+ FlinkKafkaConsumerBase<Tuple2<Long, String>> defaultTopicSource =
+ kafkaServer.getConsumer(defaultTopic, deserSchema, consumerProps);
+ FlinkKafkaConsumerBase<Tuple2<Long, String>> dynamicTopicSource =
+ kafkaServer.getConsumer(dynamicTopic, deserSchema, consumerProps);
+
+ env.addSource(defaultTopicSource).setParallelism(defaultTopicPartitions)
+ .map(new PartitionValidatingMapper(defaultTopicPartitions)).setParallelism(defaultTopicPartitions)
+ .addSink(new PartitionValidatingSink(defaultTopicPartitions)).setParallelism(1);
+
+ env.addSource(dynamicTopicSource).setParallelism(dynamicTopicPartitions)
+ .map(new PartitionValidatingMapper(dynamicTopicPartitions)).setParallelism(dynamicTopicPartitions)
+ .addSink(new PartitionValidatingSink(dynamicTopicPartitions)).setParallelism(1);
+
tryExecute(env, "custom partitioning test");
- deleteTestTopic(topic);
-
+ deleteTestTopic(defaultTopic);
+ deleteTestTopic(dynamicTopic);
+
LOG.info("Finished KafkaProducerITCase.testCustomPartitioning()");
}
catch (Exception e) {
@@ -175,18 +173,94 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
public static class CustomPartitioner extends FlinkKafkaPartitioner<Tuple2<Long, String>> implements Serializable {
- private final int expectedPartitions;
+ private final Map<String, Integer> expectedTopicsToNumPartitions;
- public CustomPartitioner(int expectedPartitions) {
- this.expectedPartitions = expectedPartitions;
+ public CustomPartitioner(Map<String, Integer> expectedTopicsToNumPartitions) {
+ this.expectedTopicsToNumPartitions = expectedTopicsToNumPartitions;
}
-
@Override
public int partition(Tuple2<Long, String> next, byte[] serializedKey, byte[] serializedValue, String topic, int[] partitions) {
- assertEquals(expectedPartitions, partitions.length);
+ assertEquals(expectedTopicsToNumPartitions.get(topic).intValue(), partitions.length);
+
+ return (int) (next.f0 % partitions.length);
+ }
+ }
+
+ /**
+ * A {@link KeyedSerializationSchemaWrapper} that supports routing serialized records to different target topics.
+ */
+ public static class CustomKeyedSerializationSchemaWrapper extends KeyedSerializationSchemaWrapper<Tuple2<Long, String>> {
+
+ private final String defaultTopic;
+ private final String dynamicTopic;
+
+ public CustomKeyedSerializationSchemaWrapper(
+ SerializationSchema<Tuple2<Long, String>> serializationSchema,
+ String defaultTopic,
+ String dynamicTopic) {
+
+ super(serializationSchema);
+
+ this.defaultTopic = Preconditions.checkNotNull(defaultTopic);
+ this.dynamicTopic = Preconditions.checkNotNull(dynamicTopic);
+ }
- return (int) (next.f0 % expectedPartitions);
+ @Override
+ public String getTargetTopic(Tuple2<Long, String> element) {
+ return (element.f0 % 2 == 0) ? defaultTopic : dynamicTopic;
+ }
+ }
+
+ /**
+ * Mapper that validates partitioning and maps to partition.
+ */
+ public static class PartitionValidatingMapper extends RichMapFunction<Tuple2<Long, String>, Integer> {
+
+ private final int numPartitions;
+
+ private int ourPartition = -1;
+
+ public PartitionValidatingMapper(int numPartitions) {
+ this.numPartitions = numPartitions;
+ }
+
+ @Override
+ public Integer map(Tuple2<Long, String> value) throws Exception {
+ int partition = value.f0.intValue() % numPartitions;
+ if (ourPartition != -1) {
+ assertEquals("inconsistent partitioning", ourPartition, partition);
+ } else {
+ ourPartition = partition;
+ }
+ return partition;
+ }
+ }
+
+ /**
+ * Sink that validates records received from each partition and checks that there are no duplicates.
+ */
+ public static class PartitionValidatingSink implements SinkFunction<Integer> {
+ private final int[] valuesPerPartition;
+
+ public PartitionValidatingSink(int numPartitions) {
+ this.valuesPerPartition = new int[numPartitions];
+ }
+
+ @Override
+ public void invoke(Integer value) throws Exception {
+ valuesPerPartition[value]++;
+
+ boolean missing = false;
+ for (int i : valuesPerPartition) {
+ if (i < 100) {
+ missing = true;
+ break;
+ }
+ }
+ if (!missing) {
+ throw new SuccessException();
+ }
}
}
}
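For reference, user code plugs into the cleaned-up API shown above through the same two hooks the test uses: a FlinkKafkaPartitioner, which now receives the target topic and the concrete partition array, and a KeyedSerializationSchema whose getTargetTopic() can route individual records. Below is a minimal sketch based only on the interfaces visible in this diff; the class names ModuloPartitioner and EvenOddRouter are illustrative, not part of the commit.

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
    import org.apache.flink.streaming.util.serialization.SerializationSchema;

    // Partitions a record by its long field, modulo the partition count of
    // whichever topic it is being written to. Because 'partitions' always
    // describes 'targetTopic', one instance serves topics of different sizes.
    class ModuloPartitioner extends FlinkKafkaPartitioner<Tuple2<Long, String>> {
        @Override
        public int partition(Tuple2<Long, String> record, byte[] key, byte[] value,
                String targetTopic, int[] partitions) {
            return (int) (record.f0 % partitions.length);
        }
    }

    // Routes even keys to one topic and odd keys to another, mirroring the
    // CustomKeyedSerializationSchemaWrapper added in the test above.
    class EvenOddRouter extends KeyedSerializationSchemaWrapper<Tuple2<Long, String>> {
        private final String evenTopic;
        private final String oddTopic;

        EvenOddRouter(SerializationSchema<Tuple2<Long, String>> schema,
                String evenTopic, String oddTopic) {
            super(schema);
            this.evenTopic = evenTopic;
            this.oddTopic = oddTopic;
        }

        @Override
        public String getTargetTopic(Tuple2<Long, String> element) {
            return (element.f0 % 2 == 0) ? evenTopic : oddTopic;
        }
    }

The test above wires equivalents of these into kafkaServer.produceIntoKafka(...), which is also where a production job would hand them to its version-specific Kafka producer.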
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index 0fdc82e..d4fe9cc 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -17,14 +17,12 @@
*/
package org.apache.flink.streaming.connectors.kafka;
-import java.io.Serializable;
import java.util.Properties;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.table.api.Types;
@@ -45,8 +43,7 @@ public abstract class KafkaTableSinkTestBase {
private static final String TOPIC = "testTopic";
protected static final String[] FIELD_NAMES = new String[] {"field1", "field2"};
private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] { Types.INT(), Types.STRING() };
- private static final KafkaPartitioner<Row> PARTITIONER = new CustomPartitioner();
- private static final FlinkKafkaPartitioner<Row> FLINK_PARTITIONER = new FlinkCustomPartitioner();
+ private static final FlinkKafkaPartitioner<Row> PARTITIONER = new CustomPartitioner();
private static final Properties PROPERTIES = createSinkProperties();
@SuppressWarnings("unchecked")
private final FlinkKafkaProducerBase<Row> PRODUCER = new FlinkKafkaProducerBase<Row>(
@@ -74,23 +71,6 @@ public abstract class KafkaTableSinkTestBase {
}
@Test
- @SuppressWarnings("unchecked")
- public void testKafkaTableSinkWithFlinkPartitioner() throws Exception {
- DataStream dataStream = mock(DataStream.class);
-
- KafkaTableSink kafkaTableSink = spy(createTableSinkWithFlinkPartitioner());
- kafkaTableSink.emitDataStream(dataStream);
-
- verify(dataStream).addSink(eq(PRODUCER));
-
- verify(kafkaTableSink).createKafkaProducer(
- eq(TOPIC),
- eq(PROPERTIES),
- any(getSerializationSchema().getClass()),
- eq(FLINK_PARTITIONER));
- }
-
- @Test
public void testConfiguration() {
KafkaTableSink kafkaTableSink = createTableSink();
KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
@@ -101,22 +81,8 @@ public abstract class KafkaTableSinkTestBase {
assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
}
- @Test
- public void testConfigurationWithFlinkPartitioner() {
- KafkaTableSink kafkaTableSink = createTableSinkWithFlinkPartitioner();
- KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
- assertNotSame(kafkaTableSink, newKafkaTableSink);
-
- assertArrayEquals(FIELD_NAMES, newKafkaTableSink.getFieldNames());
- assertArrayEquals(FIELD_TYPES, newKafkaTableSink.getFieldTypes());
- assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
- }
-
protected abstract KafkaTableSink createTableSink(String topic, Properties properties,
- KafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
-
- protected abstract KafkaTableSink createTableSinkWithFlinkPartitioner(String topic,
- Properties properties, FlinkKafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
+ FlinkKafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
protected abstract SerializationSchema<Row> getSerializationSchema();
@@ -124,24 +90,13 @@ public abstract class KafkaTableSinkTestBase {
return createTableSink(TOPIC, PROPERTIES, PARTITIONER, PRODUCER);
}
- private KafkaTableSink createTableSinkWithFlinkPartitioner() {
- return createTableSinkWithFlinkPartitioner(TOPIC, PROPERTIES, FLINK_PARTITIONER, PRODUCER);
- }
-
private static Properties createSinkProperties() {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:12345");
return properties;
}
- private static class CustomPartitioner extends KafkaPartitioner<Row> implements Serializable {
- @Override
- public int partition(Row next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
- return 0;
- }
- }
-
- private static class FlinkCustomPartitioner extends FlinkKafkaPartitioner<Row> {
+ private static class CustomPartitioner extends FlinkKafkaPartitioner<Row> {
@Override
public int partition(Row record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
return 0;
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
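The consolidation above works because FlinkKafkaPartitioner strictly generalizes the removed KafkaPartitioner: the old interface only saw a partition count, while the new one sees the target topic and the actual partition ids. A hypothetical Row partitioner under the new interface, with round-robin logic chosen purely for illustration:

    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
    import org.apache.flink.types.Row;

    // Round-robins rows across the partitions of the target topic. Note that
    // it returns an actual partition id from the array, not an index, so it
    // also works when the available partitions are non-contiguous.
    class RoundRobinRowPartitioner extends FlinkKafkaPartitioner<Row> {
        private int next = 0;

        @Override
        public int partition(Row record, byte[] key, byte[] value,
                String targetTopic, int[] partitions) {
            int partition = partitions[next];
            next = (next + 1) % partitions.length;
            return partition;
        }
    }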
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
deleted file mode 100644
index 5dab05a..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
-
-public class TestFixedPartitioner {
-
-
- /**
- * <pre>
- * Flink Sinks: Kafka Partitions
- * 1 ----------------> 1
- * 2 --------------/
- * 3 -------------/
- * 4 ------------/
- * </pre>
- */
- @Test
- public void testMoreFlinkThanBrokers() {
- FixedPartitioner<String> part = new FixedPartitioner<>();
-
- int[] partitions = new int[]{0};
-
- part.open(0, 4, partitions);
- Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
-
- part.open(1, 4, partitions);
- Assert.assertEquals(0, part.partition("abc2", null, null, partitions.length));
-
- part.open(2, 4, partitions);
- Assert.assertEquals(0, part.partition("abc3", null, null, partitions.length));
- Assert.assertEquals(0, part.partition("abc3", null, null, partitions.length)); // check if it is changing ;)
-
- part.open(3, 4, partitions);
- Assert.assertEquals(0, part.partition("abc4", null, null, partitions.length));
- }
-
- /**
- *
- * <pre>
- * Flink Sinks: Kafka Partitions
- * 1 ----------------> 1
- * 2 ----------------> 2
- * 3
- * 4
- * 5
- *
- * </pre>
- */
- @Test
- public void testFewerPartitions() {
- FixedPartitioner<String> part = new FixedPartitioner<>();
-
- int[] partitions = new int[]{0, 1, 2, 3, 4};
- part.open(0, 2, partitions);
- Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
- Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
-
- part.open(1, 2, partitions);
- Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length));
- Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length));
- }
-
- /*
- * Flink Sinks: Kafka Partitions
- * 1 ------------>---> 1
- * 2 -----------/----> 2
- * 3 ----------/
- */
- @Test
- public void testMixedCase() {
- FixedPartitioner<String> part = new FixedPartitioner<>();
- int[] partitions = new int[]{0,1};
-
- part.open(0, 3, partitions);
- Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
-
- part.open(1, 3, partitions);
- Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length));
-
- part.open(2, 3, partitions);
- Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
deleted file mode 100644
index c6be71c..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test FlinkKafkaDelegatePartitioner using FixedPartitioner
- */
-public class TestFlinkKafkaDelegatePartitioner {
-
- /**
- * <pre>
- * Flink Sinks: Kafka Partitions
- * 1 ----------------> 1
- * 2 --------------/
- * 3 -------------/
- * 4 ------------/
- * </pre>
- */
- @Test
- public void testMoreFlinkThanBrokers() {
- FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
-
- int[] partitions = new int[]{0};
-
- part.open(0, 4);
- Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
-
- part.open(1, 4);
- Assert.assertEquals(0, part.partition("abc2", null, null, null, partitions));
-
- part.open(2, 4);
- Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions));
- Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions)); // check if it is changing ;)
-
- part.open(3, 4);
- Assert.assertEquals(0, part.partition("abc4", null, null, null, partitions));
- }
-
- /**
- *
- * <pre>
- * Flink Sinks: Kafka Partitions
- * 1 ----------------> 1
- * 2 ----------------> 2
- * 3
- * 4
- * 5
- *
- * </pre>
- */
- @Test
- public void testFewerPartitions() {
- FlinkKafkaDelegatePartitioner<String> part = new FlinkKafkaDelegatePartitioner<>(new FixedPartitioner<String>());
-
- int[] partitions = new int[]{0, 1, 2, 3, 4};
- part.setPartitions(partitions);
-
- part.open(0, 2);
- Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
- Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
-
- part.open(1, 2);
- Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
- Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
- }
-
- /*
- * Flink Sinks: Kafka Partitions
- * 1 ------------>---> 1
- * 2 -----------/----> 2
- * 3 ----------/
- */
- @Test
- public void testMixedCase() {
- FlinkKafkaDelegatePartitioner<String> part = new FlinkKafkaDelegatePartitioner<>(new FixedPartitioner<String>());
- int[] partitions = new int[]{0,1};
- part.setPartitions(partitions);
-
- part.open(0, 3);
- Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
-
- part.open(1, 3);
- Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
-
- part.open(2, 3);
- Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
deleted file mode 100644
index 43e1aa7..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-
-/**
- * Special partitioner that uses the first field of a 2-tuple as the partition,
- * and that expects a specific number of partitions. Use Tuple2FlinkPartitioner instead
- */
-@Deprecated
-public class Tuple2Partitioner extends KafkaPartitioner<Tuple2<Integer, Integer>> implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private final int expectedPartitions;
-
- public Tuple2Partitioner(int expectedPartitions) {
- this.expectedPartitions = expectedPartitions;
- }
-
- @Override
- public int partition(Tuple2<Integer, Integer> next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
- if (numPartitions != expectedPartitions) {
- throw new IllegalArgumentException("Expected " + expectedPartitions + " partitions");
- }
-
- return next.f0;
- }
-}
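The deprecation note above points to Tuple2FlinkPartitioner as the replacement. Ported to the new interface, the same logic looks roughly as follows; this is a sketch under that assumption, not necessarily the verbatim replacement class:

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

    // Same behavior as the deleted Tuple2Partitioner: partition by the first
    // tuple field, after checking the expected partition count.
    public class Tuple2FlinkPartitioner extends FlinkKafkaPartitioner<Tuple2<Integer, Integer>> {

        private static final long serialVersionUID = 1L;

        private final int expectedPartitions;

        public Tuple2FlinkPartitioner(int expectedPartitions) {
            this.expectedPartitions = expectedPartitions;
        }

        @Override
        public int partition(Tuple2<Integer, Integer> next, byte[] serializedKey,
                byte[] serializedValue, String targetTopic, int[] partitions) {
            if (partitions.length != expectedPartitions) {
                throw new IllegalArgumentException("Expected " + expectedPartitions + " partitions");
            }
            return next.f0;
        }
    }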