Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/19 06:40:12 UTC
[2/6] flink git commit: [FLINK-6288] [kafka] New custom partitioner API that correctly handles multiple Kafka sink topics
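
For readers skimming the diff: the old KafkaPartitioner only ever saw a numPartitions count, which is ambiguous once a single sink writes to several topics with different partition counts. The new FlinkKafkaPartitioner receives the target topic and that topic's actual partition array. A minimal sketch of a partitioner against the new API, pieced together from the signatures in this commit (the key-hash logic is illustrative, not part of the change):

    import java.util.Arrays;

    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

    public class KeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {
        private static final long serialVersionUID = 1L;

        @Override
        public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            // 'partitions' holds the partition IDs of targetTopic itself, so a sink
            // fanning out to several topics always sees the right partition set
            int i = (Arrays.hashCode(key) & 0x7fffffff) % partitions.length;
            return partitions[i];
        }
    }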
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 2adb5ec..203d814 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -68,7 +68,7 @@ import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapp
import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
-import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner;
+import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2FlinkPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
@@ -1199,9 +1199,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
/**
* Test producing and consuming into multiple topics
- * @throws java.lang.Exception
+ * @throws Exception
*/
- public void runProduceConsumeMultipleTopics() throws java.lang.Exception {
+ public void runProduceConsumeMultipleTopics() throws Exception {
final int NUM_TOPICS = 5;
final int NUM_ELEMENTS = 20;
@@ -1291,6 +1291,55 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
}
+ private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
+ KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
+
+ private final TypeSerializer<Tuple2<Integer, Integer>> ts;
+
+ public Tuple2WithTopicSchema(ExecutionConfig ec) {
+ ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
+ }
+
+ @Override
+ public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+ DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
+ Tuple2<Integer, Integer> t2 = ts.deserialize(in);
+ return new Tuple3<>(t2.f0, t2.f1, topic);
+ }
+
+ @Override
+ public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
+ return TypeInfoParser.parse("Tuple3<Integer, Integer, String>");
+ }
+
+ @Override
+ public byte[] serializeKey(Tuple3<Integer, Integer, String> element) {
+ return null;
+ }
+
+ @Override
+ public byte[] serializeValue(Tuple3<Integer, Integer, String> element) {
+ ByteArrayOutputStream by = new ByteArrayOutputStream();
+ DataOutputView out = new DataOutputViewStreamWrapper(by);
+ try {
+ ts.serialize(new Tuple2<>(element.f0, element.f1), out);
+ } catch (IOException e) {
+ throw new RuntimeException("Error", e);
+ }
+ return by.toByteArray();
+ }
+
+ @Override
+ public String getTargetTopic(Tuple3<Integer, Integer, String> element) {
+ return element.f2;
+ }
+ }
+
/**
* Test Flink's Kafka integration also with very big records (30MB)
* see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
@@ -1975,7 +2024,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
producerProperties.setProperty("retries", "0");
producerProperties.putAll(secureProps);
- kafkaServer.produceIntoKafka(stream, topicName, serSchema, producerProperties, new Tuple2Partitioner(parallelism))
+ kafkaServer.produceIntoKafka(stream, topicName, serSchema, producerProperties, new Tuple2FlinkPartitioner(parallelism))
.setParallelism(parallelism);
try {
@@ -2227,53 +2276,4 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
this.numElementsTotal = state.get(0);
}
}
-
- private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
- KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
-
- private final TypeSerializer<Tuple2<Integer, Integer>> ts;
-
- public Tuple2WithTopicSchema(ExecutionConfig ec) {
- ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
- }
-
- @Override
- public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
- DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
- Tuple2<Integer, Integer> t2 = ts.deserialize(in);
- return new Tuple3<>(t2.f0, t2.f1, topic);
- }
-
- @Override
- public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) {
- return false;
- }
-
- @Override
- public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
- return TypeInfoParser.parse("Tuple3<Integer, Integer, String>");
- }
-
- @Override
- public byte[] serializeKey(Tuple3<Integer, Integer, String> element) {
- return null;
- }
-
- @Override
- public byte[] serializeValue(Tuple3<Integer, Integer, String> element) {
- ByteArrayOutputStream by = new ByteArrayOutputStream();
- DataOutputView out = new DataOutputViewStreamWrapper(by);
- try {
- ts.serialize(new Tuple2<>(element.f0, element.f1), out);
- } catch (IOException e) {
- throw new RuntimeException("Error" ,e);
- }
- return by.toByteArray();
- }
-
- @Override
- public String getTargetTopic(Tuple3<Integer, Integer, String> element) {
- return element.f2;
- }
- }
}
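
The Tuple2WithTopicSchema moved above is what makes the multi-topic round trip work: on the producer side, getTargetTopic(element) is consulted per record and overrides the sink's default topic. A condensed, hypothetical sketch of the producer side of runProduceConsumeMultipleTopics (topic names and element values are simplified; kafkaServer and props come from the surrounding test base, imports match the top of this file):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    Tuple2WithTopicSchema schema = new Tuple2WithTopicSchema(env.getConfig());

    // f2 carries the destination topic of each record
    DataStream<Tuple3<Integer, Integer, String>> stream = env.fromElements(
            new Tuple3<>(0, 1, "topic-0"),
            new Tuple3<>(1, 1, "topic-1"));

    // "topic-0" is only the default; schema.getTargetTopic(element) takes precedence
    kafkaServer.produceIntoKafka(stream, "topic-0", schema, props,
            new FlinkFixedPartitioner<Tuple3<Integer, Integer, String>>());
    env.execute();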
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 6f61392..8285048 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -27,12 +27,11 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
import org.apache.flink.test.util.SuccessException;
-
import java.io.Serializable;
import java.util.Properties;
@@ -174,7 +173,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
// ------------------------------------------------------------------------
- public static class CustomPartitioner extends KafkaPartitioner<Tuple2<Long, String>> implements Serializable {
+ public static class CustomPartitioner extends FlinkKafkaPartitioner<Tuple2<Long, String>> implements Serializable {
private final int expectedPartitions;
@@ -184,10 +183,10 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
@Override
- public int partition(Tuple2<Long, String> next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
- assertEquals(expectedPartitions, numPartitions);
+ public int partition(Tuple2<Long, String> next, byte[] serializedKey, byte[] serializedValue, String topic, int[] partitions) {
+ assertEquals(expectedPartitions, partitions.length);
- return (int) (next.f0 % numPartitions);
+ return (int) (next.f0 % expectedPartitions);
}
}
}
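
The hunk above doubles as a migration recipe for any custom partitioner: numPartitions becomes partitions.length, and the target topic arrives as an extra argument. Side by side, for the same Tuple2 element type:

    // old API (KafkaPartitioner, deprecated by this commit)
    public int partition(Tuple2<Long, String> next, byte[] serializedKey,
            byte[] serializedValue, int numPartitions) {
        return (int) (next.f0 % numPartitions);
    }

    // new API (FlinkKafkaPartitioner): the count is partitions.length, and the
    // topic/array pair removes the ambiguity for multi-topic sinks
    public int partition(Tuple2<Long, String> next, byte[] serializedKey,
            byte[] serializedValue, String topic, int[] partitions) {
        return (int) (next.f0 % partitions.length);
    }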
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index e0e8f84..0fdc82e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -17,25 +17,26 @@
*/
package org.apache.flink.streaming.connectors.kafka;
+import java.io.Serializable;
+import java.util.Properties;
+
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.Types;
-import org.apache.flink.types.Row;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.types.Row;
import org.junit.Test;
-import java.io.Serializable;
-import java.util.Properties;
-
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
@@ -45,6 +46,7 @@ public abstract class KafkaTableSinkTestBase {
protected static final String[] FIELD_NAMES = new String[] {"field1", "field2"};
private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] { Types.INT(), Types.STRING() };
private static final KafkaPartitioner<Row> PARTITIONER = new CustomPartitioner();
+ private static final FlinkKafkaPartitioner<Row> FLINK_PARTITIONER = new FlinkCustomPartitioner();
private static final Properties PROPERTIES = createSinkProperties();
@SuppressWarnings("unchecked")
private final FlinkKafkaProducerBase<Row> PRODUCER = new FlinkKafkaProducerBase<Row>(
@@ -72,6 +74,23 @@ public abstract class KafkaTableSinkTestBase {
}
@Test
+ @SuppressWarnings("unchecked")
+ public void testKafkaTableSinkWithFlinkPartitioner() throws Exception {
+ DataStream dataStream = mock(DataStream.class);
+
+ KafkaTableSink kafkaTableSink = spy(createTableSinkWithFlinkPartitioner());
+ kafkaTableSink.emitDataStream(dataStream);
+
+ verify(dataStream).addSink(eq(PRODUCER));
+
+ verify(kafkaTableSink).createKafkaProducer(
+ eq(TOPIC),
+ eq(PROPERTIES),
+ any(getSerializationSchema().getClass()),
+ eq(FLINK_PARTITIONER));
+ }
+
+ @Test
public void testConfiguration() {
KafkaTableSink kafkaTableSink = createTableSink();
KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
@@ -82,15 +101,33 @@ public abstract class KafkaTableSinkTestBase {
assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
}
+ @Test
+ public void testConfigurationWithFlinkPartitioner() {
+ KafkaTableSink kafkaTableSink = createTableSinkWithFlinkPartitioner();
+ KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
+ assertNotSame(kafkaTableSink, newKafkaTableSink);
+
+ assertArrayEquals(FIELD_NAMES, newKafkaTableSink.getFieldNames());
+ assertArrayEquals(FIELD_TYPES, newKafkaTableSink.getFieldTypes());
+ assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
+ }
+
protected abstract KafkaTableSink createTableSink(String topic, Properties properties,
KafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
+ protected abstract KafkaTableSink createTableSinkWithFlinkPartitioner(String topic,
+ Properties properties, FlinkKafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
+
protected abstract SerializationSchema<Row> getSerializationSchema();
private KafkaTableSink createTableSink() {
return createTableSink(TOPIC, PROPERTIES, PARTITIONER, PRODUCER);
}
+ private KafkaTableSink createTableSinkWithFlinkPartitioner() {
+ return createTableSinkWithFlinkPartitioner(TOPIC, PROPERTIES, FLINK_PARTITIONER, PRODUCER);
+ }
+
private static Properties createSinkProperties() {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:12345");
@@ -103,4 +140,11 @@ public abstract class KafkaTableSinkTestBase {
return 0;
}
}
+
+ private static class FlinkCustomPartitioner extends FlinkKafkaPartitioner<Row> {
+ @Override
+ public int partition(Row record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+ return 0;
+ }
+ }
}
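
The mocked verifications in the tests above effectively specify emitDataStream: build the producer through createKafkaProducer, then attach it as a sink. A minimal sketch of that contract, inferred from the verify(...) calls (field names are assumptions):

    // inferred shape of KafkaTableSink.emitDataStream; the real class may differ
    public void emitDataStream(DataStream<Row> dataStream) {
        FlinkKafkaProducerBase<Row> kafkaProducer =
                createKafkaProducer(topic, properties, serializationSchema, partitioner);
        dataStream.addSink(kafkaProducer);
    }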
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 9a7c96a..311a1a4 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -17,20 +17,20 @@
package org.apache.flink.streaming.connectors.kafka;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
import kafka.server.KafkaServer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-
/**
* Abstract class providing a Kafka test environment
*/
@@ -81,11 +81,11 @@ public abstract class KafkaTestEnvironment {
public abstract <T> StreamSink<T> getProducerSink(String topic,
KeyedSerializationSchema<T> serSchema, Properties props,
- KafkaPartitioner<T> partitioner);
+ FlinkKafkaPartitioner<T> partitioner);
public abstract <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic,
KeyedSerializationSchema<T> serSchema, Properties props,
- KafkaPartitioner<T> partitioner);
+ FlinkKafkaPartitioner<T> partitioner);
// -- offset handlers
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java
new file mode 100644
index 0000000..fa84199
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestFlinkFixedPartitioner {
+
+ /**
+ * <pre>
+ * Flink Sinks: Kafka Partitions
+ * 1 ----------------> 1
+ * 2 --------------/
+ * 3 -------------/
+ * 4 ------------/
+ * </pre>
+ */
+ @Test
+ public void testMoreFlinkThanBrokers() {
+ FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
+
+ int[] partitions = new int[]{0};
+
+ part.open(0, 4);
+ Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+ part.open(1, 4);
+ Assert.assertEquals(0, part.partition("abc2", null, null, null, partitions));
+
+ part.open(2, 4);
+ Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions));
+ Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions)); // check if it is changing ;)
+
+ part.open(3, 4);
+ Assert.assertEquals(0, part.partition("abc4", null, null, null, partitions));
+ }
+
+ /**
+ * <pre>
+ * Flink Sinks: Kafka Partitions
+ * 1 ----------------> 1
+ * 2 ----------------> 2
+ * 3
+ * 4
+ * 5
+ * </pre>
+ */
+ @Test
+ public void testFewerPartitions() {
+ FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
+
+ int[] partitions = new int[]{0, 1, 2, 3, 4};
+ part.open(0, 2);
+ Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+ Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+ part.open(1, 2);
+ Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
+ Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
+ }
+
+ /**
+ * <pre>
+ * Flink Sinks: Kafka Partitions
+ * 1 ------------>---> 1
+ * 2 -----------/----> 2
+ * 3 ----------/
+ * </pre>
+ */
+ @Test
+ public void testMixedCase() {
+ FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
+ int[] partitions = new int[]{0, 1};
+
+ part.open(0, 3);
+ Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+ part.open(1, 3);
+ Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
+
+ part.open(2, 3);
+ Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+ }
+}
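
Read together, the three tests pin down the assignment rule: each sink subtask sticks to exactly one partition, selected by subtask index modulo the partition count of the target topic. A sketch of logic consistent with every assertion above (the real FlinkFixedPartitioner may differ in details):

    // subtask i of a sink writing to a topic with p partitions
    // always ends up on partitions[i % p]
    public class FixedPartitionerSketch<T> extends FlinkKafkaPartitioner<T> {
        private int parallelInstanceId;

        @Override
        public void open(int parallelInstanceId, int parallelInstances) {
            this.parallelInstanceId = parallelInstanceId;
        }

        @Override
        public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            return partitions[parallelInstanceId % partitions.length];
        }
    }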
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
new file mode 100644
index 0000000..c6be71c
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test FlinkKafkaDelegatePartitioner using FixedPartitioner
+ */
+public class TestFlinkKafkaDelegatePartitioner {
+
+ /**
+ * <pre>
+ * Flink Sinks: Kafka Partitions
+ * 1 ----------------> 1
+ * 2 --------------/
+ * 3 -------------/
+ * 4 ------------/
+ * </pre>
+ */
+ @Test
+ public void testMoreFlinkThanBrokers() {
+ FlinkKafkaDelegatePartitioner<String> part = new FlinkKafkaDelegatePartitioner<>(new FixedPartitioner<String>());
+
+ int[] partitions = new int[]{0};
+ part.setPartitions(partitions);
+
+ part.open(0, 4);
+ Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+ part.open(1, 4);
+ Assert.assertEquals(0, part.partition("abc2", null, null, null, partitions));
+
+ part.open(2, 4);
+ Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions));
+ Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions)); // check if it is changing ;)
+
+ part.open(3, 4);
+ Assert.assertEquals(0, part.partition("abc4", null, null, null, partitions));
+ }
+
+ /**
+ * <pre>
+ * Flink Sinks: Kafka Partitions
+ * 1 ----------------> 1
+ * 2 ----------------> 2
+ * 3
+ * 4
+ * 5
+ * </pre>
+ */
+ @Test
+ public void testFewerPartitions() {
+ FlinkKafkaDelegatePartitioner<String> part = new FlinkKafkaDelegatePartitioner<>(new FixedPartitioner<String>());
+
+ int[] partitions = new int[]{0, 1, 2, 3, 4};
+ part.setPartitions(partitions);
+
+ part.open(0, 2);
+ Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+ Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+ part.open(1, 2);
+ Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
+ Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
+ }
+
+ /**
+ * <pre>
+ * Flink Sinks: Kafka Partitions
+ * 1 ------------>---> 1
+ * 2 -----------/----> 2
+ * 3 ----------/
+ * </pre>
+ */
+ @Test
+ public void testMixedCase() {
+ FlinkKafkaDelegatePartitioner<String> part = new FlinkKafkaDelegatePartitioner<>(new FixedPartitioner<String>());
+ int[] partitions = new int[]{0, 1};
+ part.setPartitions(partitions);
+
+ part.open(0, 3);
+ Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+ part.open(1, 3);
+ Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
+
+ part.open(2, 3);
+ Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+ }
+}
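
FlinkKafkaDelegatePartitioner is the bridge for legacy partitioners: setPartitions hands it the partition array the old interface never received, and partition() forwards only the count. A hypothetical sketch of that delegation, inferred from the tests in this file; the legacy open(int, int, int[]) signature is an assumption, not confirmed by this diff:

    // hypothetical shape of the bridge; not the actual Flink source
    public class DelegatePartitionerSketch<T> extends FlinkKafkaPartitioner<T> {
        private final KafkaPartitioner<T> legacy;
        private int[] partitions;

        public DelegatePartitionerSketch(KafkaPartitioner<T> legacy) {
            this.legacy = legacy;
        }

        public void setPartitions(int[] partitions) {
            this.partitions = partitions;
        }

        @Override
        public void open(int parallelInstanceId, int parallelInstances) {
            // assumed legacy signature: open(int, int, int[])
            legacy.open(parallelInstanceId, parallelInstances, partitions);
        }

        @Override
        public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            // the legacy API knows only a count -- exactly the limitation
            // that FLINK-6288 removes for multi-topic sinks
            return legacy.partition(record, key, value, partitions.length);
        }
    }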
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index c383eb5..c0fb836 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -18,6 +18,10 @@
package org.apache.flink.streaming.connectors.kafka.testutils;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.Random;
+
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -31,18 +35,14 @@ import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-import java.util.Collection;
-import java.util.Properties;
-import java.util.Random;
-
@SuppressWarnings("serial")
public class DataGenerators {
@@ -107,10 +107,10 @@ public class DataGenerators {
testServer.produceIntoKafka(stream, topic,
new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig())),
props,
- new KafkaPartitioner<Integer>() {
+ new FlinkKafkaPartitioner<Integer>() {
@Override
- public int partition(Integer next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
- return next % numPartitions;
+ public int partition(Integer next, byte[] serializedKey, byte[] serializedValue, String topic, int[] partitions) {
+ return next % partitions.length;
}
});
@@ -149,7 +149,7 @@ public class DataGenerators {
topic,
new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
producerProperties,
- new FixedPartitioner<String>());
+ new FlinkFixedPartitioner<String>());
OneInputStreamOperatorTestHarness<String, Object> testHarness =
new OneInputStreamOperatorTestHarness<>(sink);
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java
new file mode 100644
index 0000000..e7fff52
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+
+/**
+ * Special partitioner that uses the first field of a 2-tuple as the partition,
+ * and that expects a specific number of partitions.
+ */
+public class Tuple2FlinkPartitioner extends FlinkKafkaPartitioner<Tuple2<Integer, Integer>> {
+ private static final long serialVersionUID = -3589898230375281549L;
+
+ private final int expectedPartitions;
+
+ public Tuple2FlinkPartitioner(int expectedPartitions) {
+ this.expectedPartitions = expectedPartitions;
+ }
+
+ @Override
+ public int partition(Tuple2<Integer, Integer> next, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+ if (partitions.length != expectedPartitions) {
+ throw new IllegalArgumentException("Expected " + expectedPartitions + " partitions");
+ }
+
+ return next.f0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
index c9e9ac1..43e1aa7 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
@@ -18,15 +18,16 @@
package org.apache.flink.streaming.connectors.kafka.testutils;
+import java.io.Serializable;
+
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import java.io.Serializable;
-
/**
* Special partitioner that uses the first field of a 2-tuple as the partition,
- * and that expects a specific number of partitions.
+ * and that expects a specific number of partitions. Use {@link Tuple2FlinkPartitioner} instead.
*/
+@Deprecated
public class Tuple2Partitioner extends KafkaPartitioner<Tuple2<Integer, Integer>> implements Serializable {
private static final long serialVersionUID = 1L;
@@ -45,4 +46,4 @@ public class Tuple2Partitioner extends KafkaPartitioner<Tuple2<Integer, Integer>
return next.f0;
}
-}
\ No newline at end of file
+}