You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/19 06:40:15 UTC
[5/6] flink git commit: [FLINK-6288] [kafka] Cleanup and improvements
to FlinkKafkaPartitioner custom partitioning
[FLINK-6288] [kafka] Cleanup and improvements to FlinkKafkaPartitioner custom partitioning
This commit wraps up some general improvements to the new Kafka sink
custom partitioning API, most notably:
1. remove deprecated constructors from base classes, as they are not
user-facing.
2. modify producer IT test to test custom partitioning for dynamic
topics.
3. improve documentation and Javadocs of the new interfaces.
This closes #3901.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3fcbb08
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3fcbb08
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3fcbb08
Branch: refs/heads/master
Commit: e3fcbb087568a33aa03f58ba0dc359b1a6b02bfd
Parents: 9ed9b68
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Thu May 18 21:52:16 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri May 19 14:38:48 2017 +0800
----------------------------------------------------------------------
docs/dev/connectors/kafka.md | 4 +-
.../connectors/kafka/FlinkKafkaProducer010.java | 113 +++++-----
.../connectors/kafka/FlinkKafkaProducer08.java | 50 +++--
.../connectors/kafka/Kafka08JsonTableSink.java | 21 +-
.../kafka/Kafka08JsonTableSinkTest.java | 25 +--
.../connectors/kafka/FlinkKafkaProducer09.java | 52 +++--
.../connectors/kafka/Kafka09JsonTableSink.java | 28 +--
.../kafka/Kafka09JsonTableSinkTest.java | 26 +--
.../kafka/FlinkKafkaProducerBase.java | 89 ++++----
.../connectors/kafka/KafkaJsonTableSink.java | 14 --
.../connectors/kafka/KafkaTableSink.java | 40 +---
.../kafka/partitioner/FixedPartitioner.java | 77 -------
.../partitioner/FlinkFixedPartitioner.java | 19 +-
.../FlinkKafkaDelegatePartitioner.java | 5 +-
.../partitioner/FlinkKafkaPartitioner.java | 24 +-
.../kafka/partitioner/KafkaPartitioner.java | 16 +-
.../kafka/FlinkKafkaProducerBaseTest.java | 38 ++--
.../connectors/kafka/KafkaConsumerTestBase.java | 98 ++++----
.../connectors/kafka/KafkaProducerTestBase.java | 222 ++++++++++++-------
.../kafka/KafkaTableSinkTestBase.java | 51 +----
.../connectors/kafka/TestFixedPartitioner.java | 104 ---------
.../TestFlinkKafkaDelegatePartitioner.java | 111 ----------
.../kafka/testutils/Tuple2Partitioner.java | 49 ----
23 files changed, 458 insertions(+), 818 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 60a8039..bc7e7de 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -456,9 +456,9 @@ are other constructor variants that allow providing the following:
Please refer to the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for
details on how to configure Kafka Producers.
* *Custom partitioner*: To assign records to specific
- partitions, you can provide an implementation of a `KafkaPartitioner` to the
+ partitions, you can provide an implementation of a `FlinkKafkaPartitioner` to the
constructor. This partitioner will be called for each record in the stream
- to determine which exact partition the record will be sent to.
+ to determine which exact partition of the target topic the record should be sent to.
* *Advanced serialization schema*: Similar to the consumer,
the producer also allows using an advanced serialization schema called `KeyedSerializationSchema`,
which allows serializing the key and value separately. It also allows to override the target topic,
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 7addafa..711fe07 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -38,6 +39,7 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWra
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
+import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPartitionsByTopic;
import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;
@@ -121,32 +123,6 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
* @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
- * @deprecated Use {@link FlinkKafkaProducer010Configuration#writeToKafkaWithTimestamps(DataStream, String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
- */
- @Deprecated
- public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
- String topicId,
- KeyedSerializationSchema<T> serializationSchema,
- Properties producerConfig,
- KafkaPartitioner<T> customPartitioner) {
-
- GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
- FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner);
- SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
- return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
- }
-
- /**
- * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
- * the topic.
- *
- * This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
- *
- * @param inStream The stream to write to Kafka
- * @param topicId The name of the target topic
- * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
- * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
- * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
*/
public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
String topicId,
@@ -200,21 +176,6 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
* @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
- * @deprecated Use {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead
- */
- @Deprecated
- public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
- this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
- }
-
- /**
- * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
- * the topic.
- *
- * @param topicId The topic to write data to
- * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
- * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
- * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
*/
public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
@@ -251,32 +212,84 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) {
this(topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
}
-
+
/**
* Create Kafka producer
*
* This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above)
- * @deprecated Use {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
*/
- @Deprecated
- public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
+ public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
// We create a Kafka 09 producer instance here and only "override" (by intercepting) the
// invoke call.
super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner));
}
-
+
+ // ----------------------------- Deprecated constructors / factory methods ---------------------------
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+ * the topic.
+ *
+	 * This constructor allows writing timestamps to Kafka, it follows approach (b) (see above)
+ *
+ * @param inStream The stream to write to Kafka
+ * @param topicId The name of the target topic
+ * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+ *
+	 * @deprecated This method is deprecated since it does not correctly handle partitioning when
+ * producing to multiple topics. Use
+	 * {@link FlinkKafkaProducer010#writeToKafkaWithTimestamps(DataStream, String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+ */
+ @Deprecated
+ public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+ String topicId,
+ KeyedSerializationSchema<T> serializationSchema,
+ Properties producerConfig,
+ KafkaPartitioner<T> customPartitioner) {
+
+ GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
+ FlinkKafkaProducer010<T> kafkaProducer =
+ new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner));
+ SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
+ return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
+ * the topic.
+ *
+ * @param topicId The topic to write data to
+ * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+ *
+	 * @deprecated This constructor is deprecated since it does not correctly handle partitioning when
+ * producing to multiple topics. Use
+ * {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+ */
+ @Deprecated
+ public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
+ this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+ }
+
/**
* Create Kafka producer
*
* This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above)
+ *
+ * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
+ * producing to multiple topics. Use
+	 * {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
*/
- public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
+ @Deprecated
+ public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
// We create a Kafka 09 producer instance here and only "override" (by intercepting) the
// invoke call.
- super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner));
+ super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner)));
}
-
// ----------------------------- Generic element processing ---------------------------
private void invokeInternal(T next, long elementTimestamp) throws Exception {
@@ -300,7 +313,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
ProducerRecord<byte[], byte[]> record;
int[] partitions = internalProducer.topicPartitionsMap.get(targetTopic);
if(null == partitions) {
- partitions = internalProducer.getPartitionsByTopic(targetTopic, internalProducer.producer);
+ partitions = getPartitionsByTopic(targetTopic, internalProducer.producer);
internalProducer.topicPartitionsMap.put(targetTopic, partitions);
}
if (internalProducer.flinkKafkaPartitioner == null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
index 64d3716..08dcb2f 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
@@ -76,21 +77,6 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {
* @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
* @param customPartitioner A serializable partitioner for assining messages to Kafka partitions.
- * @deprecated Use {@link FlinkKafkaProducer08#FlinkKafkaProducer08(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead
- */
- @Deprecated
- public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
- this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
-
- }
-
- /**
- * The main constructor for creating a FlinkKafkaProducer.
- *
- * @param topicId The topic to write data to
- * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
- * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
- * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions.
*/
public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
@@ -136,13 +122,30 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {
* @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
* @param customPartitioner A serializable partitioner for assining messages to Kafka partitions.
- * @deprecated Use {@link FlinkKafkaProducer08#FlinkKafkaProducer08(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
*/
- @Deprecated
- public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+ public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
super(topicId, serializationSchema, producerConfig, customPartitioner);
}
+ // ------------------- Deprecated constructors ----------------------
+
+ /**
+ * The main constructor for creating a FlinkKafkaProducer.
+ *
+ * @param topicId The topic to write data to
+ * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+ *
+ * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
+ * producing to multiple topics. Use
+	 * {@link FlinkKafkaProducer08#FlinkKafkaProducer08(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+ */
+ @Deprecated
+ public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+ this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+ }
+
/**
* The main constructor for creating a FlinkKafkaProducer.
*
@@ -150,11 +153,18 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> {
* @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
* @param customPartitioner A serializable partitioner for assining messages to Kafka partitions.
+ *
+ * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
+ * producing to multiple topics. Use
+	 * {@link FlinkKafkaProducer08#FlinkKafkaProducer08(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
*/
- public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
- super(topicId, serializationSchema, producerConfig, customPartitioner);
+ @Deprecated
+ public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+ super(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner));
}
+ // ---------------------------------------------------------------------
+
@Override
protected void flush() {
// The Kafka 0.8 producer doesn't support flushing, we wait here
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index 5a066ec0..80bd180 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -17,6 +17,7 @@
*/
package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
@@ -28,7 +29,7 @@ import java.util.Properties;
* Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format.
*/
public class Kafka08JsonTableSink extends KafkaJsonTableSink {
-
+
/**
* Creates {@link KafkaTableSink} for Kafka 0.8
*
@@ -36,24 +37,24 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink {
* @param properties properties to connect to Kafka
* @param partitioner Kafka partitioner
*/
- public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
+ public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
super(topic, properties, partitioner);
}
-
+
/**
* Creates {@link KafkaTableSink} for Kafka 0.8
*
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
* @param partitioner Kafka partitioner
+ *
+ * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
+ * producing to multiple topics. Use
+ * {@link #Kafka08JsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead.
*/
- public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
- super(topic, properties, partitioner);
- }
-
- @Override
- protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
- return new FlinkKafkaProducer08<>(topic, serializationSchema, properties, partitioner);
+ @Deprecated
+ public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
+ super(topic, properties, new FlinkKafkaDelegatePartitioner<>(partitioner));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index 164c162..2136476 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
@@ -28,26 +27,20 @@ import java.util.Properties;
public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
@Override
- protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
+ protected KafkaTableSink createTableSink(
+ String topic,
+ Properties properties,
+ FlinkKafkaPartitioner<Row> partitioner,
final FlinkKafkaProducerBase<Row> kafkaProducer) {
return new Kafka08JsonTableSink(topic, properties, partitioner) {
@Override
- protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
- SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
- return kafkaProducer;
- }
- };
- }
+ protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+ String topic,
+ Properties properties,
+ SerializationSchema<Row> serializationSchema,
+ FlinkKafkaPartitioner<Row> partitioner) {
- @Override
- protected KafkaTableSink createTableSinkWithFlinkPartitioner(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner,
- final FlinkKafkaProducerBase<Row> kafkaProducer) {
-
- return new Kafka08JsonTableSink(topic, properties, partitioner) {
- @Override
- protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
- SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
return kafkaProducer;
}
};
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
index 4f41c43..cbed361 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
@@ -78,22 +79,6 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
* @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
- * @deprecated Use {@link FlinkKafkaProducer09#FlinkKafkaProducer09(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead
- */
- @Deprecated
- public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
- this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
-
- }
-
- /**
- * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
- * the topic.
- *
- * @param topicId The topic to write data to
- * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
- * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
- * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
*/
public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
@@ -140,13 +125,31 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
* @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
- * @deprecated Use {@link FlinkKafkaProducer09#FlinkKafkaProducer09(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
*/
- @Deprecated
- public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+ public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
super(topicId, serializationSchema, producerConfig, customPartitioner);
}
+ // ------------------- Deprecated constructors ----------------------
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+ * the topic.
+ *
+ * @param topicId The topic to write data to
+ * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+ *
+ * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
+ * producing to multiple topics. Use
+ * {@link FlinkKafkaProducer09#FlinkKafkaProducer09(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+ */
+ @Deprecated
+ public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+ this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+ }
+
/**
* Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
* the topic.
@@ -155,11 +158,18 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
* @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+ *
+ * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
+ * producing to multiple topics. Use
+ * {@link FlinkKafkaProducer09#FlinkKafkaProducer09(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
*/
- public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
- super(topicId, serializationSchema, producerConfig, customPartitioner);
+ @Deprecated
+ public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
+ super(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner));
}
+ // ------------------------------------------------------------------
+
@Override
protected void flush() {
if (this.producer != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
index b82ebc4..a81422e 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
@@ -17,6 +17,7 @@
*/
package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
@@ -28,43 +29,32 @@ import java.util.Properties;
* Kafka 0.9 {@link KafkaTableSink} that serializes data in JSON format.
*/
public class Kafka09JsonTableSink extends KafkaJsonTableSink {
+
/**
* Creates {@link KafkaTableSink} for Kafka 0.9
*
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
* @param partitioner Kafka partitioner
- * @deprecated Use {@link Kafka09JsonTableSink#Kafka09JsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead
*/
- @Deprecated
- public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
+ public Kafka09JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
super(topic, properties, partitioner);
}
-
+
/**
* Creates {@link KafkaTableSink} for Kafka 0.9
*
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
* @param partitioner Kafka partitioner
- */
- public Kafka09JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
- super(topic, properties, partitioner);
- }
-
- /**
*
- * @param topic Kafka topic to produce to.
- * @param properties Properties for the Kafka producer.
- * @param serializationSchema Serialization schema to use to create Kafka records.
- * @param partitioner Partitioner to select Kafka partition.
- * @return The version-specific Kafka producer
- * @deprecated Use {@link Kafka09JsonTableSink#createKafkaProducer(String, Properties, SerializationSchema, FlinkKafkaPartitioner)} instead
+ * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
+ * producing to multiple topics. Use
+ * {@link #Kafka09JsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead.
*/
@Deprecated
- @Override
- protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
- return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner);
+ public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
+ super(topic, properties, new FlinkKafkaDelegatePartitioner<>(partitioner));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index ad8f623..3afb5e4 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
@@ -27,28 +26,21 @@ import java.util.Properties;
public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
- @Deprecated
@Override
- protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
+ protected KafkaTableSink createTableSink(
+ String topic,
+ Properties properties,
+ FlinkKafkaPartitioner<Row> partitioner,
final FlinkKafkaProducerBase<Row> kafkaProducer) {
return new Kafka09JsonTableSink(topic, properties, partitioner) {
@Override
- protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
- SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) {
- return kafkaProducer;
- }
- };
- }
+ protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+ String topic,
+ Properties properties,
+ SerializationSchema<Row> serializationSchema,
+ FlinkKafkaPartitioner<Row> partitioner) {
- @Override
- protected KafkaTableSink createTableSinkWithFlinkPartitioner(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner,
- final FlinkKafkaProducerBase<Row> kafkaProducer) {
-
- return new Kafka09JsonTableSink(topic, properties, partitioner) {
- @Override
- protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
- SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
return kafkaProducer;
}
};
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index f9a1e41..3a8228c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -40,7 +40,6 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.NetUtils;
import org.apache.kafka.clients.producer.Callback;
@@ -74,38 +73,38 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
private static final long serialVersionUID = 1L;
/**
- * Configuration key for disabling the metrics reporting
+ * Configuration key for disabling the metrics reporting.
*/
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
/**
- * User defined properties for the Producer
+ * User defined properties for the Producer.
*/
protected final Properties producerConfig;
/**
- * The name of the default topic this producer is writing data to
+ * The name of the default topic this producer is writing data to.
*/
protected final String defaultTopicId;
/**
- * (Serializable) SerializationSchema for turning objects used with Flink into
+ * (Serializable) SerializationSchema for turning objects used with Flink into
* byte[] for Kafka.
*/
protected final KeyedSerializationSchema<IN> schema;
/**
- * User-provided partitioner for assigning an object to a Kafka partition for each topic
+ * User-provided partitioner for assigning an object to a Kafka partition for each topic.
*/
- protected final FlinkKafkaPartitioner flinkKafkaPartitioner;
+ protected final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
/**
- * Partitions for each topic
+ * Partitions of each topic.
*/
protected final Map<String, int[]> topicPartitionsMap;
/**
- * Flag indicating whether to accept failures (and log them), or to fail on failures
+ * Flag indicating whether to accept failures (and log them), or to fail on failures.
*/
protected boolean logFailuresOnly;
@@ -114,11 +113,6 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
*/
protected boolean flushOnCheckpoint;
- /**
- * Retry times of fetching kafka meta
- */
- protected long kafkaMetaRetryTimes;
-
// -------------------------------- Runtime fields ------------------------------------------
/** KafkaProducer instance */
@@ -138,21 +132,6 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
protected OperatorStateStore stateStore;
-
- /**
- * The main constructor for creating a FlinkKafkaProducer. For customPartitioner parameter, use {@link org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner} instead
- *
- * @param defaultTopicId The default topic to write data to
- * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
- * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
- * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
- * @deprecated Use {@link FlinkKafkaProducerBase#FlinkKafkaProducerBase(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
- */
- @Deprecated
- public FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
- this(defaultTopicId, serializationSchema, producerConfig, null == customPartitioner ? null : new FlinkKafkaDelegatePartitioner<>(customPartitioner));
- }
-
/**
* The main constructor for creating a FlinkKafkaProducer.
*
@@ -236,9 +215,11 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
producer = getKafkaProducer(this.producerConfig);
RuntimeContext ctx = getRuntimeContext();
+
if(null != flinkKafkaPartitioner) {
if(flinkKafkaPartitioner instanceof FlinkKafkaDelegatePartitioner) {
- ((FlinkKafkaDelegatePartitioner)flinkKafkaPartitioner).setPartitions(getPartitionsByTopic(this.defaultTopicId, this.producer));
+ ((FlinkKafkaDelegatePartitioner) flinkKafkaPartitioner).setPartitions(
+ getPartitionsByTopic(this.defaultTopicId, this.producer));
}
flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
}
@@ -290,26 +271,6 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
}
}
- protected int[] getPartitionsByTopic(String topic, KafkaProducer<byte[], byte[]> producer) {
- // the fetched list is immutable, so we're creating a mutable copy in order to sort it
- List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));
-
- // sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
- Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
- @Override
- public int compare(PartitionInfo o1, PartitionInfo o2) {
- return Integer.compare(o1.partition(), o2.partition());
- }
- });
-
- int[] partitions = new int[partitionsList.size()];
- for (int i = 0; i < partitions.length; i++) {
- partitions[i] = partitionsList.get(i).partition();
- }
-
- return partitions;
- }
-
/**
* Called when new data arrives to the sink, and forwards it to Kafka.
*
@@ -330,7 +291,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
int[] partitions = this.topicPartitionsMap.get(targetTopic);
if(null == partitions) {
- partitions = this.getPartitionsByTopic(targetTopic, producer);
+ partitions = getPartitionsByTopic(targetTopic, producer);
this.topicPartitionsMap.put(targetTopic, partitions);
}
@@ -338,7 +299,11 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
if (flinkKafkaPartitioner == null) {
record = new ProducerRecord<>(targetTopic, serializedKey, serializedValue);
} else {
- record = new ProducerRecord<>(targetTopic, flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), serializedKey, serializedValue);
+ record = new ProducerRecord<>(
+ targetTopic,
+ flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions),
+ serializedKey,
+ serializedValue);
}
if (flushOnCheckpoint) {
synchronized (pendingRecordsLock) {
@@ -425,6 +390,26 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
return props;
}
+ protected static int[] getPartitionsByTopic(String topic, KafkaProducer<byte[], byte[]> producer) {
+ // the fetched list is immutable, so we're creating a mutable copy in order to sort it
+ List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));
+
+ // sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
+ Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
+ @Override
+ public int compare(PartitionInfo o1, PartitionInfo o2) {
+ return Integer.compare(o1.partition(), o2.partition());
+ }
+ });
+
+ int[] partitions = new int[partitionsList.size()];
+ for (int i = 0; i < partitions.length; i++) {
+ partitions[i] = partitionsList.get(i).partition();
+ }
+
+ return partitions;
+ }
+
@VisibleForTesting
protected long numPendingRecords() {
synchronized (pendingRecordsLock) {
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
index a0b5033..41bb329 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.types.Row;
@@ -29,19 +28,6 @@ import java.util.Properties;
* Base class for {@link KafkaTableSink} that serializes data in JSON format
*/
public abstract class KafkaJsonTableSink extends KafkaTableSink {
-
- /**
- * Creates KafkaJsonTableSink
- *
- * @param topic topic in Kafka to which table is written
- * @param properties properties to connect to Kafka
- * @param partitioner Kafka partitioner
- * @deprecated Use {@link KafkaJsonTableSink#KafkaJsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead
- */
- @Deprecated
- public KafkaJsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
- super(topic, properties, partitioner);
- }
/**
* Creates KafkaJsonTableSink
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index 0a937d6..1c38816 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -18,13 +18,11 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.util.Preconditions;
@@ -34,7 +32,7 @@ import java.util.Properties;
* A version-agnostic Kafka {@link AppendStreamTableSink}.
*
* <p>The version-specific Kafka consumers need to extend this class and
- * override {@link #createKafkaProducer(String, Properties, SerializationSchema, KafkaPartitioner)}}.
+ * override {@link #createKafkaProducer(String, Properties, SerializationSchema, FlinkKafkaPartitioner)}}.
*/
public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
@@ -47,22 +45,6 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
/**
* Creates KafkaTableSink
- *
- * @param topic Kafka topic to write to.
- * @param properties Properties for the Kafka consumer.
- * @param partitioner Partitioner to select Kafka partition for each item
- * @deprecated Use {@link KafkaTableSink#KafkaTableSink(String, Properties, FlinkKafkaPartitioner)} instead
- */
- @Deprecated
- public KafkaTableSink(
- String topic,
- Properties properties,
- KafkaPartitioner<Row> partitioner) {
- this(topic, properties, new FlinkKafkaDelegatePartitioner<Row>(partitioner));
- }
-
- /**
- * Creates KafkaTableSink
*
* @param topic Kafka topic to write to.
* @param properties Properties for the Kafka consumer.
@@ -85,22 +67,6 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
* @param serializationSchema Serialization schema to use to create Kafka records.
* @param partitioner Partitioner to select Kafka partition.
* @return The version-specific Kafka producer
- * @deprecated Use {@link KafkaTableSink#createKafkaProducer(String, Properties, SerializationSchema, FlinkKafkaPartitioner)} instead
- */
- @Deprecated
- protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
- String topic, Properties properties,
- SerializationSchema<Row> serializationSchema,
- KafkaPartitioner<Row> partitioner);
-
- /**
- * Returns the version-specifid Kafka producer.
- *
- * @param topic Kafka topic to produce to.
- * @param properties Properties for the Kafka producer.
- * @param serializationSchema Serialization schema to use to create Kafka records.
- * @param partitioner Partitioner to select Kafka partition.
- * @return The version-specific Kafka producer
*/
protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
String topic, Properties properties,
@@ -153,8 +119,4 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
return copy;
}
-
-
-
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
deleted file mode 100644
index edabfe0..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.partitioner;
-
-import java.io.Serializable;
-
-/**
- * A partitioner ensuring that each internal Flink partition ends up in one Kafka partition.
- *
- * Note, one Kafka partition can contain multiple Flink partitions.
- *
- * Cases:
- * # More Flink partitions than kafka partitions
- * <pre>
- * Flink Sinks: Kafka Partitions
- * 1 ----------------> 1
- * 2 --------------/
- * 3 -------------/
- * 4 ------------/
- * </pre>
- * Some (or all) kafka partitions contain the output of more than one flink partition
- *
- *# Fewer Flink partitions than Kafka
- * <pre>
- * Flink Sinks: Kafka Partitions
- * 1 ----------------> 1
- * 2 ----------------> 2
- * 3
- * 4
- * 5
- * </pre>
- *
- * Not all Kafka partitions contain data
- * To avoid such an unbalanced partitioning, use a round-robin kafka partitioner. (note that this will
- * cause a lot of network connections between all the Flink instances and all the Kafka brokers
- * @deprecated Use {@link FlinkFixedPartitioner} instead.
- *
- */
-@Deprecated
-public class FixedPartitioner<T> extends KafkaPartitioner<T> implements Serializable {
- private static final long serialVersionUID = 1627268846962918126L;
-
- private int targetPartition = -1;
-
- @Override
- public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
- if (parallelInstanceId < 0 || parallelInstances <= 0 || partitions.length == 0) {
- throw new IllegalArgumentException();
- }
-
- this.targetPartition = partitions[parallelInstanceId % partitions.length];
- }
-
- @Override
- public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
- if (targetPartition >= 0) {
- return targetPartition;
- } else {
- throw new RuntimeException("The partitioner has not been initialized properly");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
index d2eb7af..e47c667 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
@@ -17,6 +17,8 @@
*/
package org.apache.flink.streaming.connectors.kafka.partitioner;
+import org.apache.flink.util.Preconditions;
+
/**
* A partitioner ensuring that each internal Flink partition ends up in one Kafka partition.
*
@@ -44,9 +46,8 @@ package org.apache.flink.streaming.connectors.kafka.partitioner;
* </pre>
*
* Not all Kafka partitions contain data
- * To avoid such an unbalanced partitioning, use a round-robin kafka partitioner. (note that this will
- * cause a lot of network connections between all the Flink instances and all the Kafka brokers
- *
+ * To avoid such an unbalanced partitioning, use a round-robin kafka partitioner (note that this will
+ * cause a lot of network connections between all the Flink instances and all the Kafka brokers).
*/
public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
@@ -54,17 +55,17 @@ public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
@Override
public void open(int parallelInstanceId, int parallelInstances) {
- if (parallelInstanceId < 0 || parallelInstances <= 0) {
- throw new IllegalArgumentException();
- }
+ Preconditions.checkArgument(parallelInstanceId >= 0, "Id of this subtask cannot be negative.");
+ Preconditions.checkArgument(parallelInstances > 0, "Number of subtasks must be larger than 0.");
+
this.parallelInstanceId = parallelInstanceId;
}
@Override
public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
- if(null == partitions || partitions.length == 0) {
- throw new IllegalArgumentException();
- }
+ Preconditions.checkArgument(
+ partitions != null && partitions.length > 0,
+ "Partitions of the target topic is empty.");
return partitions[parallelInstanceId % partitions.length];
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
index 469fd1b..b7b4143 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
@@ -18,8 +18,9 @@
package org.apache.flink.streaming.connectors.kafka.partitioner;
/**
- * Delegate for KafkaPartitioner
- * @param <T>
+ * Delegate for the deprecated {@link KafkaPartitioner}.
+ * This should only be used for bridging deprecated partitioning API methods.
+ *
* @deprecated Delegate for {@link KafkaPartitioner}, use {@link FlinkKafkaPartitioner} instead
*/
@Deprecated
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
index e074b9b..b634af7 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
@@ -20,20 +20,34 @@ package org.apache.flink.streaming.connectors.kafka.partitioner;
import java.io.Serializable;
/**
- * It contains a open() method which is called on each parallel instance.
- * Partitioners must be serializable!
+ * A {@link FlinkKafkaPartitioner} wraps logic on how to partition records
+ * across partitions of multiple Kafka topics.
*/
public abstract class FlinkKafkaPartitioner<T> implements Serializable {
+
private static final long serialVersionUID = -9086719227828020494L;
/**
- * Initializer for the Partitioner.
- * @param parallelInstanceId 0-indexed id of the parallel instance in Flink
+ * Initializer for the partitioner. This is called once on each parallel sink instance of
+ * the Flink Kafka producer. This method should be overridden if necessary.
+ *
+ * @param parallelInstanceId 0-indexed id of the parallel sink instance in Flink
* @param parallelInstances the total number of parallel instances
*/
public void open(int parallelInstanceId, int parallelInstances) {
// overwrite this method if needed.
}
-
+
+ /**
+ * Determine the id of the partition that the record should be written to.
+ *
+ * @param record the record value
+ * @param key serialized key of the record
+ * @param value serialized value of the record
+ * @param targetTopic target topic for the record
+ * @param partitions found partitions for the target topic
+ *
+ * @return the id of the target partition
+ */
public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
index 7c82bd1..eebc619 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
@@ -22,7 +22,9 @@ import java.io.Serializable;
/**
* It contains a open() method which is called on each parallel instance.
* Partitioners must be serializable!
- * @deprecated Use {@link FlinkKafkaPartitioner} instead.
+ *
+ * @deprecated This partitioner does not handle partitioning properly in the case of
+ * multiple topics, and has been deprecated. Please use {@link FlinkKafkaPartitioner} instead.
*/
@Deprecated
public abstract class KafkaPartitioner<T> implements Serializable {
@@ -34,22 +36,10 @@ public abstract class KafkaPartitioner<T> implements Serializable {
* @param parallelInstanceId 0-indexed id of the parallel instance in Flink
* @param parallelInstances the total number of parallel instances
* @param partitions an array describing the partition IDs of the available Kafka partitions.
- * @deprecated Use {@link FlinkKafkaPartitioner#open(int, int)} instead.
*/
- @Deprecated
public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
// overwrite this method if needed.
}
- /**
- *
- * @param next
- * @param serializedKey
- * @param serializedValue
- * @param numPartitions
- * @return
- * @deprecated Use {@link FlinkKafkaPartitioner#partition(T, byte[], byte[], String, int[])} instead.
- */
- @Deprecated
public abstract int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
index 1f16d8e..6b2cc02 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
@@ -22,12 +22,14 @@ import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -61,7 +63,7 @@ public class FlinkKafkaProducerBaseTest {
// no bootstrap servers set in props
Properties props = new Properties();
// should throw IllegalArgumentException
- new DummyFlinkKafkaProducer<>(props, null);
+ new DummyFlinkKafkaProducer<>(props, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
}
/**
@@ -72,7 +74,7 @@ public class FlinkKafkaProducerBaseTest {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345");
// should set missing key value deserializers
- new DummyFlinkKafkaProducer<>(props, null);
+ new DummyFlinkKafkaProducer<>(props, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
assertTrue(props.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
assertTrue(props.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
@@ -83,9 +85,10 @@ public class FlinkKafkaProducerBaseTest {
/**
* Tests that partitions list is determinate and correctly provided to custom partitioner
*/
+ @SuppressWarnings("unchecked")
@Test
- public void testPartitionerOpenedWithDeterminatePartitionList() throws Exception {
- KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class);
+ public void testPartitionerInvokedWithDeterminatePartitionList() throws Exception {
+ FlinkKafkaPartitioner<String> mockPartitioner = mock(FlinkKafkaPartitioner.class);
RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
@@ -98,8 +101,8 @@ public class FlinkKafkaProducerBaseTest {
mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 0, null, null, null));
mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 2, null, null, null));
- final DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
- FakeStandardProducerConfig.get(), mockPartitioner);
+ final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
+ FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), mockPartitioner);
producer.setRuntimeContext(mockRuntimeContext);
final KafkaProducer mockProducer = producer.getMockKafkaProducer();
@@ -107,10 +110,11 @@ public class FlinkKafkaProducerBaseTest {
when(mockProducer.metrics()).thenReturn(null);
producer.open(new Configuration());
+ verify(mockPartitioner, times(1)).open(0, 1);
- // the out-of-order partitions list should be sorted before provided to the custom partitioner's open() method
- int[] correctPartitionList = {0, 1, 2, 3};
- verify(mockPartitioner).open(0, 1, correctPartitionList);
+ producer.invoke("foobar");
+ verify(mockPartitioner, times(1)).partition(
+ "foobar", null, "foobar".getBytes(), DummyFlinkKafkaProducer.DUMMY_TOPIC, new int[] {0, 1, 2, 3});
}
/**
@@ -119,7 +123,7 @@ public class FlinkKafkaProducerBaseTest {
@Test
public void testAsyncErrorRethrownOnInvoke() throws Throwable {
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
- FakeStandardProducerConfig.get(), null);
+ FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
OneInputStreamOperatorTestHarness<String, Object> testHarness =
new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
@@ -150,7 +154,7 @@ public class FlinkKafkaProducerBaseTest {
@Test
public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
- FakeStandardProducerConfig.get(), null);
+ FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
OneInputStreamOperatorTestHarness<String, Object> testHarness =
new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
@@ -186,7 +190,7 @@ public class FlinkKafkaProducerBaseTest {
@Test(timeout=5000)
public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable {
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
- FakeStandardProducerConfig.get(), null);
+ FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
producer.setFlushOnCheckpoint(true);
final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
@@ -239,7 +243,7 @@ public class FlinkKafkaProducerBaseTest {
@Test(timeout=10000)
public void testAtLeastOnceProducer() throws Throwable {
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
- FakeStandardProducerConfig.get(), null);
+ FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
producer.setFlushOnCheckpoint(true);
final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
@@ -299,7 +303,7 @@ public class FlinkKafkaProducerBaseTest {
@Test(timeout=5000)
public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
- FakeStandardProducerConfig.get(), null);
+ FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
producer.setFlushOnCheckpoint(false);
final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
@@ -333,9 +337,9 @@ public class FlinkKafkaProducerBaseTest {
private boolean isFlushed;
@SuppressWarnings("unchecked")
- DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner) {
+ DummyFlinkKafkaProducer(Properties producerConfig, KeyedSerializationSchema<T> schema, FlinkKafkaPartitioner partitioner) {
- super(DUMMY_TOPIC, (KeyedSerializationSchema<T>) mock(KeyedSerializationSchema.class), producerConfig, partitioner);
+ super(DUMMY_TOPIC, schema, producerConfig, partitioner);
this.mockProducer = mock(KafkaProducer.class);
when(mockProducer.send(any(ProducerRecord.class), any(Callback.class))).thenAnswer(new Answer<Object>() {
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 203d814..ac278fb 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1291,55 +1291,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
}
- private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
- KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
-
- private final TypeSerializer<Tuple2<Integer, Integer>> ts;
-
- public Tuple2WithTopicSchema(ExecutionConfig ec) {
- ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
- }
-
- @Override
- public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
- DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
- Tuple2<Integer, Integer> t2 = ts.deserialize(in);
- return new Tuple3<>(t2.f0, t2.f1, topic);
- }
-
- @Override
- public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) {
- return false;
- }
-
- @Override
- public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
- return TypeInfoParser.parse("Tuple3<Integer, Integer, String>");
- }
-
- @Override
- public byte[] serializeKey(Tuple3<Integer, Integer, String> element) {
- return null;
- }
-
- @Override
- public byte[] serializeValue(Tuple3<Integer, Integer, String> element) {
- ByteArrayOutputStream by = new ByteArrayOutputStream();
- DataOutputView out = new DataOutputViewStreamWrapper(by);
- try {
- ts.serialize(new Tuple2<>(element.f0, element.f1), out);
- } catch (IOException e) {
- throw new RuntimeException("Error" ,e);
- }
- return by.toByteArray();
- }
-
- @Override
- public String getTargetTopic(Tuple3<Integer, Integer, String> element) {
- return element.f2;
- }
- }
-
/**
* Test Flink's Kafka integration also with very big records (30MB)
* see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
@@ -2276,4 +2227,53 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
this.numElementsTotal = state.get(0);
}
}
+
+ private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
+ KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
+
+ private final TypeSerializer<Tuple2<Integer, Integer>> ts;
+
+ public Tuple2WithTopicSchema(ExecutionConfig ec) {
+ ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
+ }
+
+ @Override
+ public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+ DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
+ Tuple2<Integer, Integer> t2 = ts.deserialize(in);
+ return new Tuple3<>(t2.f0, t2.f1, topic);
+ }
+
+ @Override
+ public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() {
+ return TypeInfoParser.parse("Tuple3<Integer, Integer, String>");
+ }
+
+ @Override
+ public byte[] serializeKey(Tuple3<Integer, Integer, String> element) {
+ return null;
+ }
+
+ @Override
+ public byte[] serializeValue(Tuple3<Integer, Integer, String> element) {
+ ByteArrayOutputStream by = new ByteArrayOutputStream();
+ DataOutputView out = new DataOutputViewStreamWrapper(by);
+ try {
+ ts.serialize(new Tuple2<>(element.f0, element.f1), out);
+ } catch (IOException e) {
+ throw new RuntimeException("Error" ,e);
+ }
+ return by.toByteArray();
+ }
+
+ @Override
+ public String getTargetTopic(Tuple3<Integer, Integer, String> element) {
+ return element.f2;
+ }
+ }
}