Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/19 06:42:41 UTC
[1/6] flink git commit: [FLINK-6608] [security, config] Relax Kerberos login contexts parsing
Repository: flink
Updated Branches:
refs/heads/release-1.3 94111f985 -> 0963718ac
[FLINK-6608] [security, config] Relax Kerberos login contexts parsing
This closes #3928.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a43badc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a43badc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a43badc
Branch: refs/heads/release-1.3
Commit: 1a43badcd2e7893819f6aba9e7cd7688176efc58
Parents: 94111f9
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Wed May 17 16:00:37 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri May 19 14:41:48 2017 +0800
----------------------------------------------------------------------
.../flink/runtime/security/SecurityUtils.java | 8 ++-
.../runtime/security/SecurityUtilsTest.java | 62 +++++++++++++++++++-
2 files changed, 67 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1a43badc/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
index 7a09c32..b874009 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
@@ -230,10 +230,14 @@ public class SecurityUtils {
}
private static List<String> parseList(String value) {
- if(value == null) {
+ if (value == null || value.isEmpty()) {
return Collections.emptyList();
}
- return Arrays.asList(value.split(","));
+
+ return Arrays.asList(value
+ .trim()
+ .replaceAll("(\\s*,+\\s*)+", ",")
+ .split(","));
}
}
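For reference, here is the relaxed parsing isolated as a standalone sketch (the class name and main() harness are illustrative, not part of the commit). Trimming first and collapsing every run of commas and surrounding whitespace into a single comma means stray spaces and empty entries in the Kerberos login contexts option no longer yield bogus context names:
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ParseListSketch {

    static List<String> parseList(String value) {
        if (value == null || value.isEmpty()) {
            return Collections.emptyList();
        }
        // Trim the ends, then collapse runs of commas and surrounding
        // whitespace into single commas before splitting.
        return Arrays.asList(value.trim().replaceAll("(\\s*,+\\s*)+", ",").split(","));
    }

    public static void main(String[] args) {
        // Each call prints [Foo bar, Client]; String.split drops the
        // trailing empty strings left by a trailing comma.
        System.out.println(parseList("Foo bar,Client"));
        System.out.println(parseList(" Foo bar , Client "));
        System.out.println(parseList("Foo bar, ,, Client,"));
    }
}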
http://git-wip-us.apache.org/repos/asf/flink/blob/1a43badc/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
index c5624f4..3e3808b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
@@ -18,13 +18,19 @@
package org.apache.flink.runtime.security;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.security.modules.SecurityModule;
import org.junit.AfterClass;
import org.junit.Test;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
/**
* Tests for the {@link SecurityUtils}.
@@ -78,4 +84,58 @@ public class SecurityUtilsTest {
SecurityUtils.uninstall();
assertEquals(NoOpSecurityContext.class, SecurityUtils.getInstalledContext().getClass());
}
+
+ @Test
+ public void testKerberosLoginContextParsing() {
+
+ List<String> expectedLoginContexts = Arrays.asList("Foo bar", "Client");
+
+ Configuration testFlinkConf;
+ SecurityUtils.SecurityConfiguration testSecurityConf;
+
+ // ------- no whitespaces
+
+ testFlinkConf = new Configuration();
+ testFlinkConf.setString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS, "Foo bar,Client");
+ testSecurityConf = new SecurityUtils.SecurityConfiguration(
+ testFlinkConf, new org.apache.hadoop.conf.Configuration(),
+ Collections.singletonList(TestSecurityModule.class));
+ assertEquals(expectedLoginContexts, testSecurityConf.getLoginContextNames());
+
+ // ------- with whitespaces surrounding comma
+
+ testFlinkConf = new Configuration();
+ testFlinkConf.setString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS, "Foo bar , Client");
+ testSecurityConf = new SecurityUtils.SecurityConfiguration(
+ testFlinkConf, new org.apache.hadoop.conf.Configuration(),
+ Collections.singletonList(TestSecurityModule.class));
+ assertEquals(expectedLoginContexts, testSecurityConf.getLoginContextNames());
+
+ // ------- leading / trailing whitespaces at start and end of list
+
+ testFlinkConf = new Configuration();
+ testFlinkConf.setString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS, " Foo bar , Client ");
+ testSecurityConf = new SecurityUtils.SecurityConfiguration(
+ testFlinkConf, new org.apache.hadoop.conf.Configuration(),
+ Collections.singletonList(TestSecurityModule.class));
+ assertEquals(expectedLoginContexts, testSecurityConf.getLoginContextNames());
+
+ // ------- empty entries
+
+ testFlinkConf = new Configuration();
+ testFlinkConf.setString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS, "Foo bar,,Client");
+ testSecurityConf = new SecurityUtils.SecurityConfiguration(
+ testFlinkConf, new org.apache.hadoop.conf.Configuration(),
+ Collections.singletonList(TestSecurityModule.class));
+ assertEquals(expectedLoginContexts, testSecurityConf.getLoginContextNames());
+
+ // ------- empty trailing String entries with whitespaces
+
+ testFlinkConf = new Configuration();
+ testFlinkConf.setString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS, "Foo bar, ,, Client,");
+ testSecurityConf = new SecurityUtils.SecurityConfiguration(
+ testFlinkConf, new org.apache.hadoop.conf.Configuration(),
+ Collections.singletonList(TestSecurityModule.class));
+ assertEquals(expectedLoginContexts, testSecurityConf.getLoginContextNames());
+ }
}
[2/6] flink git commit: [FLINK-6288] [kafka] New custom partitioner API that correctly handles multiple Kafka sink topics
Posted by tz...@apache.org.
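The core of this change is the new FlinkKafkaPartitioner abstract class: partition() now receives the target topic name and that topic's actual partition array instead of a single global partition count, so a sink that writes to several topics can no longer index past a smaller topic's range. A minimal sketch of a custom partitioner against the new signature (the hash-based routing rule is illustrative, not from the commit):
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

public class TopicAwarePartitionerSketch<T> extends FlinkKafkaPartitioner<T> {

    private static final long serialVersionUID = 1L;

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Each target topic supplies its own partition array, so the
        // returned index is always valid for the topic actually written to.
        return partitions[(targetTopic.hashCode() & 0x7fffffff) % partitions.length];
    }
}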
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 2adb5ec..203d814 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -68,7 +68,7 @@ import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapp
import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
-import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner;
+import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2FlinkPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
@@ -1199,9 +1199,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
/**
* Test producing and consuming into multiple topics
- * @throws java.lang.Exception
+ * @throws Exception
*/
- public void runProduceConsumeMultipleTopics() throws java.lang.Exception {
+ public void runProduceConsumeMultipleTopics() throws Exception {
final int NUM_TOPICS = 5;
final int NUM_ELEMENTS = 20;
@@ -1291,6 +1291,55 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
}
+ private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
+ KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
+
+ private final TypeSerializer<Tuple2<Integer, Integer>> ts;
+
+ public Tuple2WithTopicSchema(ExecutionConfig ec) {
+ ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
+ }
+
+ @Override
+ public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+ DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
+ Tuple2<Integer, Integer> t2 = ts.deserialize(in);
+ return new Tuple3<>(t2.f0, t2.f1, topic);
+ }
+
+ @Override
+ public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
+ return TypeInfoParser.parse("Tuple3<Integer, Integer, String>");
+ }
+
+ @Override
+ public byte[] serializeKey(Tuple3<Integer, Integer, String> element) {
+ return null;
+ }
+
+ @Override
+ public byte[] serializeValue(Tuple3<Integer, Integer, String> element) {
+ ByteArrayOutputStream by = new ByteArrayOutputStream();
+ DataOutputView out = new DataOutputViewStreamWrapper(by);
+ try {
+ ts.serialize(new Tuple2<>(element.f0, element.f1), out);
+ } catch (IOException e) {
+ throw new RuntimeException("Error", e);
+ }
+ return by.toByteArray();
+ }
+
+ @Override
+ public String getTargetTopic(Tuple3<Integer, Integer, String> element) {
+ return element.f2;
+ }
+ }
+
/**
* Test Flink's Kafka integration also with very big records (30MB)
* see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
@@ -1975,7 +2024,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
producerProperties.setProperty("retries", "0");
producerProperties.putAll(secureProps);
- kafkaServer.produceIntoKafka(stream, topicName, serSchema, producerProperties, new Tuple2Partitioner(parallelism))
+ kafkaServer.produceIntoKafka(stream, topicName, serSchema, producerProperties, new Tuple2FlinkPartitioner(parallelism))
.setParallelism(parallelism);
try {
@@ -2227,53 +2276,4 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
this.numElementsTotal = state.get(0);
}
}
-
- private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
- KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
-
- private final TypeSerializer<Tuple2<Integer, Integer>> ts;
-
- public Tuple2WithTopicSchema(ExecutionConfig ec) {
- ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
- }
-
- @Override
- public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
- DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
- Tuple2<Integer, Integer> t2 = ts.deserialize(in);
- return new Tuple3<>(t2.f0, t2.f1, topic);
- }
-
- @Override
- public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) {
- return false;
- }
-
- @Override
- public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
- return TypeInfoParser.parse("Tuple3<Integer, Integer, String>");
- }
-
- @Override
- public byte[] serializeKey(Tuple3<Integer, Integer, String> element) {
- return null;
- }
-
- @Override
- public byte[] serializeValue(Tuple3<Integer, Integer, String> element) {
- ByteArrayOutputStream by = new ByteArrayOutputStream();
- DataOutputView out = new DataOutputViewStreamWrapper(by);
- try {
- ts.serialize(new Tuple2<>(element.f0, element.f1), out);
- } catch (IOException e) {
- throw new RuntimeException("Error" ,e);
- }
- return by.toByteArray();
- }
-
- @Override
- public String getTargetTopic(Tuple3<Integer, Integer, String> element) {
- return element.f2;
- }
- }
}
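Tuple2WithTopicSchema above doubles as serializer and deserializer, and its getTargetTopic() returns the tuple's third field, which is what lets a single producer fan records out across the five test topics. A sketch of wiring such a schema into a sink, assuming the schema were made accessible outside the test class (broker address and topic names are illustrative):
import java.util.Properties;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;

public class MultiTopicSinkSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The third tuple field names the destination topic of each record.
        DataStream<Tuple3<Integer, Integer, String>> stream = env.fromElements(
                new Tuple3<>(1, 1, "topic-a"),
                new Tuple3<>(2, 2, "topic-b"));

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");

        // "default-topic" is only a fallback; the schema's getTargetTopic()
        // redirects every tuple to the topic named in its third field.
        stream.addSink(new FlinkKafkaProducer010<>(
                "default-topic", new Tuple2WithTopicSchema(env.getConfig()), props));

        env.execute("multi-topic sink sketch");
    }
}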
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 6f61392..8285048 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -27,12 +27,11 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
import org.apache.flink.test.util.SuccessException;
-
import java.io.Serializable;
import java.util.Properties;
@@ -174,7 +173,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
// ------------------------------------------------------------------------
- public static class CustomPartitioner extends KafkaPartitioner<Tuple2<Long, String>> implements Serializable {
+ public static class CustomPartitioner extends FlinkKafkaPartitioner<Tuple2<Long, String>> implements Serializable {
private final int expectedPartitions;
@@ -184,10 +183,10 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
@Override
- public int partition(Tuple2<Long, String> next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
- assertEquals(expectedPartitions, numPartitions);
+ public int partition(Tuple2<Long, String> next, byte[] serializedKey, byte[] serializedValue, String topic, int[] partitions) {
+ assertEquals(expectedPartitions, partitions.length);
- return (int) (next.f0 % numPartitions);
+ return (int) (next.f0 % expectedPartitions);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index e0e8f84..0fdc82e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -17,25 +17,26 @@
*/
package org.apache.flink.streaming.connectors.kafka;
+import java.io.Serializable;
+import java.util.Properties;
+
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.Types;
-import org.apache.flink.types.Row;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.types.Row;
import org.junit.Test;
-import java.io.Serializable;
-import java.util.Properties;
-
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
@@ -45,6 +46,7 @@ public abstract class KafkaTableSinkTestBase {
protected static final String[] FIELD_NAMES = new String[] {"field1", "field2"};
private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] { Types.INT(), Types.STRING() };
private static final KafkaPartitioner<Row> PARTITIONER = new CustomPartitioner();
+ private static final FlinkKafkaPartitioner<Row> FLINK_PARTITIONER = new FlinkCustomPartitioner();
private static final Properties PROPERTIES = createSinkProperties();
@SuppressWarnings("unchecked")
private final FlinkKafkaProducerBase<Row> PRODUCER = new FlinkKafkaProducerBase<Row>(
@@ -72,6 +74,23 @@ public abstract class KafkaTableSinkTestBase {
}
@Test
+ @SuppressWarnings("unchecked")
+ public void testKafkaTableSinkWithFlinkPartitioner() throws Exception {
+ DataStream dataStream = mock(DataStream.class);
+
+ KafkaTableSink kafkaTableSink = spy(createTableSinkWithFlinkPartitioner());
+ kafkaTableSink.emitDataStream(dataStream);
+
+ verify(dataStream).addSink(eq(PRODUCER));
+
+ verify(kafkaTableSink).createKafkaProducer(
+ eq(TOPIC),
+ eq(PROPERTIES),
+ any(getSerializationSchema().getClass()),
+ eq(FLINK_PARTITIONER));
+ }
+
+ @Test
public void testConfiguration() {
KafkaTableSink kafkaTableSink = createTableSink();
KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
@@ -82,15 +101,33 @@ public abstract class KafkaTableSinkTestBase {
assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
}
+ @Test
+ public void testConfigurationWithFlinkPartitioner() {
+ KafkaTableSink kafkaTableSink = createTableSinkWithFlinkPartitioner();
+ KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
+ assertNotSame(kafkaTableSink, newKafkaTableSink);
+
+ assertArrayEquals(FIELD_NAMES, newKafkaTableSink.getFieldNames());
+ assertArrayEquals(FIELD_TYPES, newKafkaTableSink.getFieldTypes());
+ assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
+ }
+
protected abstract KafkaTableSink createTableSink(String topic, Properties properties,
KafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
+ protected abstract KafkaTableSink createTableSinkWithFlinkPartitioner(String topic,
+ Properties properties, FlinkKafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
+
protected abstract SerializationSchema<Row> getSerializationSchema();
private KafkaTableSink createTableSink() {
return createTableSink(TOPIC, PROPERTIES, PARTITIONER, PRODUCER);
}
+ private KafkaTableSink createTableSinkWithFlinkPartitioner() {
+ return createTableSinkWithFlinkPartitioner(TOPIC, PROPERTIES, FLINK_PARTITIONER, PRODUCER);
+ }
+
private static Properties createSinkProperties() {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:12345");
@@ -103,4 +140,11 @@ public abstract class KafkaTableSinkTestBase {
return 0;
}
}
+
+ private static class FlinkCustomPartitioner extends FlinkKafkaPartitioner<Row> {
+ @Override
+ public int partition(Row record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+ return 0;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 9a7c96a..311a1a4 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -17,20 +17,20 @@
package org.apache.flink.streaming.connectors.kafka;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
import kafka.server.KafkaServer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-
/**
* Abstract class providing a Kafka test environment
*/
@@ -81,11 +81,11 @@ public abstract class KafkaTestEnvironment {
public abstract <T> StreamSink<T> getProducerSink(String topic,
KeyedSerializationSchema<T> serSchema, Properties props,
- KafkaPartitioner<T> partitioner);
+ FlinkKafkaPartitioner<T> partitioner);
public abstract <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic,
KeyedSerializationSchema<T> serSchema, Properties props,
- KafkaPartitioner<T> partitioner);
+ FlinkKafkaPartitioner<T> partitioner);
// -- offset handlers
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java
new file mode 100644
index 0000000..fa84199
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestFlinkFixedPartitioner {
+
+
+ /**
+ * <pre>
+ * Flink Sinks: Kafka Partitions
+ * 1 ----------------> 1
+ * 2 --------------/
+ * 3 -------------/
+ * 4 ------------/
+ * </pre>
+ */
+ @Test
+ public void testMoreFlinkThanBrokers() {
+ FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
+
+ int[] partitions = new int[]{0};
+
+ part.open(0, 4);
+ Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+ part.open(1, 4);
+ Assert.assertEquals(0, part.partition("abc2", null, null, null, partitions));
+
+ part.open(2, 4);
+ Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions));
+ Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions)); // check if it is changing ;)
+
+ part.open(3, 4);
+ Assert.assertEquals(0, part.partition("abc4", null, null, null, partitions));
+ }
+
+ /**
+ *
+ * <pre>
+ * Flink Sinks: Kafka Partitions
+ * 1 ----------------> 1
+ * 2 ----------------> 2
+ * 3
+ * 4
+ * 5
+ *
+ * </pre>
+ */
+ @Test
+ public void testFewerPartitions() {
+ FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
+
+ int[] partitions = new int[]{0, 1, 2, 3, 4};
+ part.open(0, 2);
+ Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+ Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+ part.open(1, 2);
+ Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
+ Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
+ }
+
+ /*
+ * Flink Sinks: Kafka Partitions
+ * 1 ------------>---> 1
+ * 2 -----------/----> 2
+ * 3 ----------/
+ */
+ @Test
+ public void testMixedCase() {
+ FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
+ int[] partitions = new int[]{0,1};
+
+ part.open(0, 3);
+ Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+ part.open(1, 3);
+ Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
+
+ part.open(2, 3);
+ Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+ }
+
+}
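All three cases pin down the same mapping rule, restated here as a sketch (inferred from the expected values above, not copied from FlinkFixedPartitioner): each parallel sink subtask is bound to exactly one partition, wrapping around when there are more subtasks than partitions.
public class FixedMappingSketch {

    private int parallelInstanceId;

    // Mirrors the open(parallelInstanceId, parallelInstances) calls in the
    // tests; only the subtask index matters for the mapping.
    public void open(int parallelInstanceId, int parallelInstances) {
        this.parallelInstanceId = parallelInstanceId;
    }

    // Subtask 0 -> partitions[0], subtask 1 -> partitions[1], and so on,
    // modulo the partition count; this reproduces every assertion above.
    public int partition(int[] partitions) {
        return partitions[parallelInstanceId % partitions.length];
    }
}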
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
new file mode 100644
index 0000000..c6be71c
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests {@link FlinkKafkaDelegatePartitioner} with the legacy {@link FixedPartitioner}.
+ */
+public class TestFlinkKafkaDelegatePartitioner {
+
+ /**
+ * <pre>
+ * Flink Sinks: Kafka Partitions
+ * 1 ----------------> 1
+ * 2 --------------/
+ * 3 -------------/
+ * 4 ------------/
+ * </pre>
+ */
+ @Test
+ public void testMoreFlinkThanBrokers() {
+ FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
+
+ int[] partitions = new int[]{0};
+
+ part.open(0, 4);
+ Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+ part.open(1, 4);
+ Assert.assertEquals(0, part.partition("abc2", null, null, null, partitions));
+
+ part.open(2, 4);
+ Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions));
+ Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions)); // check if it is changing ;)
+
+ part.open(3, 4);
+ Assert.assertEquals(0, part.partition("abc4", null, null, null, partitions));
+ }
+
+ /**
+ *
+ * <pre>
+ * Flink Sinks: Kafka Partitions
+ * 1 ----------------> 1
+ * 2 ----------------> 2
+ * 3
+ * 4
+ * 5
+ *
+ * </pre>
+ */
+ @Test
+ public void testFewerPartitions() {
+ FlinkKafkaDelegatePartitioner<String> part = new FlinkKafkaDelegatePartitioner<>(new FixedPartitioner<String>());
+
+ int[] partitions = new int[]{0, 1, 2, 3, 4};
+ part.setPartitions(partitions);
+
+ part.open(0, 2);
+ Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+ Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+ part.open(1, 2);
+ Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
+ Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
+ }
+
+ /*
+ * Flink Sinks: Kafka Partitions
+ * 1 ------------>---> 1
+ * 2 -----------/----> 2
+ * 3 ----------/
+ */
+ @Test
+ public void testMixedCase() {
+ FlinkKafkaDelegatePartitioner<String> part = new FlinkKafkaDelegatePartitioner<>(new FixedPartitioner<String>());
+ int[] partitions = new int[]{0,1};
+ part.setPartitions(partitions);
+
+ part.open(0, 3);
+ Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+ part.open(1, 3);
+ Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
+
+ part.open(2, 3);
+ Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index c383eb5..c0fb836 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -18,6 +18,10 @@
package org.apache.flink.streaming.connectors.kafka.testutils;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.Random;
+
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -31,18 +35,14 @@ import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-import java.util.Collection;
-import java.util.Properties;
-import java.util.Random;
-
@SuppressWarnings("serial")
public class DataGenerators {
@@ -107,10 +107,10 @@ public class DataGenerators {
testServer.produceIntoKafka(stream, topic,
new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig())),
props,
- new KafkaPartitioner<Integer>() {
+ new FlinkKafkaPartitioner<Integer>() {
@Override
- public int partition(Integer next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
- return next % numPartitions;
+ public int partition(Integer next, byte[] serializedKey, byte[] serializedValue, String topic, int[] partitions) {
+ return next % partitions.length;
}
});
@@ -149,7 +149,7 @@ public class DataGenerators {
topic,
new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
producerProperties,
- new FixedPartitioner<String>());
+ new FlinkFixedPartitioner<String>());
OneInputStreamOperatorTestHarness<String, Object> testHarness =
new OneInputStreamOperatorTestHarness<>(sink);
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java
new file mode 100644
index 0000000..e7fff52
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+
+/**
+ * Special partitioner that uses the first field of a 2-tuple as the partition,
+ * and that expects a specific number of partitions.
+ */
+public class Tuple2FlinkPartitioner extends FlinkKafkaPartitioner<Tuple2<Integer, Integer>> {
+ private static final long serialVersionUID = -3589898230375281549L;
+
+ private final int expectedPartitions;
+
+ public Tuple2FlinkPartitioner(int expectedPartitions) {
+ this.expectedPartitions = expectedPartitions;
+ }
+
+ @Override
+ public int partition(Tuple2<Integer, Integer> next, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+ if (partitions.length != expectedPartitions) {
+ throw new IllegalArgumentException("Expected " + expectedPartitions + " partitions");
+ }
+
+ return next.f0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
index c9e9ac1..43e1aa7 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
@@ -18,15 +18,16 @@
package org.apache.flink.streaming.connectors.kafka.testutils;
+import java.io.Serializable;
+
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import java.io.Serializable;
-
/**
* Special partitioner that uses the first field of a 2-tuple as the partition,
- * and that expects a specific number of partitions.
+ * and that expects a specific number of partitions. Use {@link Tuple2FlinkPartitioner} instead.
*/
+@Deprecated
public class Tuple2Partitioner extends KafkaPartitioner<Tuple2<Integer, Integer>> implements Serializable {
private static final long serialVersionUID = 1L;
@@ -45,4 +46,4 @@ public class Tuple2Partitioner extends KafkaPartitioner<Tuple2<Integer, Integer>
return next.f0;
}
-}
\ No newline at end of file
+}
[3/6] flink git commit: [FLINK-6288] [kafka] New custom partitioner API that correctly handles multiple Kafka sink topics
Posted by tz...@apache.org.
[FLINK-6288] [kafka] New custom partitioner API that correctly handles multiple Kafka sink topics
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/58c4eed5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/58c4eed5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/58c4eed5
Branch: refs/heads/release-1.3
Commit: 58c4eed5accb0912472735bb00c148f6344a5679
Parents: 1a43bad
Author: zjureel <zj...@gmail.com>
Authored: Mon May 15 17:41:47 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri May 19 14:41:54 2017 +0800
----------------------------------------------------------------------
.../connectors/kafka/FlinkKafkaProducer010.java | 82 ++++++++++--
.../connectors/kafka/Kafka010ITCase.java | 6 +-
.../kafka/KafkaTestEnvironmentImpl.java | 8 +-
.../connectors/kafka/FlinkKafkaProducer.java | 9 +-
.../connectors/kafka/FlinkKafkaProducer08.java | 40 +++++-
.../connectors/kafka/Kafka08JsonTableSink.java | 17 +++
.../kafka/Kafka08JsonTableSinkTest.java | 14 ++
.../connectors/kafka/KafkaProducerTest.java | 5 +-
.../kafka/KafkaTestEnvironmentImpl.java | 6 +-
.../connectors/kafka/FlinkKafkaProducer09.java | 42 +++++-
.../connectors/kafka/Kafka09JsonTableSink.java | 29 +++++
.../kafka/Kafka09JsonTableSinkTest.java | 15 +++
.../connectors/kafka/KafkaProducerTest.java | 6 +-
.../kafka/KafkaTestEnvironmentImpl.java | 10 +-
.../kafka/FlinkKafkaProducerBase.java | 127 ++++++++++++-------
.../connectors/kafka/KafkaJsonTableSink.java | 16 ++-
.../connectors/kafka/KafkaTableSink.java | 35 ++++-
.../kafka/partitioner/FixedPartitioner.java | 3 +-
.../partitioner/FlinkFixedPartitioner.java | 71 +++++++++++
.../FlinkKafkaDelegatePartitioner.java | 47 +++++++
.../partitioner/FlinkKafkaPartitioner.java | 39 ++++++
.../kafka/partitioner/KafkaPartitioner.java | 14 ++
.../connectors/kafka/KafkaConsumerTestBase.java | 106 ++++++++--------
.../connectors/kafka/KafkaProducerTestBase.java | 11 +-
.../kafka/KafkaTableSinkTestBase.java | 56 +++++++-
.../connectors/kafka/KafkaTestEnvironment.java | 14 +-
.../kafka/TestFlinkFixedPartitioner.java | 104 +++++++++++++++
.../TestFlinkKafkaDelegatePartitioner.java | 111 ++++++++++++++++
.../kafka/testutils/DataGenerators.java | 20 +--
.../kafka/testutils/Tuple2FlinkPartitioner.java | 45 +++++++
.../kafka/testutils/Tuple2Partitioner.java | 9 +-
31 files changed, 937 insertions(+), 180 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index cc0194b..7addafa 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -17,6 +17,8 @@
package org.apache.flink.streaming.connectors.kafka;
+import java.util.Properties;
+
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
@@ -27,7 +29,8 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.StreamSink;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
@@ -35,8 +38,6 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWra
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
-import java.util.Properties;
-
import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;
@@ -87,7 +88,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
String topicId,
KeyedSerializationSchema<T> serializationSchema,
Properties producerConfig) {
- return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FixedPartitioner<T>());
+ return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
}
@@ -106,7 +107,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
String topicId,
SerializationSchema<T> serializationSchema,
Properties producerConfig) {
- return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<T>());
+ return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<T>());
}
/**
@@ -120,7 +121,9 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
* @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+ * @deprecated Use {@link FlinkKafkaProducer010#writeToKafkaWithTimestamps(DataStream, String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
*/
+ @Deprecated
public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
String topicId,
KeyedSerializationSchema<T> serializationSchema,
@@ -133,6 +136,30 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
}
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+ * the topic.
+ *
+ * This constructor allows writing timestamps to Kafka; it follows approach (b) (see above)
+ *
+ * @param inStream The stream to write to Kafka
+ * @param topicId The name of the target topic
+ * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+ */
+ public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+ String topicId,
+ KeyedSerializationSchema<T> serializationSchema,
+ Properties producerConfig,
+ FlinkKafkaPartitioner<T> customPartitioner) {
+
+ GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
+ FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner);
+ SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
+ return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
+ }
+
// ---------------------- Regular constructors w/o timestamp support ------------------
/**
@@ -147,7 +174,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
* User defined (keyless) serialization schema.
*/
public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema) {
- this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<T>());
+ this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<T>());
}
/**
@@ -162,7 +189,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
* Properties with the producer configuration.
*/
public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig) {
- this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<T>());
+ this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<T>());
}
/**
@@ -173,11 +200,26 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
* @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+ * @deprecated Use {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead
*/
+ @Deprecated
public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
}
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+ * the topic.
+ *
+ * @param topicId The topic to write data to
+ * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+ */
+ public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
+ this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+ }
+
// ------------------- Key/Value serialization schema constructors ----------------------
/**
@@ -192,7 +234,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
* User defined serialization schema supporting key/value messages
*/
public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<T> serializationSchema) {
- this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<T>());
+ this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<T>());
}
/**
@@ -207,19 +249,32 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
* Properties with the producer configuration.
*/
public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) {
- this(topicId, serializationSchema, producerConfig, new FixedPartitioner<T>());
+ this(topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
}
/**
* Create Kafka producer
*
* This constructor does not allow writing timestamps to Kafka; it follows approach (a) (see above)
+ * @deprecated Use {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
*/
+ @Deprecated
public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
// We create a Kafka 09 producer instance here and only "override" (by intercepting) the
// invoke call.
super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner));
}
+
+ /**
+ * Create Kafka producer
+ *
+ * This constructor does not allow writing timestamps to Kafka; it follows approach (a) (see above)
+ */
+ public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
+ // We create a Kafka 09 producer instance here and only "override" (by intercepting) the
+ // invoke call.
+ super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner));
+ }
// ----------------------------- Generic element processing ---------------------------
@@ -243,10 +298,15 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
}
ProducerRecord<byte[], byte[]> record;
- if (internalProducer.partitioner == null) {
+ int[] partitions = internalProducer.topicPartitionsMap.get(targetTopic);
+ if (partitions == null) {
+ partitions = internalProducer.getPartitionsByTopic(targetTopic, internalProducer.producer);
+ internalProducer.topicPartitionsMap.put(targetTopic, partitions);
+ }
+ if (internalProducer.flinkKafkaPartitioner == null) {
record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
} else {
- record = new ProducerRecord<>(targetTopic, internalProducer.partitioner.partition(next, serializedKey, serializedValue, internalProducer.partitions.length), timestamp, serializedKey, serializedValue);
+ record = new ProducerRecord<>(targetTopic, internalProducer.flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), timestamp, serializedKey, serializedValue);
}
if (internalProducer.flushOnCheckpoint) {
synchronized (internalProducer.pendingRecordsLock) {
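The hunk above also introduces a per-topic partition cache: getPartitionsByTopic() is queried once for each target topic and the result is reused for every later record. The same lookup pattern isolated as a sketch; computeIfAbsent is an equivalent restatement rather than the commit's literal code, and fetchPartitions stands in for the real metadata query:
import java.util.HashMap;
import java.util.Map;

public class PartitionCacheSketch {

    private final Map<String, int[]> topicPartitionsMap = new HashMap<>();

    int[] partitionsFor(String targetTopic) {
        // The first record for a topic triggers one metadata fetch; every
        // subsequent record for that topic is served from the cache.
        return topicPartitionsMap.computeIfAbsent(targetTopic, this::fetchPartitions);
    }

    private int[] fetchPartitions(String topic) {
        // Placeholder for the real broker metadata query
        // (getPartitionsByTopic in the producer base class).
        return new int[] {0, 1, 2};
    }
}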
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index 39b2b8f..add623e 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -33,7 +33,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
@@ -208,11 +208,11 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
});
final TypeInformationSerializationSchema<Long> longSer = new TypeInformationSerializationSchema<>(TypeInfoParser.<Long>parse("Long"), env.getConfig());
- FlinkKafkaProducer010.FlinkKafkaProducer010Configuration prod = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new KeyedSerializationSchemaWrapper<>(longSer), standardProps, new KafkaPartitioner<Long>() {
+ FlinkKafkaProducer010.FlinkKafkaProducer010Configuration prod = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new KeyedSerializationSchemaWrapper<>(longSer), standardProps, new FlinkKafkaPartitioner<Long>() {
private static final long serialVersionUID = -6730989584364230617L;
@Override
- public int partition(Long next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
+ public int partition(Long next, byte[] key, byte[] value, String targetTopic, int[] partitions) {
return (int)(next % 3);
}
});
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index d27e53a..c88c858 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -30,8 +30,8 @@ import org.apache.curator.test.TestingServer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.NetUtils;
@@ -46,9 +46,9 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.BindException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.HashMap;
import java.util.Properties;
import java.util.UUID;
@@ -116,7 +116,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
- public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+ public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
prod.setFlushOnCheckpoint(true);
return new StreamSink<>(prod);
@@ -124,7 +124,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
@Override
- public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+ public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
prod.setFlushOnCheckpoint(true);
return stream.addSink(prod);
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index a7b89f8..98dac3e 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
@@ -37,7 +38,7 @@ public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN> {
*/
@Deprecated
public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
- super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), null);
+ super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), (FlinkKafkaPartitioner<IN>)null);
}
/**
@@ -45,7 +46,7 @@ public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN> {
*/
@Deprecated
public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
- super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, null);
+ super(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, (FlinkKafkaPartitioner<IN>)null);
}
/**
@@ -62,7 +63,7 @@ public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN> {
*/
@Deprecated
public FlinkKafkaProducer(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
- super(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), null);
+ super(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), (FlinkKafkaPartitioner<IN>)null);
}
/**
@@ -70,7 +71,7 @@ public class FlinkKafkaProducer<IN> extends FlinkKafkaProducer08<IN> {
*/
@Deprecated
public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
- super(topicId, serializationSchema, producerConfig, null);
+ super(topicId, serializationSchema, producerConfig, (FlinkKafkaPartitioner<IN>)null);
}
/**
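The (FlinkKafkaPartitioner<IN>) null casts above are not cosmetic: once both the KafkaPartitioner and the FlinkKafkaPartitioner overloads exist, a bare null argument no longer compiles, because neither parameter type is more specific than the other. A small sketch of the call-site effect (topic name and broker address are placeholders):

    import java.util.Properties;

    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class NullPartitionerExample {
        public static void main(String[] args) {
            Properties config = new Properties();
            config.setProperty("bootstrap.servers", "localhost:9092"); // placeholder

            // new FlinkKafkaProducer08<>("topic", new SimpleStringSchema(), config, null);
            // would be rejected as ambiguous; the cast picks the new overload:
            FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
                    "topic", new SimpleStringSchema(), config,
                    (FlinkKafkaPartitioner<String>) null);
        }
    }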
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
index 65de5fc..64d3716 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
@@ -17,7 +17,8 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
@@ -50,7 +51,7 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {
* User defined (keyless) serialization schema.
*/
public FlinkKafkaProducer08(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
- this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+ this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
}
/**
@@ -65,7 +66,7 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {
* Properties with the producer configuration.
*/
public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
- this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<IN>());
+ this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<IN>());
}
/**
@@ -75,12 +76,27 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {
* @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+ * @deprecated Use {@link FlinkKafkaProducer08#FlinkKafkaProducer08(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead
*/
+ @Deprecated
public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
}
+ /**
+ * The main constructor for creating a FlinkKafkaProducer.
+ *
+ * @param topicId The topic to write data to
+ * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+ */
+ public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+ this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+
+ }
+
// ------------------- Key/Value serialization schema constructors ----------------------
/**
@@ -95,7 +111,7 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {
* User defined serialization schema supporting key/value messages
*/
public FlinkKafkaProducer08(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
- this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+ this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
}
/**
@@ -110,7 +126,7 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {
* Properties with the producer configuration.
*/
public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
- this(topicId, serializationSchema, producerConfig, new FixedPartitioner<IN>());
+ this(topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<IN>());
}
/**
@@ -120,11 +136,25 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {
* @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+ * @deprecated Use {@link FlinkKafkaProducer08#FlinkKafkaProducer08(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
*/
+ @Deprecated
public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
super(topicId, serializationSchema, producerConfig, customPartitioner);
}
+ /**
+ * The main constructor for creating a FlinkKafkaProducer.
+ *
+ * @param topicId The topic to write data to
+ * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+ */
+ public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+ super(topicId, serializationSchema, producerConfig, customPartitioner);
+ }
+
@Override
protected void flush() {
// The Kafka 0.8 producer doesn't support flushing, we wait here
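For reference, a hedged usage sketch of the new non-deprecated 0.8 constructor; the topic and broker list are placeholders, and getPropertiesFromBrokerList is the public helper visible later in this diff:

    import java.util.Properties;

    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class Producer08Example {
        public static void main(String[] args) {
            Properties props = FlinkKafkaProducer08.getPropertiesFromBrokerList("localhost:9092");

            // FlinkFixedPartitioner is the same default the keyless convenience
            // constructors now install in place of the deprecated FixedPartitioner.
            FlinkKafkaProducer08<String> producer = new FlinkKafkaProducer08<>(
                    "events", new SimpleStringSchema(), props, new FlinkFixedPartitioner<String>());
            producer.setFlushOnCheckpoint(true); // as the test environments in this commit do
        }
    }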
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index 839388f..5a066ec0 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -17,6 +17,7 @@
*/
package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
@@ -38,6 +39,17 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink {
public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
super(topic, properties, partitioner);
}
+
+ /**
+ * Creates a {@link KafkaTableSink} for Kafka 0.8.
+ *
+ * @param topic topic in Kafka to which table is written
+ * @param properties properties to connect to Kafka
+ * @param partitioner Kafka partitioner
+ */
+ public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
+ super(topic, properties, partitioner);
+ }
@Override
protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
@@ -45,6 +57,11 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink {
}
@Override
+ protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+ return new FlinkKafkaProducer08<>(topic, serializationSchema, properties, partitioner);
+ }
+
+ @Override
protected Kafka08JsonTableSink createCopy() {
return new Kafka08JsonTableSink(topic, properties, partitioner);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index 0ac452e..164c162 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
@@ -40,6 +41,19 @@ public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
}
@Override
+ protected KafkaTableSink createTableSinkWithFlinkPartitioner(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner,
+ final FlinkKafkaProducerBase<Row> kafkaProducer) {
+
+ return new Kafka08JsonTableSink(topic, properties, partitioner) {
+ @Override
+ protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
+ SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+ return kafkaProducer;
+ }
+ };
+ }
+
+ @Override
@SuppressWarnings("unchecked")
protected SerializationSchema<Row> getSerializationSchema() {
return new JsonRowSerializationSchema(FIELD_NAMES);
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
index 65d7596..c7da5af 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
@@ -83,7 +84,7 @@ public class KafkaProducerTest extends TestLogger {
// (1) producer that propagates errors
FlinkKafkaProducer08<String> producerPropagating = new FlinkKafkaProducer08<>(
- "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
+ "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), (FlinkKafkaPartitioner)null);
OneInputStreamOperatorTestHarness<String, Object> testHarness =
new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producerPropagating));
@@ -106,7 +107,7 @@ public class KafkaProducerTest extends TestLogger {
// (2) producer that only logs errors
FlinkKafkaProducer08<String> producerLogging = new FlinkKafkaProducer08<>(
- "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
+ "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), (FlinkKafkaPartitioner)null);
producerLogging.setLogFailuresOnly(true);
testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging));
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 643ee8e..2419b53 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -36,8 +36,8 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.NetUtils;
@@ -109,7 +109,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
String topic,
KeyedSerializationSchema<T> serSchema,
Properties props,
- KafkaPartitioner<T> partitioner) {
+ FlinkKafkaPartitioner<T> partitioner) {
FlinkKafkaProducer08<T> prod = new FlinkKafkaProducer08<>(
topic,
serSchema,
@@ -120,7 +120,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
- public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+ public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
FlinkKafkaProducer08<T> prod = new FlinkKafkaProducer08<>(topic, serSchema, props, partitioner);
prod.setFlushOnCheckpoint(true);
return stream.addSink(prod);
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
index 2a3e39d..4f41c43 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
@@ -17,7 +17,8 @@
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
@@ -51,7 +52,7 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
* User defined (keyless) serialization schema.
*/
public FlinkKafkaProducer09(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
- this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+ this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
}
/**
@@ -66,7 +67,7 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
* Properties with the producer configuration.
*/
public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
- this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<IN>());
+ this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<IN>());
}
/**
@@ -77,12 +78,28 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
* @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+ * @deprecated Use {@link FlinkKafkaProducer09#FlinkKafkaProducer09(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead
*/
+ @Deprecated
public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
}
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+ * the topic.
+ *
+ * @param topicId The topic to write data to
+ * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+ */
+ public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+ this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+
+ }
+
// ------------------- Key/Value serialization schema constructors ----------------------
/**
@@ -97,7 +114,7 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
* User defined serialization schema supporting key/value messages
*/
public FlinkKafkaProducer09(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
- this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+ this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
}
/**
@@ -112,7 +129,7 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
* Properties with the producer configuration.
*/
public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
- this(topicId, serializationSchema, producerConfig, new FixedPartitioner<IN>());
+ this(topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<IN>());
}
/**
@@ -123,11 +140,26 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
* @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+ * @deprecated Use {@link FlinkKafkaProducer09#FlinkKafkaProducer09(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
*/
+ @Deprecated
public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
super(topicId, serializationSchema, producerConfig, customPartitioner);
}
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+ * the topic.
+ *
+ * @param topicId The topic to write data to
+ * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+ */
+ public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+ super(topicId, serializationSchema, producerConfig, customPartitioner);
+ }
+
@Override
protected void flush() {
if (this.producer != null) {
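One behavioral consequence of the new interface is easy to miss here: open(...) no longer receives a partitions array, because partitions are now resolved per target topic at send time (see the FlinkKafkaProducerBase hunk further below, which calls open with only the subtask index and parallelism). A sketch of a partitioner built on the two-argument open; the class name is illustrative and the open signature is inferred from that call site:

    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

    public class SubtaskPinnedPartitioner<T> extends FlinkKafkaPartitioner<T> {
        private static final long serialVersionUID = 1L;
        private int subtaskIndex;

        @Override
        public void open(int parallelInstanceId, int parallelInstances) {
            this.subtaskIndex = parallelInstanceId;
        }

        @Override
        public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            // pin every record from this subtask to one partition of the target topic
            return partitions[subtaskIndex % partitions.length];
        }
    }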
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
index edbebd0..b82ebc4 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
@@ -17,6 +17,7 @@
*/
package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
@@ -33,17 +34,45 @@ public class Kafka09JsonTableSink extends KafkaJsonTableSink {
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
* @param partitioner Kafka partitioner
+ * @deprecated Use {@link Kafka09JsonTableSink#Kafka09JsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead
*/
+ @Deprecated
public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
super(topic, properties, partitioner);
}
+
+ /**
+ * Creates a {@link KafkaTableSink} for Kafka 0.9.
+ *
+ * @param topic topic in Kafka to which table is written
+ * @param properties properties to connect to Kafka
+ * @param partitioner Kafka partitioner
+ */
+ public Kafka09JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
+ super(topic, properties, partitioner);
+ }
+ /**
+ *
+ * @param topic Kafka topic to produce to.
+ * @param properties Properties for the Kafka producer.
+ * @param serializationSchema Serialization schema to use to create Kafka records.
+ * @param partitioner Partitioner to select Kafka partition.
+ * @return The version-specific Kafka producer
+ * @deprecated Use {@link Kafka09JsonTableSink#createKafkaProducer(String, Properties, SerializationSchema, FlinkKafkaPartitioner)} instead
+ */
+ @Deprecated
@Override
protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner);
}
@Override
+ protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+ return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner);
+ }
+
+ @Override
protected Kafka09JsonTableSink createCopy() {
return new Kafka09JsonTableSink(topic, properties, partitioner);
}
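A hedged usage sketch of the new 0.9 table sink constructor; the topic and broker address are placeholders:

    import java.util.Properties;

    import org.apache.flink.streaming.connectors.kafka.Kafka09JsonTableSink;
    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
    import org.apache.flink.types.Row;

    public class TableSinkExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder

            // Non-deprecated path: the sink now carries a FlinkKafkaPartitioner<Row>;
            // the legacy KafkaPartitioner constructor wraps its argument in a delegate.
            Kafka09JsonTableSink sink = new Kafka09JsonTableSink(
                    "output-topic", props, new FlinkFixedPartitioner<Row>());
        }
    }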
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index df84a0f..ad8f623 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
@@ -26,6 +27,7 @@ import java.util.Properties;
public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
+ @Deprecated
@Override
protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
final FlinkKafkaProducerBase<Row> kafkaProducer) {
@@ -40,6 +42,19 @@ public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
}
@Override
+ protected KafkaTableSink createTableSinkWithFlinkPartitioner(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner,
+ final FlinkKafkaProducerBase<Row> kafkaProducer) {
+
+ return new Kafka09JsonTableSink(topic, properties, partitioner) {
+ @Override
+ protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
+ SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+ return kafkaProducer;
+ }
+ };
+ }
+
+ @Override
@SuppressWarnings("unchecked")
protected SerializationSchema<Row> getSerializationSchema() {
return new JsonRowSerializationSchema(FIELD_NAMES);
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
index 18b2aec..e9a4947 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
@@ -83,9 +84,8 @@ public class KafkaProducerTest extends TestLogger {
whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);
// (1) producer that propagates errors
-
FlinkKafkaProducer09<String> producerPropagating = new FlinkKafkaProducer09<>(
- "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
+ "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), (FlinkKafkaPartitioner<String>)null);
OneInputStreamOperatorTestHarness<String, Object> testHarness =
new OneInputStreamOperatorTestHarness<>(new StreamSink(producerPropagating));
@@ -106,7 +106,7 @@ public class KafkaProducerTest extends TestLogger {
// (2) producer that only logs errors
FlinkKafkaProducer09<String> producerLogging = new FlinkKafkaProducer09<>(
- "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), null);
+ "mock_topic", new SimpleStringSchema(), FakeStandardProducerConfig.get(), (FlinkKafkaPartitioner<String>)null);
producerLogging.setLogFailuresOnly(true);
testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging));
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index c9ef6da..84fdbf8 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -19,8 +19,8 @@
package org.apache.flink.streaming.connectors.kafka;
import kafka.admin.AdminUtils;
-import kafka.common.KafkaException;
import kafka.api.PartitionMetadata;
+import kafka.common.KafkaException;
import kafka.network.SocketServer;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
@@ -32,8 +32,8 @@ import org.apache.curator.test.TestingServer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.NetUtils;
@@ -48,9 +48,9 @@ import scala.collection.Seq;
import java.io.File;
import java.net.BindException;
import java.util.ArrayList;
-import java.util.Map;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.UUID;
@@ -106,14 +106,14 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
String topic,
KeyedSerializationSchema<T> serSchema,
Properties props,
- KafkaPartitioner<T> partitioner) {
+ FlinkKafkaPartitioner<T> partitioner) {
FlinkKafkaProducer09<T> prod = new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
prod.setFlushOnCheckpoint(true);
return new StreamSink<>(prod);
}
@Override
- public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+ public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
FlinkKafkaProducer09<T> prod = new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
prod.setFlushOnCheckpoint(true);
return stream.addSink(prod);
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index 6a7b17f..f9a1e41 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -17,6 +17,14 @@
package org.apache.flink.streaming.connectors.kafka;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.OperatorStateStore;
@@ -30,6 +38,8 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.NetUtils;
@@ -45,13 +55,6 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Collections;
-import java.util.Comparator;
-
import static java.util.Objects.requireNonNull;
@@ -76,12 +79,6 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
/**
- * Array with the partition ids of the given defaultTopicId
- * The size of this array is the number of partitions
- */
- protected int[] partitions;
-
- /**
* User defined properties for the Producer
*/
protected final Properties producerConfig;
@@ -98,9 +95,14 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
protected final KeyedSerializationSchema<IN> schema;
/**
- * User-provided partitioner for assigning an object to a Kafka partition.
+ * User-provided partitioner for assigning an object to a Kafka partition for each topic
+ */
+ protected final FlinkKafkaPartitioner flinkKafkaPartitioner;
+
+ /**
+ * Partitions for each topic
*/
- protected final KafkaPartitioner<IN> partitioner;
+ protected final Map<String, int[]> topicPartitionsMap;
/**
* Flag indicating whether to accept failures (and log them), or to fail on failures
@@ -111,7 +113,12 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
* If true, the producer will wait until all outstanding records have been sent to the broker.
*/
protected boolean flushOnCheckpoint;
-
+
+ /**
+ * Retry times of fetching kafka meta
+ */
+ protected long kafkaMetaRetryTimes;
+
// -------------------------------- Runtime fields ------------------------------------------
/** KafkaProducer instance */
@@ -133,14 +140,28 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
/**
- * The main constructor for creating a FlinkKafkaProducer.
+ * The main constructor for creating a FlinkKafkaProducer. For the customPartitioner parameter, use {@link org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner} instead.
*
* @param defaultTopicId The default topic to write data to
* @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
- * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
+ * @deprecated Use {@link FlinkKafkaProducerBase#FlinkKafkaProducerBase(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
*/
+ @Deprecated
public FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+ this(defaultTopicId, serializationSchema, producerConfig, null == customPartitioner ? null : new FlinkKafkaDelegatePartitioner<>(customPartitioner));
+ }
+
+ /**
+ * The main constructor for creating a FlinkKafkaProducer.
+ *
+ * @param defaultTopicId The default topic to write data to
+ * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
+ */
+ public FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
requireNonNull(defaultTopicId, "TopicID not set");
requireNonNull(serializationSchema, "serializationSchema not set");
requireNonNull(producerConfig, "producerConfig not set");
@@ -150,6 +171,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
this.defaultTopicId = defaultTopicId;
this.schema = serializationSchema;
this.producerConfig = producerConfig;
+ this.flinkKafkaPartitioner = customPartitioner;
// set the producer configuration properties for kafka record key value serializers.
if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
@@ -169,7 +191,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
throw new IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be supplied in the producer config properties.");
}
- this.partitioner = customPartitioner;
+ this.topicPartitionsMap = new HashMap<>();
}
// ---------------------------------- Properties --------------------------
@@ -177,9 +199,9 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
/**
* Defines whether the producer should fail on errors, or only log them.
* If this is set to true, then exceptions will be only logged, if set to false,
- * exceptions will be eventually thrown and cause the streaming program to
+ * exceptions will be eventually thrown and cause the streaming program to
* fail (and enter recovery).
- *
+ *
* @param logFailuresOnly The flag to indicate logging-only on exceptions.
*/
public void setLogFailuresOnly(boolean logFailuresOnly) {
@@ -205,7 +227,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
}
// ----------------------------------- Utilities --------------------------
-
+
/**
* Initializes the connection to Kafka.
*/
@@ -214,27 +236,14 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
producer = getKafkaProducer(this.producerConfig);
RuntimeContext ctx = getRuntimeContext();
- if (partitioner != null) {
- // the fetched list is immutable, so we're creating a mutable copy in order to sort it
- List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(defaultTopicId));
-
- // sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
- Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
- @Override
- public int compare(PartitionInfo o1, PartitionInfo o2) {
- return Integer.compare(o1.partition(), o2.partition());
- }
- });
-
- partitions = new int[partitionsList.size()];
- for (int i = 0; i < partitions.length; i++) {
- partitions[i] = partitionsList.get(i).partition();
+ if(null != flinkKafkaPartitioner) {
+ if(flinkKafkaPartitioner instanceof FlinkKafkaDelegatePartitioner) {
+ ((FlinkKafkaDelegatePartitioner)flinkKafkaPartitioner).setPartitions(getPartitionsByTopic(this.defaultTopicId, this.producer));
}
-
- partitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), partitions);
+ flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
}
- LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into topic {}",
+ LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into default topic {}",
ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), defaultTopicId);
// register Kafka metrics to Flink accumulators
@@ -281,6 +290,26 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
}
}
+ protected int[] getPartitionsByTopic(String topic, KafkaProducer<byte[], byte[]> producer) {
+ // the fetched list is immutable, so we're creating a mutable copy in order to sort it
+ List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));
+
+ // sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
+ Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
+ @Override
+ public int compare(PartitionInfo o1, PartitionInfo o2) {
+ return Integer.compare(o1.partition(), o2.partition());
+ }
+ });
+
+ int[] partitions = new int[partitionsList.size()];
+ for (int i = 0; i < partitions.length; i++) {
+ partitions[i] = partitionsList.get(i).partition();
+ }
+
+ return partitions;
+ }
+
/**
* Called when new data arrives to the sink, and forwards it to Kafka.
*
@@ -299,11 +328,17 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
targetTopic = defaultTopicId;
}
+ int[] partitions = this.topicPartitionsMap.get(targetTopic);
+ if(null == partitions) {
+ partitions = this.getPartitionsByTopic(targetTopic, producer);
+ this.topicPartitionsMap.put(targetTopic, partitions);
+ }
+
ProducerRecord<byte[], byte[]> record;
- if (partitioner == null) {
+ if (flinkKafkaPartitioner == null) {
record = new ProducerRecord<>(targetTopic, serializedKey, serializedValue);
} else {
- record = new ProducerRecord<>(targetTopic, partitioner.partition(next, serializedKey, serializedValue, partitions.length), serializedKey, serializedValue);
+ record = new ProducerRecord<>(targetTopic, flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), serializedKey, serializedValue);
}
if (flushOnCheckpoint) {
synchronized (pendingRecordsLock) {
@@ -319,7 +354,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
if (producer != null) {
producer.close();
}
-
+
// make sure we propagate pending errors
checkErroneous();
}
@@ -376,15 +411,15 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
}
}
-
+
public static Properties getPropertiesFromBrokerList(String brokerList) {
String[] elements = brokerList.split(",");
-
+
// validate the broker addresses
for (String broker: elements) {
NetUtils.getCorrectHostnamePort(broker);
}
-
+
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
return props;
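FlinkKafkaDelegatePartitioner itself is not part of this excerpt, but the call sites above (the wrapping in the deprecated constructor, setPartitions(...) inside open()) imply roughly the following bridge between the old and new interfaces. This is a hedged reconstruction, not the committed class:

    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
    import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;

    // Reconstruction only: forwards the new five-argument contract onto a
    // legacy KafkaPartitioner that knows nothing about per-topic partitions.
    public class FlinkKafkaDelegatePartitionerSketch<T> extends FlinkKafkaPartitioner<T> {
        private static final long serialVersionUID = 1L;

        private final KafkaPartitioner<T> kafkaPartitioner;
        private int[] partitions; // default topic's partitions, injected before open()

        public FlinkKafkaDelegatePartitionerSketch(KafkaPartitioner<T> kafkaPartitioner) {
            this.kafkaPartitioner = kafkaPartitioner;
        }

        public void setPartitions(int[] partitions) {
            this.partitions = partitions;
        }

        @Override
        public void open(int parallelInstanceId, int parallelInstances) {
            // the legacy open(...) still expects the default topic's partition array
            kafkaPartitioner.open(parallelInstanceId, parallelInstances, partitions);
        }

        @Override
        public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] ignored) {
            // the legacy contract only ever knew the default topic's partition count
            return kafkaPartitioner.partition(record, key, value, partitions.length);
        }
    }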
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
index 27c4de7..a0b5033 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
@@ -17,10 +17,11 @@
*/
package org.apache.flink.streaming.connectors.kafka;
-import org.apache.flink.types.Row;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.types.Row;
import java.util.Properties;
@@ -35,10 +36,23 @@ public abstract class KafkaJsonTableSink extends KafkaTableSink {
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
* @param partitioner Kafka partitioner
+ * @deprecated Use {@link KafkaJsonTableSink#KafkaJsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead
*/
+ @Deprecated
public KafkaJsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
super(topic, properties, partitioner);
}
+
+ /**
+ * Creates a KafkaJsonTableSink.
+ *
+ * @param topic topic in Kafka to which table is written
+ * @param properties properties to connect to Kafka
+ * @param partitioner Kafka partitioner
+ */
+ public KafkaJsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
+ super(topic, properties, partitioner);
+ }
@Override
protected SerializationSchema<Row> createSerializationSchema(String[] fieldNames) {
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index a8a2fd0..0a937d6 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -18,6 +18,8 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.sinks.AppendStreamTableSink;
@@ -39,7 +41,7 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
protected final String topic;
protected final Properties properties;
protected SerializationSchema<Row> serializationSchema;
- protected final KafkaPartitioner<Row> partitioner;
+ protected final FlinkKafkaPartitioner<Row> partitioner;
protected String[] fieldNames;
protected TypeInformation[] fieldTypes;
@@ -49,12 +51,27 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
* @param topic Kafka topic to write to.
* @param properties Properties for the Kafka consumer.
* @param partitioner Partitioner to select Kafka partition for each item
+ * @deprecated Use {@link KafkaTableSink#KafkaTableSink(String, Properties, FlinkKafkaPartitioner)} instead
*/
+ @Deprecated
public KafkaTableSink(
String topic,
Properties properties,
KafkaPartitioner<Row> partitioner) {
+ this(topic, properties, new FlinkKafkaDelegatePartitioner<Row>(partitioner));
+ }
+ /**
+ * Creates a KafkaTableSink.
+ *
+ * @param topic Kafka topic to write to.
+ * @param properties Properties for the Kafka consumer.
+ * @param partitioner Partitioner to select Kafka partition for each item
+ */
+ public KafkaTableSink(
+ String topic,
+ Properties properties,
+ FlinkKafkaPartitioner<Row> partitioner) {
this.topic = Preconditions.checkNotNull(topic, "topic");
this.properties = Preconditions.checkNotNull(properties, "properties");
this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner");
@@ -68,13 +85,29 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
* @param serializationSchema Serialization schema to use to create Kafka records.
* @param partitioner Partitioner to select Kafka partition.
* @return The version-specific Kafka producer
+ * @deprecated Use {@link KafkaTableSink#createKafkaProducer(String, Properties, SerializationSchema, FlinkKafkaPartitioner)} instead
*/
+ @Deprecated
protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
String topic, Properties properties,
SerializationSchema<Row> serializationSchema,
KafkaPartitioner<Row> partitioner);
/**
+ * Returns the version-specific Kafka producer.
+ *
+ * @param topic Kafka topic to produce to.
+ * @param properties Properties for the Kafka producer.
+ * @param serializationSchema Serialization schema to use to create Kafka records.
+ * @param partitioner Partitioner to select Kafka partition.
+ * @return The version-specific Kafka producer
+ */
+ protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
+ String topic, Properties properties,
+ SerializationSchema<Row> serializationSchema,
+ FlinkKafkaPartitioner<Row> partitioner);
+
+ /**
* Create serialization schema for converting table rows into bytes.
*
* @param fieldNames Field names in table rows.
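For orientation (illustrative only, not from the patch): a version-specific sink would now implement the new abstract method roughly as follows, assuming the 0.9 producer constructor that takes a FlinkKafkaPartitioner, as shown later in this commit series.

    @Override
    protected FlinkKafkaProducerBase<Row> createKafkaProducer(
            String topic,
            Properties properties,
            SerializationSchema<Row> serializationSchema,
            FlinkKafkaPartitioner<Row> partitioner) {
        // Wrap the keyless schema and hand the partitioner through to the producer.
        return new FlinkKafkaProducer09<>(
            topic,
            new KeyedSerializationSchemaWrapper<>(serializationSchema),
            properties,
            partitioner);
    }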
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
index 9b848e0..edabfe0 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
@@ -48,9 +48,10 @@ import java.io.Serializable;
* Not all Kafka partitions contain data
* To avoid such an unbalanced partitioning, use a round-robin kafka partitioner. (note that this will
* cause a lot of network connections between all the Flink instances and all the Kafka brokers
- *
+ * @deprecated Use {@link FlinkFixedPartitioner} instead.
*
*/
+@Deprecated
public class FixedPartitioner<T> extends KafkaPartitioner<T> implements Serializable {
private static final long serialVersionUID = 1627268846962918126L;
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
new file mode 100644
index 0000000..d2eb7af
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+/**
+ * A partitioner ensuring that each internal Flink partition ends up in one Kafka partition.
+ *
+ * Note, one Kafka partition can contain multiple Flink partitions.
+ *
+ * Cases:
+ * # More Flink partitions than Kafka partitions
+ * <pre>
+ * Flink Sinks: Kafka Partitions
+ * 1 ----------------> 1
+ * 2 --------------/
+ * 3 -------------/
+ * 4 ------------/
+ * </pre>
+ * Some (or all) Kafka partitions contain the output of more than one Flink partition.
+ *
+ * # Fewer Flink partitions than Kafka partitions
+ * <pre>
+ * Flink Sinks: Kafka Partitions
+ * 1 ----------------> 1
+ * 2 ----------------> 2
+ * 3
+ * 4
+ * 5
+ * </pre>
+ *
+ * Not all Kafka partitions contain data.
+ * To avoid such an unbalanced partitioning, use a round-robin Kafka partitioner (note that this will
+ * cause a lot of network connections between all the Flink instances and all the Kafka brokers).
+ *
+ */
+public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
+
+ private int parallelInstanceId;
+
+ @Override
+ public void open(int parallelInstanceId, int parallelInstances) {
+ if (parallelInstanceId < 0 || parallelInstances <= 0) {
+ throw new IllegalArgumentException("The subtask index must be non-negative, and the number of subtasks must be positive.");
+ }
+ this.parallelInstanceId = parallelInstanceId;
+ }
+
+ @Override
+ public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+ if (partitions == null || partitions.length == 0) {
+ throw new IllegalArgumentException("Partitions of the target topic must not be empty.");
+ }
+
+ return partitions[parallelInstanceId % partitions.length];
+ }
+}
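A quick illustration (not part of this commit) of the mapping implemented above: with four parallel sink subtasks and a hypothetical two-partition topic, subtasks 0 and 2 write to partition 0 while subtasks 1 and 3 write to partition 1. The topic name and partition IDs below are made up for illustration.

    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;

    public class FlinkFixedPartitionerDemo {
        public static void main(String[] args) {
            int[] partitions = new int[]{0, 1}; // assumed partition IDs of the target topic
            for (int subtaskIndex = 0; subtaskIndex < 4; subtaskIndex++) {
                FlinkFixedPartitioner<String> partitioner = new FlinkFixedPartitioner<>();
                partitioner.open(subtaskIndex, 4);
                // prints: subtask 0 -> 0, subtask 1 -> 1, subtask 2 -> 0, subtask 3 -> 1
                System.out.println("subtask " + subtaskIndex + " -> partition "
                    + partitioner.partition("some-record", null, null, "example-topic", partitions));
            }
        }
    }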
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
new file mode 100644
index 0000000..469fd1b
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+/**
+ * Delegate for the legacy {@link KafkaPartitioner}.
+ * @param <T> the type of records handled by the partitioner
+ * @deprecated Use {@link FlinkKafkaPartitioner} instead
+ */
+@Deprecated
+public class FlinkKafkaDelegatePartitioner<T> extends FlinkKafkaPartitioner<T> {
+ private final KafkaPartitioner<T> kafkaPartitioner;
+ private int[] partitions;
+
+ public FlinkKafkaDelegatePartitioner(KafkaPartitioner<T> kafkaPartitioner) {
+ this.kafkaPartitioner = kafkaPartitioner;
+ }
+
+ public void setPartitions(int[] partitions) {
+ this.partitions = partitions;
+ }
+
+ @Override
+ public void open(int parallelInstanceId, int parallelInstances) {
+ this.kafkaPartitioner.open(parallelInstanceId, parallelInstances, partitions);
+ }
+
+ @Override
+ public int partition(T next, byte[] serializedKey, byte[] serializedValue, String targetTopic, int[] partitions) {
+ return this.kafkaPartitioner.partition(next, serializedKey, serializedValue, this.partitions.length);
+ }
+}
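As a migration sketch (illustrative, not from the patch): an existing partitioner written against the deprecated interface can be kept in service by wrapping it in the delegate. Note that setPartitions(...) must be called before open(); in practice the producer internals are expected to do this.

    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
    import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;

    public class DelegatePartitionerDemo {
        public static void main(String[] args) {
            // A legacy partitioner written against the deprecated interface (hypothetical).
            KafkaPartitioner<String> legacy = new KafkaPartitioner<String>() {
                @Override
                public int partition(String next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
                    return (next.hashCode() & 0x7FFFFFFF) % numPartitions; // mask sign bit to stay non-negative
                }
            };

            FlinkKafkaDelegatePartitioner<String> adapted = new FlinkKafkaDelegatePartitioner<>(legacy);
            adapted.setPartitions(new int[]{0, 1, 2}); // normally injected by the producer internals
            adapted.open(0, 4);
            System.out.println(adapted.partition("some-record", null, null, "example-topic", new int[]{0, 1, 2}));
        }
    }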
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
new file mode 100644
index 0000000..e074b9b
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+import java.io.Serializable;
+
+/**
+ * A partitioner that determines the target Kafka partition for each record; its open() method
+ * is called on each parallel sink instance. Partitioners must be serializable!
+ */
+public abstract class FlinkKafkaPartitioner<T> implements Serializable {
+ private static final long serialVersionUID = -9086719227828020494L;
+
+ /**
+ * Initializer for the Partitioner.
+ * @param parallelInstanceId 0-indexed id of the parallel instance in Flink
+ * @param parallelInstances the total number of parallel instances
+ */
+ public void open(int parallelInstanceId, int parallelInstances) {
+ // Override this method if needed.
+ }
+
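+ /**
+ * Determines the target partition for a record.
+ *
+ * @param record the record value
+ * @param key the serialized key of the record
+ * @param value the serialized value of the record
+ * @param targetTopic the Kafka topic the record will be written to
+ * @param partitions the partition IDs of the target topic
+ * @return the ID of the target partition
+ */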
+ public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
+}
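A usage sketch of the new interface (KeyHashPartitioner is a made-up example class, not part of this commit): route records by the hash of their serialized key, sending keyless records to the first partition.

    import java.util.Arrays;
    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

    public class KeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {
        private static final long serialVersionUID = 1L;

        @Override
        public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            if (key == null) {
                return partitions[0]; // keyless records all go to the first partition
            }
            // mask the sign bit so the index is never negative
            return partitions[(Arrays.hashCode(key) & 0x7FFFFFFF) % partitions.length];
        }
    }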
http://git-wip-us.apache.org/repos/asf/flink/blob/58c4eed5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
index 37e2ef6..7c82bd1 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
@@ -22,7 +22,9 @@ import java.io.Serializable;
/**
* It contains a open() method which is called on each parallel instance.
* Partitioners must be serializable!
+ * @deprecated Use {@link FlinkKafkaPartitioner} instead.
*/
+@Deprecated
public abstract class KafkaPartitioner<T> implements Serializable {
private static final long serialVersionUID = -1974260817778593473L;
@@ -32,10 +34,22 @@ public abstract class KafkaPartitioner<T> implements Serializable {
* @param parallelInstanceId 0-indexed id of the parallel instance in Flink
* @param parallelInstances the total number of parallel instances
* @param partitions an array describing the partition IDs of the available Kafka partitions.
+ * @deprecated Use {@link FlinkKafkaPartitioner#open(int, int)} instead.
*/
+ @Deprecated
public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
// overwrite this method if needed.
}
+ /**
+ * Determines the target partition for a record.
+ *
+ * @param next the record value
+ * @param serializedKey the serialized key of the record
+ * @param serializedValue the serialized value of the record
+ * @param numPartitions the total number of partitions
+ * @return the ID of the target partition
+ * @deprecated Use {@link FlinkKafkaPartitioner#partition(Object, byte[], byte[], String, int[])} instead.
+ */
+ @Deprecated
public abstract int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions);
}
[6/6] flink git commit: [hotfix] [kafka] Remove unused operator state
store field in FlinkKafkaProducerBase
Posted by tz...@apache.org.
[hotfix] [kafka] Remove unused operator state store field in FlinkKafkaProducerBase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0963718a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0963718a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0963718a
Branch: refs/heads/release-1.3
Commit: 0963718acf7137a481d7d1c28140e04ad613e71d
Parents: d3b5870
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri May 19 12:11:02 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri May 19 14:42:12 2017 +0800
----------------------------------------------------------------------
.../streaming/connectors/kafka/FlinkKafkaProducerBase.java | 5 +----
1 file changed, 1 insertion(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0963718a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index 3a8228c..46d7d47 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -27,7 +27,6 @@ import java.util.Properties;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
@@ -130,8 +129,6 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
/** Number of unacknowledged records. */
protected long pendingRecords;
- protected OperatorStateStore stateStore;
-
/**
* The main constructor for creating a FlinkKafkaProducer.
*
@@ -344,7 +341,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
- this.stateStore = context.getOperatorStateStore();
+ // nothing to do
}
@Override
[4/6] flink git commit: [FLINK-6288] [kafka] Cleanup and improvements
to FlinkKafkaPartitioner custom partitioning
Posted by tz...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 8285048..bcc8328 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -29,10 +29,14 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.Preconditions;
import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
import static org.apache.flink.test.util.TestUtils.tryExecute;
@@ -42,31 +46,50 @@ import static org.junit.Assert.fail;
@SuppressWarnings("serial")
public abstract class KafkaProducerTestBase extends KafkaTestBase {
-
/**
- *
+ * This test verifies that custom partitioning works correctly, with a default topic
+ * and a dynamic topic. The number of partitions for each topic is deliberately different.
+ *
+ * Test topology:
+ *
* <pre>
- * +------> (sink) --+--> [KAFKA-1] --> (source) -> (map) --+
- * / | \
- * / | \
- * (source) ----------> (sink) --+--> [KAFKA-2] --> (source) -> (map) -----+-> (sink)
- * \ | /
- * \ | /
- * +------> (sink) --+--> [KAFKA-3] --> (source) -> (map) --+
+ * +------> (sink) --+--> [DEFAULT_TOPIC-1] --> (source) -> (map) -----+
+ * / | | | |
+ * | | | | ------+--> (sink)
+ * +------> (sink) --+--> [DEFAULT_TOPIC-2] --> (source) -> (map) -----+
+ * / |
+ * | |
+ * (source) ----------> (sink) --+--> [DYNAMIC_TOPIC-1] --> (source) -> (map) -----+
+ * | | | | |
+ * \ | | | |
+ * +------> (sink) --+--> [DYNAMIC_TOPIC-2] --> (source) -> (map) -----+--> (sink)
+ * | | | | |
+ * \ | | | |
+ * +------> (sink) --+--> [DYNAMIC_TOPIC-3] --> (source) -> (map) -----+
* </pre>
*
- * The mapper validates that the values come consistently from the correct Kafka partition.
+ * Each topic has an independent mapper that validates that the values come consistently from
+ * the correct Kafka partition of the topic it is responsible for.
*
- * The final sink validates that there are no duplicates and that all partitions are present.
+ * Each topic also has a final sink that validates that there are no duplicates and that all
+ * partitions are present.
*/
public void runCustomPartitioningTest() {
try {
LOG.info("Starting KafkaProducerITCase.testCustomPartitioning()");
- final String topic = "customPartitioningTestTopic";
- final int parallelism = 3;
-
- createTestTopic(topic, parallelism, 1);
+ final String defaultTopic = "defaultTopic";
+ final int defaultTopicPartitions = 2;
+
+ final String dynamicTopic = "dynamicTopic";
+ final int dynamicTopicPartitions = 3;
+
+ createTestTopic(defaultTopic, defaultTopicPartitions, 1);
+ createTestTopic(dynamicTopic, dynamicTopicPartitions, 1);
+
+ Map<String, Integer> expectedTopicsToNumPartitions = new HashMap<>(2);
+ expectedTopicsToNumPartitions.put(defaultTopic, defaultTopicPartitions);
+ expectedTopicsToNumPartitions.put(dynamicTopic, dynamicTopicPartitions);
TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInfoParser.parse("Tuple2<Long, String>");
@@ -75,13 +98,13 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
env.getConfig().disableSysoutLogging();
TypeInformationSerializationSchema<Tuple2<Long, String>> serSchema =
- new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
+ new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
TypeInformationSerializationSchema<Tuple2<Long, String>> deserSchema =
- new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
+ new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
// ------ producing topology ---------
-
+
// source has DOP 1 to make sure it generates no duplicates
DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
@@ -100,69 +123,44 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
public void cancel() {
running = false;
}
- })
- .setParallelism(1);
+ }).setParallelism(1);
Properties props = new Properties();
props.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings));
props.putAll(secureProps);
-
- // sink partitions into
- kafkaServer.produceIntoKafka(stream, topic,
- new KeyedSerializationSchemaWrapper<>(serSchema),
+
+ // sink partitions into
+ kafkaServer.produceIntoKafka(stream, defaultTopic,
+ // this serialization schema will route between the default topic and dynamic topic
+ new CustomKeyedSerializationSchemaWrapper(serSchema, defaultTopic, dynamicTopic),
props,
- new CustomPartitioner(parallelism)).setParallelism(parallelism);
+ new CustomPartitioner(expectedTopicsToNumPartitions))
+ .setParallelism(Math.max(defaultTopicPartitions, dynamicTopicPartitions));
// ------ consuming topology ---------
Properties consumerProps = new Properties();
consumerProps.putAll(standardProps);
consumerProps.putAll(secureProps);
- FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topic, deserSchema, consumerProps);
-
- env.addSource(source).setParallelism(parallelism)
-
- // mapper that validates partitioning and maps to partition
- .map(new RichMapFunction<Tuple2<Long, String>, Integer>() {
-
- private int ourPartition = -1;
- @Override
- public Integer map(Tuple2<Long, String> value) {
- int partition = value.f0.intValue() % parallelism;
- if (ourPartition != -1) {
- assertEquals("inconsistent partitioning", ourPartition, partition);
- } else {
- ourPartition = partition;
- }
- return partition;
- }
- }).setParallelism(parallelism)
-
- .addSink(new SinkFunction<Integer>() {
-
- private int[] valuesPerPartition = new int[parallelism];
-
- @Override
- public void invoke(Integer value) throws Exception {
- valuesPerPartition[value]++;
-
- boolean missing = false;
- for (int i : valuesPerPartition) {
- if (i < 100) {
- missing = true;
- break;
- }
- }
- if (!missing) {
- throw new SuccessException();
- }
- }
- }).setParallelism(1);
-
+
+ FlinkKafkaConsumerBase<Tuple2<Long, String>> defaultTopicSource =
+ kafkaServer.getConsumer(defaultTopic, deserSchema, consumerProps);
+ FlinkKafkaConsumerBase<Tuple2<Long, String>> dynamicTopicSource =
+ kafkaServer.getConsumer(dynamicTopic, deserSchema, consumerProps);
+
+ env.addSource(defaultTopicSource).setParallelism(defaultTopicPartitions)
+ .map(new PartitionValidatingMapper(defaultTopicPartitions)).setParallelism(defaultTopicPartitions)
+ .addSink(new PartitionValidatingSink(defaultTopicPartitions)).setParallelism(1);
+
+ env.addSource(dynamicTopicSource).setParallelism(dynamicTopicPartitions)
+ .map(new PartitionValidatingMapper(dynamicTopicPartitions)).setParallelism(dynamicTopicPartitions)
+ .addSink(new PartitionValidatingSink(dynamicTopicPartitions)).setParallelism(1);
+
tryExecute(env, "custom partitioning test");
- deleteTestTopic(topic);
-
+ deleteTestTopic(defaultTopic);
+ deleteTestTopic(dynamicTopic);
+
LOG.info("Finished KafkaProducerITCase.testCustomPartitioning()");
}
catch (Exception e) {
@@ -175,18 +173,94 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
public static class CustomPartitioner extends FlinkKafkaPartitioner<Tuple2<Long, String>> implements Serializable {
- private final int expectedPartitions;
+ private final Map<String, Integer> expectedTopicsToNumPartitions;
- public CustomPartitioner(int expectedPartitions) {
- this.expectedPartitions = expectedPartitions;
+ public CustomPartitioner(Map<String, Integer> expectedTopicsToNumPartitions) {
+ this.expectedTopicsToNumPartitions = expectedTopicsToNumPartitions;
}
-
@Override
public int partition(Tuple2<Long, String> next, byte[] serializedKey, byte[] serializedValue, String topic, int[] partitions) {
- assertEquals(expectedPartitions, partitions.length);
+ assertEquals(expectedTopicsToNumPartitions.get(topic).intValue(), partitions.length);
+
+ return (int) (next.f0 % partitions.length);
+ }
+ }
+
+ /**
+ * A {@link KeyedSerializationSchemaWrapper} that supports routing serialized records to different target topics.
+ */
+ public static class CustomKeyedSerializationSchemaWrapper extends KeyedSerializationSchemaWrapper<Tuple2<Long, String>> {
+
+ private final String defaultTopic;
+ private final String dynamicTopic;
+
+ public CustomKeyedSerializationSchemaWrapper(
+ SerializationSchema<Tuple2<Long, String>> serializationSchema,
+ String defaultTopic,
+ String dynamicTopic) {
+
+ super(serializationSchema);
+
+ this.defaultTopic = Preconditions.checkNotNull(defaultTopic);
+ this.dynamicTopic = Preconditions.checkNotNull(dynamicTopic);
+ }
- return (int) (next.f0 % expectedPartitions);
+ @Override
+ public String getTargetTopic(Tuple2<Long, String> element) {
+ return (element.f0 % 2 == 0) ? defaultTopic : dynamicTopic;
+ }
+ }
+
+ /**
+ * Mapper that validates partitioning and maps to partition.
+ */
+ public static class PartitionValidatingMapper extends RichMapFunction<Tuple2<Long, String>, Integer> {
+
+ private final int numPartitions;
+
+ private int ourPartition = -1;
+
+ public PartitionValidatingMapper(int numPartitions) {
+ this.numPartitions = numPartitions;
+ }
+
+ @Override
+ public Integer map(Tuple2<Long, String> value) throws Exception {
+ int partition = value.f0.intValue() % numPartitions;
+ if (ourPartition != -1) {
+ assertEquals("inconsistent partitioning", ourPartition, partition);
+ } else {
+ ourPartition = partition;
+ }
+ return partition;
+ }
+ }
+
+ /**
+ * Sink that counts the records received per partition and succeeds once enough records have arrived from every partition.
+ */
+ public static class PartitionValidatingSink implements SinkFunction<Integer> {
+ private final int[] valuesPerPartition;
+
+ public PartitionValidatingSink(int numPartitions) {
+ this.valuesPerPartition = new int[numPartitions];
+ }
+
+ @Override
+ public void invoke(Integer value) throws Exception {
+ valuesPerPartition[value]++;
+
+ boolean missing = false;
+ for (int i : valuesPerPartition) {
+ if (i < 100) {
+ missing = true;
+ break;
+ }
+ }
+ if (!missing) {
+ throw new SuccessException();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index 0fdc82e..d4fe9cc 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -17,14 +17,12 @@
*/
package org.apache.flink.streaming.connectors.kafka;
-import java.io.Serializable;
import java.util.Properties;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.table.api.Types;
@@ -45,8 +43,7 @@ public abstract class KafkaTableSinkTestBase {
private static final String TOPIC = "testTopic";
protected static final String[] FIELD_NAMES = new String[] {"field1", "field2"};
private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] { Types.INT(), Types.STRING() };
- private static final KafkaPartitioner<Row> PARTITIONER = new CustomPartitioner();
- private static final FlinkKafkaPartitioner<Row> FLINK_PARTITIONER = new FlinkCustomPartitioner();
+ private static final FlinkKafkaPartitioner<Row> PARTITIONER = new CustomPartitioner();
private static final Properties PROPERTIES = createSinkProperties();
@SuppressWarnings("unchecked")
private final FlinkKafkaProducerBase<Row> PRODUCER = new FlinkKafkaProducerBase<Row>(
@@ -74,23 +71,6 @@ public abstract class KafkaTableSinkTestBase {
}
@Test
- @SuppressWarnings("unchecked")
- public void testKafkaTableSinkWithFlinkPartitioner() throws Exception {
- DataStream dataStream = mock(DataStream.class);
-
- KafkaTableSink kafkaTableSink = spy(createTableSinkWithFlinkPartitioner());
- kafkaTableSink.emitDataStream(dataStream);
-
- verify(dataStream).addSink(eq(PRODUCER));
-
- verify(kafkaTableSink).createKafkaProducer(
- eq(TOPIC),
- eq(PROPERTIES),
- any(getSerializationSchema().getClass()),
- eq(FLINK_PARTITIONER));
- }
-
- @Test
public void testConfiguration() {
KafkaTableSink kafkaTableSink = createTableSink();
KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
@@ -101,22 +81,8 @@ public abstract class KafkaTableSinkTestBase {
assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
}
- @Test
- public void testConfigurationWithFlinkPartitioner() {
- KafkaTableSink kafkaTableSink = createTableSinkWithFlinkPartitioner();
- KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
- assertNotSame(kafkaTableSink, newKafkaTableSink);
-
- assertArrayEquals(FIELD_NAMES, newKafkaTableSink.getFieldNames());
- assertArrayEquals(FIELD_TYPES, newKafkaTableSink.getFieldTypes());
- assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
- }
-
protected abstract KafkaTableSink createTableSink(String topic, Properties properties,
- KafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
-
- protected abstract KafkaTableSink createTableSinkWithFlinkPartitioner(String topic,
- Properties properties, FlinkKafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
+ FlinkKafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
protected abstract SerializationSchema<Row> getSerializationSchema();
@@ -124,24 +90,13 @@ public abstract class KafkaTableSinkTestBase {
return createTableSink(TOPIC, PROPERTIES, PARTITIONER, PRODUCER);
}
- private KafkaTableSink createTableSinkWithFlinkPartitioner() {
- return createTableSinkWithFlinkPartitioner(TOPIC, PROPERTIES, FLINK_PARTITIONER, PRODUCER);
- }
-
private static Properties createSinkProperties() {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:12345");
return properties;
}
- private static class CustomPartitioner extends KafkaPartitioner<Row> implements Serializable {
- @Override
- public int partition(Row next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
- return 0;
- }
- }
-
- private static class FlinkCustomPartitioner extends FlinkKafkaPartitioner<Row> {
+ private static class CustomPartitioner extends FlinkKafkaPartitioner<Row> {
@Override
public int partition(Row record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
return 0;
http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
deleted file mode 100644
index 5dab05a..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
-
-public class TestFixedPartitioner {
-
-
- /**
- * <pre>
- * Flink Sinks: Kafka Partitions
- * 1 ----------------> 1
- * 2 --------------/
- * 3 -------------/
- * 4 ------------/
- * </pre>
- */
- @Test
- public void testMoreFlinkThanBrokers() {
- FixedPartitioner<String> part = new FixedPartitioner<>();
-
- int[] partitions = new int[]{0};
-
- part.open(0, 4, partitions);
- Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
-
- part.open(1, 4, partitions);
- Assert.assertEquals(0, part.partition("abc2", null, null, partitions.length));
-
- part.open(2, 4, partitions);
- Assert.assertEquals(0, part.partition("abc3", null, null, partitions.length));
- Assert.assertEquals(0, part.partition("abc3", null, null, partitions.length)); // check if it is changing ;)
-
- part.open(3, 4, partitions);
- Assert.assertEquals(0, part.partition("abc4", null, null, partitions.length));
- }
-
- /**
- *
- * <pre>
- * Flink Sinks: Kafka Partitions
- * 1 ----------------> 1
- * 2 ----------------> 2
- * 3
- * 4
- * 5
- *
- * </pre>
- */
- @Test
- public void testFewerPartitions() {
- FixedPartitioner<String> part = new FixedPartitioner<>();
-
- int[] partitions = new int[]{0, 1, 2, 3, 4};
- part.open(0, 2, partitions);
- Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
- Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
-
- part.open(1, 2, partitions);
- Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length));
- Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length));
- }
-
- /*
- * Flink Sinks: Kafka Partitions
- * 1 ------------>---> 1
- * 2 -----------/----> 2
- * 3 ----------/
- */
- @Test
- public void testMixedCase() {
- FixedPartitioner<String> part = new FixedPartitioner<>();
- int[] partitions = new int[]{0,1};
-
- part.open(0, 3, partitions);
- Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
-
- part.open(1, 3, partitions);
- Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length));
-
- part.open(2, 3, partitions);
- Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length));
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
deleted file mode 100644
index c6be71c..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test FlinkKafkaDelegatePartitioner using FixedPartitioner
- */
-public class TestFlinkKafkaDelegatePartitioner {
-
- /**
- * <pre>
- * Flink Sinks: Kafka Partitions
- * 1 ----------------> 1
- * 2 --------------/
- * 3 -------------/
- * 4 ------------/
- * </pre>
- */
- @Test
- public void testMoreFlinkThanBrokers() {
- FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>();
-
- int[] partitions = new int[]{0};
-
- part.open(0, 4);
- Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
-
- part.open(1, 4);
- Assert.assertEquals(0, part.partition("abc2", null, null, null, partitions));
-
- part.open(2, 4);
- Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions));
- Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions)); // check if it is changing ;)
-
- part.open(3, 4);
- Assert.assertEquals(0, part.partition("abc4", null, null, null, partitions));
- }
-
- /**
- *
- * <pre>
- * Flink Sinks: Kafka Partitions
- * 1 ----------------> 1
- * 2 ----------------> 2
- * 3
- * 4
- * 5
- *
- * </pre>
- */
- @Test
- public void testFewerPartitions() {
- FlinkKafkaDelegatePartitioner<String> part = new FlinkKafkaDelegatePartitioner<>(new FixedPartitioner<String>());
-
- int[] partitions = new int[]{0, 1, 2, 3, 4};
- part.setPartitions(partitions);
-
- part.open(0, 2);
- Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
- Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
-
- part.open(1, 2);
- Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
- Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
- }
-
- /*
- * Flink Sinks: Kafka Partitions
- * 1 ------------>---> 1
- * 2 -----------/----> 2
- * 3 ----------/
- */
- @Test
- public void testMixedCase() {
- FlinkKafkaDelegatePartitioner<String> part = new FlinkKafkaDelegatePartitioner<>(new FixedPartitioner<String>());
- int[] partitions = new int[]{0,1};
- part.setPartitions(partitions);
-
- part.open(0, 3);
- Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
-
- part.open(1, 3);
- Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions));
-
- part.open(2, 3);
- Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
deleted file mode 100644
index 43e1aa7..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-
-/**
- * Special partitioner that uses the first field of a 2-tuple as the partition,
- * and that expects a specific number of partitions. Use Tuple2FlinkPartitioner instead
- */
-@Deprecated
-public class Tuple2Partitioner extends KafkaPartitioner<Tuple2<Integer, Integer>> implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private final int expectedPartitions;
-
- public Tuple2Partitioner(int expectedPartitions) {
- this.expectedPartitions = expectedPartitions;
- }
-
- @Override
- public int partition(Tuple2<Integer, Integer> next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
- if (numPartitions != expectedPartitions) {
- throw new IllegalArgumentException("Expected " + expectedPartitions + " partitions");
- }
-
- return next.f0;
- }
-}
[5/6] flink git commit: [FLINK-6288] [kafka] Cleanup and improvements
to FlinkKafkaPartitioner custom partitioning
Posted by tz...@apache.org.
[FLINK-6288] [kafka] Cleanup and improvements to FlinkKafkaPartitioner custom partitioning
This commit wraps up some general improvements to the new Kafka sink
custom partitioning API, most notably:
1. remove deprecated constructors from base classes, as they are not
user-facing.
2. modify producer IT test to test custom partitioning for dynamic
topics.
3. improve documentation and Javadocs of the new interfaces.
This closes #3901.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d3b58709
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d3b58709
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d3b58709
Branch: refs/heads/release-1.3
Commit: d3b587096b1fd694625429aa32557e70ed84955d
Parents: 58c4eed
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Thu May 18 21:52:16 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri May 19 14:42:04 2017 +0800
----------------------------------------------------------------------
docs/dev/connectors/kafka.md | 4 +-
.../connectors/kafka/FlinkKafkaProducer010.java | 113 +++++-----
.../connectors/kafka/FlinkKafkaProducer08.java | 50 +++--
.../connectors/kafka/Kafka08JsonTableSink.java | 21 +-
.../kafka/Kafka08JsonTableSinkTest.java | 25 +--
.../connectors/kafka/FlinkKafkaProducer09.java | 52 +++--
.../connectors/kafka/Kafka09JsonTableSink.java | 28 +--
.../kafka/Kafka09JsonTableSinkTest.java | 26 +--
.../kafka/FlinkKafkaProducerBase.java | 89 ++++----
.../connectors/kafka/KafkaJsonTableSink.java | 14 --
.../connectors/kafka/KafkaTableSink.java | 40 +---
.../kafka/partitioner/FixedPartitioner.java | 77 -------
.../partitioner/FlinkFixedPartitioner.java | 19 +-
.../FlinkKafkaDelegatePartitioner.java | 5 +-
.../partitioner/FlinkKafkaPartitioner.java | 24 +-
.../kafka/partitioner/KafkaPartitioner.java | 16 +-
.../kafka/FlinkKafkaProducerBaseTest.java | 38 ++--
.../connectors/kafka/KafkaConsumerTestBase.java | 98 ++++----
.../connectors/kafka/KafkaProducerTestBase.java | 222 ++++++++++++-------
.../kafka/KafkaTableSinkTestBase.java | 51 +----
.../connectors/kafka/TestFixedPartitioner.java | 104 ---------
.../TestFlinkKafkaDelegatePartitioner.java | 111 ----------
.../kafka/testutils/Tuple2Partitioner.java | 49 ----
23 files changed, 458 insertions(+), 818 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 60a8039..bc7e7de 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -456,9 +456,9 @@ are other constructor variants that allow providing the following:
Please refer to the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for
details on how to configure Kafka Producers.
* *Custom partitioner*: To assign records to specific
- partitions, you can provide an implementation of a `KafkaPartitioner` to the
+ partitions, you can provide an implementation of a `FlinkKafkaPartitioner` to the
constructor. This partitioner will be called for each record in the stream
- to determine which exact partition the record will be sent to.
+ to determine which exact partition of the target topic the record should be sent to.
* *Advanced serialization schema*: Similar to the consumer,
the producer also allows using an advanced serialization schema called `KeyedSerializationSchema`,
which allows serializing the key and value separately. It also allows to override the target topic,
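A hedged sketch of the documented usage (broker address, topic name, and the String schema are placeholders; 'stream' is assumed to be an existing DataStream<String>):

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address

    // The FlinkKafkaPartitioner decides the target partition of the target topic for every record.
    stream.addSink(new FlinkKafkaProducer010<>(
        "my-topic",                         // placeholder topic
        new SimpleStringSchema(),
        props,
        new FlinkFixedPartitioner<String>()));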
http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 7addafa..711fe07 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -38,6 +39,7 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWra
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
+import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPartitionsByTopic;
import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;
@@ -121,32 +123,6 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
* @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
- * @deprecated Use {@link FlinkKafkaProducer010Configuration#writeToKafkaWithTimestamps(DataStream, String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
- */
- @Deprecated
- public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
- String topicId,
- KeyedSerializationSchema<T> serializationSchema,
- Properties producerConfig,
- KafkaPartitioner<T> customPartitioner) {
-
- GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
- FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner);
- SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
- return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
- }
-
- /**
- * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
- * the topic.
- *
- * This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
- *
- * @param inStream The stream to write to Kafka
- * @param topicId The name of the target topic
- * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
- * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
- * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
*/
public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
String topicId,
@@ -200,21 +176,6 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
* @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
- * @deprecated Use {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead
- */
- @Deprecated
- public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
- this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
- }
-
- /**
- * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
- * the topic.
- *
- * @param topicId The topic to write data to
- * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
- * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
- * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
*/
public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
@@ -251,32 +212,84 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) {
this(topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
}
-
+
/**
* Create Kafka producer
*
* This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above)
- * @deprecated Use {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
*/
- @Deprecated
- public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
+ public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
// We create a Kafka 09 producer instance here and only "override" (by intercepting) the
// invoke call.
super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner));
}
-
+
+ // ----------------------------- Deprecated constructors / factory methods ---------------------------
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+ * the topic.
+ *
+ * This constructor allows writing timestamps to Kafka; it follows approach (b) (see above).
+ *
+ * @param inStream The stream to write to Kafka
+ * @param topicId The name of the target topic
+ * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+ *
+ * @deprecated This method is deprecated since it does not correctly handle partitioning when
+ * producing to multiple topics. Use
+ * {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+ */
+ @Deprecated
+ public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+ String topicId,
+ KeyedSerializationSchema<T> serializationSchema,
+ Properties producerConfig,
+ KafkaPartitioner<T> customPartitioner) {
+
+ GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
+ FlinkKafkaProducer010<T> kafkaProducer =
+ new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner));
+ SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
+ return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+ * the topic.
+ *
+ * @param topicId The topic to write data to
+ * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+ *
+ * @deprecated This constructor is deprecated since it does not correctly handle partitioning when
+ * producing to multiple topics. Use
+ * {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+ */
+ @Deprecated
+ public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
+ this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+ }
+
/**
* Create Kafka producer
*
+ * This constructor does not allow writing timestamps to Kafka; it follows approach (a) (see above).
+ *
+ * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
+ * producing to multiple topics. Use
+ * {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
*/
- public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
+ @Deprecated
+ public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
// We create a Kafka 09 producer instance here and only "override" (by intercepting) the
// invoke call.
- super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner));
+ super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner)));
}
-
// ----------------------------- Generic element processing ---------------------------
private void invokeInternal(T next, long elementTimestamp) throws Exception {
@@ -300,7 +313,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
ProducerRecord<byte[], byte[]> record;
int[] partitions = internalProducer.topicPartitionsMap.get(targetTopic);
if(null == partitions) {
- partitions = internalProducer.getPartitionsByTopic(targetTopic, internalProducer.producer);
+ partitions = getPartitionsByTopic(targetTopic, internalProducer.producer);
internalProducer.topicPartitionsMap.put(targetTopic, partitions);
}
if (internalProducer.flinkKafkaPartitioner == null) {
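For context, constructing the producer through the non-deprecated constructor now looks roughly as follows. This is a minimal sketch; the topic name, broker address, and use of SimpleStringSchema are illustrative assumptions, not part of this change.

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class Producer010Example {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties producerConfig = new Properties();
            // assumed broker address; 'bootstrap.servers' is the only required property
            producerConfig.setProperty("bootstrap.servers", "localhost:9092");

            DataStream<String> stream = env.fromElements("a", "b", "c");

            // Non-deprecated variant: pass a FlinkKafkaPartitioner instead of the
            // legacy KafkaPartitioner, so partitioning is aware of the target topic.
            stream.addSink(new FlinkKafkaProducer010<>(
                    "example-topic",                     // assumed topic name
                    new SimpleStringSchema(),
                    producerConfig,
                    new FlinkFixedPartitioner<String>()));

            env.execute("kafka-010-producer-example");
        }
    }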
http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
index 64d3716..08dcb2f 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
@@ -76,21 +77,6 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {
* @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
- * @deprecated Use {@link FlinkKafkaProducer08#FlinkKafkaProducer08(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead
- */
- @Deprecated
- public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
- this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
-
- }
-
- /**
- * The main constructor for creating a FlinkKafkaProducer.
- *
- * @param topicId The topic to write data to
- * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
- * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
- * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions.
*/
public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
@@ -136,13 +122,30 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {
* @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
- * @deprecated Use {@link FlinkKafkaProducer08#FlinkKafkaProducer08(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
*/
- @Deprecated
- public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+ public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
super(topicId, serializationSchema, producerConfig, customPartitioner);
}
+ // ------------------- Deprecated constructors ----------------------
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic.
+ *
+ * @param topicId The topic to write data to
+ * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+ *
+ * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
+ * producing to multiple topics. Use
+ * {@link FlinkKafkaProducer08#FlinkKafkaProducer08(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+ */
+ @Deprecated
+ public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+ this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+ }
+
/**
* Creates a FlinkKafkaProducer for a given topic.
*
@@ -150,11 +153,18 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {
* @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+ *
+ * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
+ * producing to multiple topics. Use
+ * {@link FlinkKafkaProducer08#FlinkKafkaProducer08(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
*/
- public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
- super(topicId, serializationSchema, producerConfig, customPartitioner);
+ @Deprecated
+ public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+ super(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner));
}
+ // ---------------------------------------------------------------------
+
@Override
protected void flush() {
// The Kafka 0.8 producer doesn't support flushing, we wait here
http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index 5a066ec0..80bd180 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -17,6 +17,7 @@
*/
package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
@@ -28,7 +29,7 @@ import java.util.Properties;
* Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format.
*/
public class Kafka08JsonTableSink extends KafkaJsonTableSink {
-
+
/**
* Creates {@link KafkaTableSink} for Kafka 0.8
*
@@ -36,24 +37,24 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink {
* @param properties properties to connect to Kafka
* @param partitioner Kafka partitioner
*/
- public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
+ public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
super(topic, properties, partitioner);
}
-
+
/**
* Creates {@link KafkaTableSink} for Kafka 0.8
*
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
* @param partitioner Kafka partitioner
+ *
+ * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
+ * producing to multiple topics. Use
+ * {@link #Kafka08JsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead.
*/
- public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
- super(topic, properties, partitioner);
- }
-
- @Override
- protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
- return new FlinkKafkaProducer08<>(topic, serializationSchema, properties, partitioner);
+ @Deprecated
+ public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
+ super(topic, properties, new FlinkKafkaDelegatePartitioner<>(partitioner));
}
@Override
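For reference, a minimal construction sketch for the non-deprecated table sink constructor; the topic name and broker address are assumptions for illustration:

    import java.util.Properties;

    import org.apache.flink.streaming.connectors.kafka.Kafka08JsonTableSink;
    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
    import org.apache.flink.types.Row;

    public class TableSink08Example {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker

            // Non-deprecated constructor: takes a FlinkKafkaPartitioner<Row>;
            // the deprecated KafkaPartitioner<Row> variant now wraps its argument
            // in a FlinkKafkaDelegatePartitioner before delegating here.
            Kafka08JsonTableSink sink = new Kafka08JsonTableSink(
                    "json-topic", props, new FlinkFixedPartitioner<Row>());
        }
    }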
http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index 164c162..2136476 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
@@ -28,26 +27,20 @@ import java.util.Properties;
public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
@Override
- protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
+ protected KafkaTableSink createTableSink(
+ String topic,
+ Properties properties,
+ FlinkKafkaPartitioner<Row> partitioner,
final FlinkKafkaProducerBase<Row> kafkaProducer) {
return new Kafka08JsonTableSink(topic, properties, partitioner) {
@Override
- protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
- SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
- return kafkaProducer;
- }
- };
- }
+ protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+ String topic,
+ Properties properties,
+ SerializationSchema<Row> serializationSchema,
+ FlinkKafkaPartitioner<Row> partitioner) {
- @Override
- protected KafkaTableSink createTableSinkWithFlinkPartitioner(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner,
- final FlinkKafkaProducerBase<Row> kafkaProducer) {
-
- return new Kafka08JsonTableSink(topic, properties, partitioner) {
- @Override
- protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
- SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
return kafkaProducer;
}
};
http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
index 4f41c43..cbed361 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
@@ -78,22 +79,6 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
* @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
- * @deprecated Use {@link FlinkKafkaProducer09#FlinkKafkaProducer09(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead
- */
- @Deprecated
- public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
- this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
-
- }
-
- /**
- * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
- * the topic.
- *
- * @param topicId The topic to write data to
- * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
- * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
- * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
*/
public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
@@ -140,13 +125,31 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
* @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
- * @deprecated Use {@link FlinkKafkaProducer09#FlinkKafkaProducer09(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
*/
- @Deprecated
- public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+ public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
super(topicId, serializationSchema, producerConfig, customPartitioner);
}
+ // ------------------- Deprecated constructors ----------------------
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink writes a DataStream to
+ * the topic.
+ *
+ * @param topicId The topic to write data to
+ * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+ *
+ * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
+ * producing to multiple topics. Use
+ * {@link FlinkKafkaProducer09#FlinkKafkaProducer09(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+ */
+ @Deprecated
+ public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+ this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+ }
+
/**
* Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
* the topic.
@@ -155,11 +158,18 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
* @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+ *
+ * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
+ * producing to multiple topics. Use
+ * {@link FlinkKafkaProducer09#FlinkKafkaProducer09(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
*/
- public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
- super(topicId, serializationSchema, producerConfig, customPartitioner);
+ @Deprecated
+ public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+ super(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner));
}
+ // ------------------------------------------------------------------
+
@Override
protected void flush() {
if (this.producer != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
index b82ebc4..a81422e 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
@@ -17,6 +17,7 @@
*/
package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
@@ -28,43 +29,32 @@ import java.util.Properties;
* Kafka 0.9 {@link KafkaTableSink} that serializes data in JSON format.
*/
public class Kafka09JsonTableSink extends KafkaJsonTableSink {
+
/**
* Creates {@link KafkaTableSink} for Kafka 0.9
*
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
* @param partitioner Kafka partitioner
- * @deprecated Use {@link Kafka09JsonTableSink#Kafka09JsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead
*/
- @Deprecated
- public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
+ public Kafka09JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
super(topic, properties, partitioner);
}
-
+
/**
* Creates {@link KafkaTableSink} for Kafka 0.9
*
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
* @param partitioner Kafka partitioner
- */
- public Kafka09JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
- super(topic, properties, partitioner);
- }
-
- /**
*
- * @param topic Kafka topic to produce to.
- * @param properties Properties for the Kafka producer.
- * @param serializationSchema Serialization schema to use to create Kafka records.
- * @param partitioner Partitioner to select Kafka partition.
- * @return The version-specific Kafka producer
- * @deprecated Use {@link Kafka09JsonTableSink#createKafkaProducer(String, Properties, SerializationSchema, FlinkKafkaPartitioner)} instead
+ * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
+ * producing to multiple topics. Use
+ * {@link #Kafka09JsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead.
*/
@Deprecated
- @Override
- protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
- return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner);
+ public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
+ super(topic, properties, new FlinkKafkaDelegatePartitioner<>(partitioner));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index ad8f623..3afb5e4 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
@@ -27,28 +26,21 @@ import java.util.Properties;
public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
- @Deprecated
@Override
- protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
+ protected KafkaTableSink createTableSink(
+ String topic,
+ Properties properties,
+ FlinkKafkaPartitioner<Row> partitioner,
final FlinkKafkaProducerBase<Row> kafkaProducer) {
return new Kafka09JsonTableSink(topic, properties, partitioner) {
@Override
- protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
- SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
- return kafkaProducer;
- }
- };
- }
+ protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+ String topic,
+ Properties properties,
+ SerializationSchema<Row> serializationSchema,
+ FlinkKafkaPartitioner<Row> partitioner) {
- @Override
- protected KafkaTableSink createTableSinkWithFlinkPartitioner(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner,
- final FlinkKafkaProducerBase<Row> kafkaProducer) {
-
- return new Kafka09JsonTableSink(topic, properties, partitioner) {
- @Override
- protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
- SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
return kafkaProducer;
}
};
http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index f9a1e41..3a8228c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -40,7 +40,6 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.NetUtils;
import org.apache.kafka.clients.producer.Callback;
@@ -74,38 +73,38 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
private static final long serialVersionUID = 1L;
/**
- * Configuration key for disabling the metrics reporting
+ * Configuration key for disabling the metrics reporting.
*/
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
/**
- * User defined properties for the Producer
+ * User defined properties for the Producer.
*/
protected final Properties producerConfig;
/**
- * The name of the default topic this producer is writing data to
+ * The name of the default topic this producer is writing data to.
*/
protected final String defaultTopicId;
/**
- * (Serializable) SerializationSchema for turning objects used with Flink into
+ * (Serializable) SerializationSchema for turning objects used with Flink into
* byte[] for Kafka.
*/
protected final KeyedSerializationSchema<IN> schema;
/**
- * User-provided partitioner for assigning an object to a Kafka partition for each topic
+ * User-provided partitioner for assigning an object to a Kafka partition for each topic.
*/
- protected final FlinkKafkaPartitioner flinkKafkaPartitioner;
+ protected final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
/**
- * Partitions for each topic
+ * Partitions of each topic.
*/
protected final Map<String, int[]> topicPartitionsMap;
/**
- * Flag indicating whether to accept failures (and log them), or to fail on failures
+ * Flag indicating whether to accept failures (and log them), or to fail on failures.
*/
protected boolean logFailuresOnly;
@@ -114,11 +113,6 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
*/
protected boolean flushOnCheckpoint;
- /**
- * Retry times of fetching kafka meta
- */
- protected long kafkaMetaRetryTimes;
-
// -------------------------------- Runtime fields ------------------------------------------
/** KafkaProducer instance */
@@ -138,21 +132,6 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
protected OperatorStateStore stateStore;
-
- /**
- * The main constructor for creating a FlinkKafkaProducer. For customPartitioner parameter, use {@link org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner} instead
- *
- * @param defaultTopicId The default topic to write data to
- * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
- * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
- * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
- * @deprecated Use {@link FlinkKafkaProducerBase#FlinkKafkaProducerBase(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
- */
- @Deprecated
- public FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
- this(defaultTopicId, serializationSchema, producerConfig, null == customPartitioner ? null : new FlinkKafkaDelegatePartitioner<>(customPartitioner));
- }
-
/**
* The main constructor for creating a FlinkKafkaProducer.
*
@@ -236,9 +215,11 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
producer = getKafkaProducer(this.producerConfig);
RuntimeContext ctx = getRuntimeContext();
+
if(null != flinkKafkaPartitioner) {
if(flinkKafkaPartitioner instanceof FlinkKafkaDelegatePartitioner) {
- ((FlinkKafkaDelegatePartitioner)flinkKafkaPartitioner).setPartitions(getPartitionsByTopic(this.defaultTopicId, this.producer));
+ ((FlinkKafkaDelegatePartitioner) flinkKafkaPartitioner).setPartitions(
+ getPartitionsByTopic(this.defaultTopicId, this.producer));
}
flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
}
@@ -290,26 +271,6 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
}
}
- protected int[] getPartitionsByTopic(String topic, KafkaProducer<byte[], byte[]> producer) {
- // the fetched list is immutable, so we're creating a mutable copy in order to sort it
- List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));
-
- // sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
- Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
- @Override
- public int compare(PartitionInfo o1, PartitionInfo o2) {
- return Integer.compare(o1.partition(), o2.partition());
- }
- });
-
- int[] partitions = new int[partitionsList.size()];
- for (int i = 0; i < partitions.length; i++) {
- partitions[i] = partitionsList.get(i).partition();
- }
-
- return partitions;
- }
-
/**
* Called when new data arrives to the sink, and forwards it to Kafka.
*
@@ -330,7 +291,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
int[] partitions = this.topicPartitionsMap.get(targetTopic);
if(null == partitions) {
- partitions = this.getPartitionsByTopic(targetTopic, producer);
+ partitions = getPartitionsByTopic(targetTopic, producer);
this.topicPartitionsMap.put(targetTopic, partitions);
}
@@ -338,7 +299,11 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
if (flinkKafkaPartitioner == null) {
record = new ProducerRecord<>(targetTopic, serializedKey, serializedValue);
} else {
- record = new ProducerRecord<>(targetTopic, flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), serializedKey, serializedValue);
+ record = new ProducerRecord<>(
+ targetTopic,
+ flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions),
+ serializedKey,
+ serializedValue);
}
if (flushOnCheckpoint) {
synchronized (pendingRecordsLock) {
@@ -425,6 +390,26 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
return props;
}
+ protected static int[] getPartitionsByTopic(String topic, KafkaProducer<byte[], byte[]> producer) {
+ // the fetched list is immutable, so we're creating a mutable copy in order to sort it
+ List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));
+
+ // sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
+ Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
+ @Override
+ public int compare(PartitionInfo o1, PartitionInfo o2) {
+ return Integer.compare(o1.partition(), o2.partition());
+ }
+ });
+
+ int[] partitions = new int[partitionsList.size()];
+ for (int i = 0; i < partitions.length; i++) {
+ partitions[i] = partitionsList.get(i).partition();
+ }
+
+ return partitions;
+ }
+
@VisibleForTesting
protected long numPendingRecords() {
synchronized (pendingRecordsLock) {
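getPartitionsByTopic is now static so that FlinkKafkaProducer010's invokeInternal can call it without a base-class instance, and it still sorts the fetched metadata so every subtask sees the identical partition array. A self-contained sketch of that normalization step, with made-up partition ids standing in for producer.partitionsFor(topic):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    public class PartitionSortSketch {
        public static void main(String[] args) {
            // Stand-in for the partition ids of the PartitionInfo objects returned
            // by producer.partitionsFor(topic), which can arrive in arbitrary order.
            List<Integer> fetched = new ArrayList<>(Arrays.asList(2, 0, 3, 1));

            // Same idea as getPartitionsByTopic: sort by partition id so the
            // resulting array is identical across all parallel subtasks.
            Collections.sort(fetched, new Comparator<Integer>() {
                @Override
                public int compare(Integer o1, Integer o2) {
                    return Integer.compare(o1, o2);
                }
            });

            int[] partitions = new int[fetched.size()];
            for (int i = 0; i < partitions.length; i++) {
                partitions[i] = fetched.get(i);
            }

            System.out.println(Arrays.toString(partitions)); // prints [0, 1, 2, 3]
        }
    }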
http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
index a0b5033..41bb329 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.types.Row;
@@ -29,19 +28,6 @@ import java.util.Properties;
* Base class for {@link KafkaTableSink} that serializes data in JSON format
*/
public abstract class KafkaJsonTableSink extends KafkaTableSink {
-
- /**
- * Creates KafkaJsonTableSink
- *
- * @param topic topic in Kafka to which table is written
- * @param properties properties to connect to Kafka
- * @param partitioner Kafka partitioner
- * @deprecated Use {@link KafkaJsonTableSink#KafkaJsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead
- */
- @Deprecated
- public KafkaJsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
- super(topic, properties, partitioner);
- }
/**
* Creates KafkaJsonTableSink
http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index 0a937d6..1c38816 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -18,13 +18,11 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.util.Preconditions;
@@ -34,7 +32,7 @@ import java.util.Properties;
* A version-agnostic Kafka {@link AppendStreamTableSink}.
*
* <p>The version-specific Kafka table sinks need to extend this class and
- * override {@link #createKafkaProducer(String, Properties, SerializationSchema, KafkaPartitioner)}}.
+ * override {@link #createKafkaProducer(String, Properties, SerializationSchema, FlinkKafkaPartitioner)}.
*/
public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
@@ -47,22 +45,6 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
/**
* Creates KafkaTableSink
- *
- * @param topic Kafka topic to write to.
- * @param properties Properties for the Kafka consumer.
- * @param partitioner Partitioner to select Kafka partition for each item
- * @deprecated Use {@link KafkaTableSink#KafkaTableSink(String, Properties, FlinkKafkaPartitioner)} instead
- */
- @Deprecated
- public KafkaTableSink(
- String topic,
- Properties properties,
- KafkaPartitioner<Row> partitioner) {
- this(topic, properties, new FlinkKafkaDelegatePartitioner<Row>(partitioner));
- }
-
- /**
- * Creates KafkaTableSink
*
* @param topic Kafka topic to write to.
* @param properties Properties for the Kafka consumer.
@@ -85,22 +67,6 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
* @param serializationSchema Serialization schema to use to create Kafka records.
* @param partitioner Partitioner to select Kafka partition.
* @return The version-specific Kafka producer
- * @deprecated Use {@link KafkaTableSink#createKafkaProducer(String, Properties, SerializationSchema, FlinkKafkaPartitioner)} instead
- */
- @Deprecated
- protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
- String topic, Properties properties,
- SerializationSchema<Row> serializationSchema,
- KafkaPartitioner<Row> partitioner);
-
- /**
- * Returns the version-specifid Kafka producer.
- *
- * @param topic Kafka topic to produce to.
- * @param properties Properties for the Kafka producer.
- * @param serializationSchema Serialization schema to use to create Kafka records.
- * @param partitioner Partitioner to select Kafka partition.
- * @return The version-specific Kafka producer
*/
protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
String topic, Properties properties,
@@ -153,8 +119,4 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
return copy;
}
-
-
-
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
deleted file mode 100644
index edabfe0..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.partitioner;
-
-import java.io.Serializable;
-
-/**
- * A partitioner ensuring that each internal Flink partition ends up in one Kafka partition.
- *
- * Note, one Kafka partition can contain multiple Flink partitions.
- *
- * Cases:
- * # More Flink partitions than kafka partitions
- * <pre>
- * Flink Sinks: Kafka Partitions
- * 1 ----------------> 1
- * 2 --------------/
- * 3 -------------/
- * 4 ------------/
- * </pre>
- * Some (or all) kafka partitions contain the output of more than one flink partition
- *
- *# Fewer Flink partitions than Kafka
- * <pre>
- * Flink Sinks: Kafka Partitions
- * 1 ----------------> 1
- * 2 ----------------> 2
- * 3
- * 4
- * 5
- * </pre>
- *
- * Not all Kafka partitions contain data
- * To avoid such an unbalanced partitioning, use a round-robin kafka partitioner. (note that this will
- * cause a lot of network connections between all the Flink instances and all the Kafka brokers
- * @deprecated Use {@link FlinkFixedPartitioner} instead.
- *
- */
-@Deprecated
-public class FixedPartitioner<T> extends KafkaPartitioner<T> implements Serializable {
- private static final long serialVersionUID = 1627268846962918126L;
-
- private int targetPartition = -1;
-
- @Override
- public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
- if (parallelInstanceId < 0 || parallelInstances <= 0 || partitions.length == 0) {
- throw new IllegalArgumentException();
- }
-
- this.targetPartition = partitions[parallelInstanceId % partitions.length];
- }
-
- @Override
- public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
- if (targetPartition >= 0) {
- return targetPartition;
- } else {
- throw new RuntimeException("The partitioner has not been initialized properly");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
index d2eb7af..e47c667 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
@@ -17,6 +17,8 @@
*/
package org.apache.flink.streaming.connectors.kafka.partitioner;
+import org.apache.flink.util.Preconditions;
+
/**
* A partitioner ensuring that each internal Flink partition ends up in one Kafka partition.
*
@@ -44,9 +46,8 @@ package org.apache.flink.streaming.connectors.kafka.partitioner;
* </pre>
*
* Not all Kafka partitions contain data
- * To avoid such an unbalanced partitioning, use a round-robin kafka partitioner. (note that this will
- * cause a lot of network connections between all the Flink instances and all the Kafka brokers
- *
+ * To avoid such an unbalanced partitioning, use a round-robin kafka partitioner (note that this will
+ * cause a lot of network connections between all the Flink instances and all the Kafka brokers).
*/
public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
@@ -54,17 +55,17 @@ public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
@Override
public void open(int parallelInstanceId, int parallelInstances) {
- if (parallelInstanceId < 0 || parallelInstances <= 0) {
- throw new IllegalArgumentException();
- }
+ Preconditions.checkArgument(parallelInstanceId >= 0, "Id of this subtask cannot be negative.");
+ Preconditions.checkArgument(parallelInstances > 0, "Number of subtasks must be larger than 0.");
+
this.parallelInstanceId = parallelInstanceId;
}
@Override
public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
- if(null == partitions || partitions.length == 0) {
- throw new IllegalArgumentException();
- }
+ Preconditions.checkArgument(
+ partitions != null && partitions.length > 0,
+ "Partitions of the target topic is empty.");
return partitions[parallelInstanceId % partitions.length];
}
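To illustrate the fixed mapping these Preconditions now guard, a small sketch that drives the partitioner by hand; the parallelism and partition ids are made up:

    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;

    public class FixedPartitionerSketch {
        public static void main(String[] args) {
            int[] partitions = {0, 1};  // two Kafka partitions for the target topic
            int parallelism = 4;        // four parallel Flink sink subtasks

            for (int subtask = 0; subtask < parallelism; subtask++) {
                FlinkFixedPartitioner<String> partitioner = new FlinkFixedPartitioner<>();
                partitioner.open(subtask, parallelism);
                // Each subtask is pinned to partitions[subtask % partitions.length].
                System.out.println("subtask " + subtask + " -> partition "
                        + partitioner.partition("record", null, null, "example-topic", partitions));
            }
        }
    }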
http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
index 469fd1b..b7b4143 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
@@ -18,8 +18,9 @@
package org.apache.flink.streaming.connectors.kafka.partitioner;
/**
- * Delegate for KafkaPartitioner
- * @param <T>
+ * Delegate for the deprecated {@link KafkaPartitioner}.
+ * This should only be used to bridge the deprecated partitioning API to the new one.
+ *
* @deprecated Delegate for {@link KafkaPartitioner}, use {@link FlinkKafkaPartitioner} instead
*/
@Deprecated
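The bridging idea behind the delegate, sketched under the assumption that it stores the default topic's partitions via setPartitions() and forwards to the legacy four-argument signature (an illustration of its role, not the actual class):

    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
    import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;

    // Illustrative sketch only; the real FlinkKafkaDelegatePartitioner may differ in detail.
    public class DelegateSketch<T> extends FlinkKafkaPartitioner<T> {
        private static final long serialVersionUID = 1L;

        private final KafkaPartitioner<T> legacy;
        private int[] defaultTopicPartitions;

        public DelegateSketch(KafkaPartitioner<T> legacy) {
            this.legacy = legacy;
        }

        // Called by FlinkKafkaProducerBase.open() with the default topic's partitions.
        public void setPartitions(int[] partitions) {
            this.defaultTopicPartitions = partitions;
        }

        @Override
        public void open(int parallelInstanceId, int parallelInstances) {
            legacy.open(parallelInstanceId, parallelInstances, defaultTopicPartitions);
        }

        @Override
        public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            // The legacy signature has no topic argument: whatever topic the record
            // targets, the old partitioner only ever sees a partition count. That
            // is exactly why it cannot handle multiple topics correctly.
            return legacy.partition(record, key, value, partitions.length);
        }
    }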
http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
index e074b9b..b634af7 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
@@ -20,20 +20,34 @@ package org.apache.flink.streaming.connectors.kafka.partitioner;
import java.io.Serializable;
/**
- * It contains a open() method which is called on each parallel instance.
- * Partitioners must be serializable!
+ * A {@link FlinkKafkaPartitioner} encapsulates the logic of how to partition records
+ * across the partitions of multiple Kafka topics.
*/
public abstract class FlinkKafkaPartitioner<T> implements Serializable {
+
private static final long serialVersionUID = -9086719227828020494L;
/**
- * Initializer for the Partitioner.
- * @param parallelInstanceId 0-indexed id of the parallel instance in Flink
+ * Initializer for the partitioner. This is called once on each parallel sink instance of
+ * the Flink Kafka producer. This method should be overridden if necessary.
+ *
+ * @param parallelInstanceId 0-indexed id of the parallel sink instance in Flink
* @param parallelInstances the total number of parallel instances
*/
public void open(int parallelInstanceId, int parallelInstances) {
// overwrite this method if needed.
}
-
+
+ /**
+ * Determines the id of the partition that the record should be written to.
+ *
+ * @param record the record value
+ * @param key serialized key of the record
+ * @param value serialized value of the record
+ * @param targetTopic target topic for the record
+ * @param partitions found partitions for the target topic
+ *
+ * @return the id of the target partition
+ */
public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
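As a usage illustration of this contract, a hypothetical key-hash partitioner; it is an example, not part of the change:

    import java.util.Arrays;

    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

    // Hypothetical example implementation of the new partitioner contract.
    public class KeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {
        private static final long serialVersionUID = 1L;

        @Override
        public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            if (key == null) {
                // Records without a key all go to the first (lowest-id) partition.
                return partitions[0];
            }
            // 'partitions' holds the sorted partition ids of 'targetTopic', so the
            // same key deterministically maps to the same partition per topic.
            return partitions[(Arrays.hashCode(key) & 0x7fffffff) % partitions.length];
        }
    }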
http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
index 7c82bd1..eebc619 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
@@ -22,7 +22,9 @@ import java.io.Serializable;
/**
* It contains an open() method which is called on each parallel instance.
* Partitioners must be serializable!
- * @deprecated Use {@link FlinkKafkaPartitioner} instead.
+ *
+ * @deprecated This partitioner does not handle partitioning properly in the case of
+ * multiple topics, and has been deprecated. Please use {@link FlinkKafkaPartitioner} instead.
*/
@Deprecated
public abstract class KafkaPartitioner<T> implements Serializable {
@@ -34,22 +36,10 @@ public abstract class KafkaPartitioner<T> implements Serializable {
* @param parallelInstanceId 0-indexed id of the parallel instance in Flink
* @param parallelInstances the total number of parallel instances
* @param partitions an array describing the partition IDs of the available Kafka partitions.
- * @deprecated Use {@link FlinkKafkaPartitioner#open(int, int)} instead.
*/
- @Deprecated
public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
// overwrite this method if needed.
}
- /**
- *
- * @param next
- * @param serializedKey
- * @param serializedValue
- * @param numPartitions
- * @return
- * @deprecated Use {@link FlinkKafkaPartitioner#partition(T, byte[], byte[], String, int[])} instead.
- */
- @Deprecated
public abstract int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions);
}
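A before-and-after migration sketch for user code; both classes are hypothetical. The decisive difference is that the new signature receives the target topic and its sorted partition ids instead of a bare partition count:

    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
    import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;

    // Before: the deprecated API only sees a partition count, so one instance
    // cannot partition correctly for more than one topic.
    class LegacyRoundRobinPartitioner<T> extends KafkaPartitioner<T> {
        private static final long serialVersionUID = 1L;
        private int nextPartition = -1;

        @Override
        public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
            nextPartition = (nextPartition + 1) % numPartitions;
            return nextPartition;
        }
    }

    // After: the replacement receives the topic and its actual partition ids,
    // so the same instance works across multiple target topics.
    class RoundRobinPartitioner<T> extends FlinkKafkaPartitioner<T> {
        private static final long serialVersionUID = 1L;
        private int counter = -1;

        @Override
        public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            counter = (counter + 1) % partitions.length;
            return partitions[counter];
        }
    }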
http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
index 1f16d8e..6b2cc02 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
@@ -22,12 +22,14 @@ import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -61,7 +63,7 @@ public class FlinkKafkaProducerBaseTest {
// no bootstrap servers set in props
Properties props = new Properties();
// should throw IllegalArgumentException
- new DummyFlinkKafkaProducer<>(props, null);
+ new DummyFlinkKafkaProducer<>(props, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
}
/**
@@ -72,7 +74,7 @@ public class FlinkKafkaProducerBaseTest {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345");
// should set missing key value deserializers
- new DummyFlinkKafkaProducer<>(props, null);
+ new DummyFlinkKafkaProducer<>(props, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
assertTrue(props.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
assertTrue(props.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
@@ -83,9 +85,10 @@ public class FlinkKafkaProducerBaseTest {
/**
* Tests that the partitions list is deterministic and correctly provided to the custom partitioner.
*/
+ @SuppressWarnings("unchecked")
@Test
- public void testPartitionerOpenedWithDeterminatePartitionList() throws Exception {
- KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class);
+ public void testPartitionerInvokedWithDeterminatePartitionList() throws Exception {
+ FlinkKafkaPartitioner<String> mockPartitioner = mock(FlinkKafkaPartitioner.class);
RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
@@ -98,8 +101,8 @@ public class FlinkKafkaProducerBaseTest {
mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 0, null, null, null));
mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 2, null, null, null));
- final DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
- FakeStandardProducerConfig.get(), mockPartitioner);
+ final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
+ FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), mockPartitioner);
producer.setRuntimeContext(mockRuntimeContext);
final KafkaProducer mockProducer = producer.getMockKafkaProducer();
@@ -107,10 +110,11 @@ public class FlinkKafkaProducerBaseTest {
when(mockProducer.metrics()).thenReturn(null);
producer.open(new Configuration());
+ verify(mockPartitioner, times(1)).open(0, 1);
- // the out-of-order partitions list should be sorted before provided to the custom partitioner's open() method
- int[] correctPartitionList = {0, 1, 2, 3};
- verify(mockPartitioner).open(0, 1, correctPartitionList);
+ producer.invoke("foobar");
+ verify(mockPartitioner, times(1)).partition(
+ "foobar", null, "foobar".getBytes(), DummyFlinkKafkaProducer.DUMMY_TOPIC, new int[] {0, 1, 2, 3});
}
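The reworked test above reflects the FlinkKafkaPartitioner lifecycle: open() now receives only the subtask index and the parallelism, and the current partition list is handed to partition() with every record instead. A minimal custom partitioner under that contract (the class below is illustrative and not part of this patch):
// Illustrative round-robin partitioner against the FlinkKafkaPartitioner contract.
public class RoundRobinPartitioner<T> extends FlinkKafkaPartitioner<T> {

    private int nextIndex;

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        // called once per subtask; note: no partition list at open() time anymore
        this.nextIndex = parallelInstanceId;
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // the up-to-date partition list is supplied with every record
        nextIndex = (nextIndex + 1) % partitions.length;
        return partitions[nextIndex];
    }
}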
/**
@@ -119,7 +123,7 @@ public class FlinkKafkaProducerBaseTest {
@Test
public void testAsyncErrorRethrownOnInvoke() throws Throwable {
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
- FakeStandardProducerConfig.get(), null);
+ FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
OneInputStreamOperatorTestHarness<String, Object> testHarness =
new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
@@ -150,7 +154,7 @@ public class FlinkKafkaProducerBaseTest {
@Test
public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
- FakeStandardProducerConfig.get(), null);
+ FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
OneInputStreamOperatorTestHarness<String, Object> testHarness =
new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
@@ -186,7 +190,7 @@ public class FlinkKafkaProducerBaseTest {
@Test(timeout=5000)
public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable {
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
- FakeStandardProducerConfig.get(), null);
+ FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
producer.setFlushOnCheckpoint(true);
final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
@@ -239,7 +243,7 @@ public class FlinkKafkaProducerBaseTest {
@Test(timeout=10000)
public void testAtLeastOnceProducer() throws Throwable {
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
- FakeStandardProducerConfig.get(), null);
+ FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
producer.setFlushOnCheckpoint(true);
final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
@@ -299,7 +303,7 @@ public class FlinkKafkaProducerBaseTest {
@Test(timeout=5000)
public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
- FakeStandardProducerConfig.get(), null);
+ FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
producer.setFlushOnCheckpoint(false);
final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
@@ -333,9 +337,9 @@ public class FlinkKafkaProducerBaseTest {
private boolean isFlushed;
@SuppressWarnings("unchecked")
- DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner) {
+ DummyFlinkKafkaProducer(Properties producerConfig, KeyedSerializationSchema<T> schema, FlinkKafkaPartitioner<T> partitioner) {
- super(DUMMY_TOPIC, (KeyedSerializationSchema<T>) mock(KeyedSerializationSchema.class), producerConfig, partitioner);
+ super(DUMMY_TOPIC, schema, producerConfig, partitioner);
this.mockProducer = mock(KafkaProducer.class);
when(mockProducer.send(any(ProducerRecord.class), any(Callback.class))).thenAnswer(new Answer<Object>() {
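The hunk above stubs the mocked KafkaProducer so that pending callbacks can be completed by hand in the flush tests. A self-contained sketch of that Mockito pattern, with illustrative names that are not part of the patch:
// Capture every Callback handed to send() so the test can later fire
// onCompletion() to simulate a broker ack or an asynchronous failure.
final List<Callback> pendingCallbacks = new ArrayList<>();
KafkaProducer<byte[], byte[]> mockProducer = mock(KafkaProducer.class);
when(mockProducer.send(any(ProducerRecord.class), any(Callback.class)))
    .thenAnswer(new Answer<Object>() {
        @Override
        public Object answer(InvocationOnMock invocation) throws Throwable {
            pendingCallbacks.add((Callback) invocation.getArguments()[1]);
            return null;
        }
    });
// a successful ack would then be: pendingCallbacks.get(0).onCompletion(null, null);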
http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 203d814..ac278fb 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1291,55 +1291,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
}
- private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
- KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
-
- private final TypeSerializer<Tuple2<Integer, Integer>> ts;
-
- public Tuple2WithTopicSchema(ExecutionConfig ec) {
- ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
- }
-
- @Override
- public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
- DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
- Tuple2<Integer, Integer> t2 = ts.deserialize(in);
- return new Tuple3<>(t2.f0, t2.f1, topic);
- }
-
- @Override
- public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) {
- return false;
- }
-
- @Override
- public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
- return TypeInfoParser.parse("Tuple3<Integer, Integer, String>");
- }
-
- @Override
- public byte[] serializeKey(Tuple3<Integer, Integer, String> element) {
- return null;
- }
-
- @Override
- public byte[] serializeValue(Tuple3<Integer, Integer, String> element) {
- ByteArrayOutputStream by = new ByteArrayOutputStream();
- DataOutputView out = new DataOutputViewStreamWrapper(by);
- try {
- ts.serialize(new Tuple2<>(element.f0, element.f1), out);
- } catch (IOException e) {
- throw new RuntimeException("Error" ,e);
- }
- return by.toByteArray();
- }
-
- @Override
- public String getTargetTopic(Tuple3<Integer, Integer, String> element) {
- return element.f2;
- }
- }
-
/**
* Tests Flink's Kafka integration with very large records (30 MB);
* see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
@@ -2276,4 +2227,53 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
this.numElementsTotal = state.get(0);
}
}
+
+ private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
+ KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
+
+ private final TypeSerializer<Tuple2<Integer, Integer>> ts;
+
+ public Tuple2WithTopicSchema(ExecutionConfig ec) {
+ ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
+ }
+
+ @Override
+ public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+ DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
+ Tuple2<Integer, Integer> t2 = ts.deserialize(in);
+ return new Tuple3<>(t2.f0, t2.f1, topic);
+ }
+
+ @Override
+ public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
+ return TypeInfoParser.parse("Tuple3<Integer, Integer, String>");
+ }
+
+ @Override
+ public byte[] serializeKey(Tuple3<Integer, Integer, String> element) {
+ return null;
+ }
+
+ @Override
+ public byte[] serializeValue(Tuple3<Integer, Integer, String> element) {
+ ByteArrayOutputStream by = new ByteArrayOutputStream();
+ DataOutputView out = new DataOutputViewStreamWrapper(by);
+ try {
+ ts.serialize(new Tuple2<>(element.f0, element.f1), out);
+ } catch (IOException e) {
+ throw new RuntimeException("Error", e);
+ }
+ return by.toByteArray();
+ }
+
+ @Override
+ public String getTargetTopic(Tuple3<Integer, Integer, String> element) {
+ return element.f2;
+ }
+ }
}
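For context, Tuple2WithTopicSchema implements both the keyed deserialization and serialization interfaces, so one instance can drive a consumer and a producer. A hypothetical wiring (the connector class and the topics/props/env variables are assumptions, not taken from this patch):
// Hypothetical usage; pick the consumer class matching your Kafka version.
Tuple2WithTopicSchema schema = new Tuple2WithTopicSchema(env.getConfig());
DataStream<Tuple3<Integer, Integer, String>> stream =
    env.addSource(new FlinkKafkaConsumer09<>(topics, schema, props));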