You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/06/08 14:03:12 UTC

[flink] 01/02: [FLINK-18075] Remove deprecation of Kafka producer ctor that take SerializationSchema

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bebe50346d9485fa2d962c5ed1d00da2778c8feb
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Jun 4 13:22:56 2020 +0200

    [FLINK-18075] Remove deprecation of Kafka producer ctor that take
    SerializationSchema
    
    SerializationSchema is an important interface that is widely spread and
    used in other components such as Table API. It is also the most common
    interface for reusable interfaces. Therefore we should support it long
    term in our connectors. This commit removes the deprecation of ctors
    that take this interface.
    
    Moreover it adds the most general ctor that takes all producer
    configuration options along with SerializationSchema. This makes it
    feature equivalent with KafkaSerializationSchema in respect to
    configuration of the producer.
---
 .../connectors/kafka/FlinkKafkaProducer.java       | 84 +++++++++++++++-------
 1 file changed, 57 insertions(+), 27 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 25b359c..3bda1fe 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -288,16 +288,9 @@ public class FlinkKafkaProducer<IN>
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
 	 * 			User defined (keyless) serialization schema.
-	 *
-	 * @deprecated use {@link #FlinkKafkaProducer(String, KafkaSerializationSchema, Properties, FlinkKafkaProducer.Semantic)}
 	 */
-	@Deprecated
 	public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
-		this(
-			topicId,
-			new KeyedSerializationSchemaWrapper<>(serializationSchema),
-			getPropertiesFromBrokerList(brokerList),
-			Optional.of(new FlinkFixedPartitioner<IN>()));
+		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList));
 	}
 
 	/**
@@ -318,16 +311,41 @@ public class FlinkKafkaProducer<IN>
 	 * 			User defined key-less serialization schema.
 	 * @param producerConfig
 	 * 			Properties with the producer configuration.
-	 *
-	 * @deprecated use {@link #FlinkKafkaProducer(String, KafkaSerializationSchema, Properties, FlinkKafkaProducer.Semantic)}
 	 */
-	@Deprecated
 	public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
+		this(topicId, serializationSchema, producerConfig, Optional.of(new FlinkFixedPartitioner<>()));
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a key-less {@link SerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+	 *
+	 * <p>Since a key-less {@link SerializationSchema} is used, all records sent to Kafka will not have an
+	 * attached key. Therefore, if a partitioner is also not provided, records will be distributed to Kafka
+	 * partitions in a round-robin fashion.
+	 *
+	 * @param topicId
+	 *          The topic to write data to
+	 * @param serializationSchema
+	 *          A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param producerConfig
+	 *          Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner
+	 *          A serializable partitioner for assigning messages to Kafka partitions. If a partitioner is not
+	 *          provided, records will be distributed to Kafka partitions in a round-robin fashion.
+	 */
+	public FlinkKafkaProducer(
+			String topicId,
+			SerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
 		this(
 			topicId,
-			new KeyedSerializationSchemaWrapper<>(serializationSchema),
+			serializationSchema,
 			producerConfig,
-			Optional.of(new FlinkFixedPartitioner<IN>()));
+			customPartitioner.orElse(null),
+			Semantic.AT_LEAST_ONCE,
+			DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
 	}
 
 	/**
@@ -339,22 +357,34 @@ public class FlinkKafkaProducer<IN>
 	 * partitions in a round-robin fashion.
 	 *
 	 * @param topicId The topic to write data to
-	 * @param serializationSchema A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
-	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
-	 *                          If a partitioner is not provided, records will be distributed to Kafka partitions
-	 *                          in a round-robin fashion.
-	 *
-	 * @deprecated use {@link #FlinkKafkaProducer(String, KafkaSerializationSchema, Properties, FlinkKafkaProducer.Semantic)}
+	 * @param serializationSchema
+	 *          A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param producerConfig
+	 *          Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner
+	 *          A serializable partitioner for assigning messages to Kafka partitions. If a partitioner is not
+	 *          provided, records will be distributed to Kafka partitions in a round-robin fashion.
+	 * @param semantic
+	 *          Defines semantic that will be used by this producer (see {@link FlinkKafkaProducer.Semantic}).
+	 * @param kafkaProducersPoolSize
+	 *          Overwrite default KafkaProducers pool size (see {@link FlinkKafkaProducer.Semantic#EXACTLY_ONCE}).
 	 */
-	@Deprecated
 	public FlinkKafkaProducer(
-		String topicId,
-		SerializationSchema<IN> serializationSchema,
-		Properties producerConfig,
-		Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
-
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+			String topicId,
+			SerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			@Nullable FlinkKafkaPartitioner<IN> customPartitioner,
+			FlinkKafkaProducer.Semantic semantic,
+			int kafkaProducersPoolSize) {
+		this(
+			topicId,
+			new KeyedSerializationSchemaWrapper<>(serializationSchema),
+			customPartitioner,
+			null,
+			producerConfig,
+			semantic,
+			kafkaProducersPoolSize
+		);
 	}
 
 	// ------------------- Key/Value serialization schema constructors ----------------------