You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/01/06 06:49:47 UTC

[5/9] flink git commit: [FLINK-8260] [kafka] Reorder deprecated / regular constructors of FlinkKafkaProducer010

[FLINK-8260] [kafka] Reorder deprecated / regular constructors of FlinkKafkaProducer010

This commit moves the deprecated factory methods of
FlinkKafkaProducer010 below the regular constructors, for better
navigation and readability of the code.

This closes #5179.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10f1acf9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10f1acf9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10f1acf9

Branch: refs/heads/release-1.4
Commit: 10f1acf92313cde7bf4ac8aa1403b19405d2ed25
Parents: 7197489
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Dec 18 20:29:38 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jan 5 22:03:40 2018 -0800

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer010.java | 167 +++++++++----------
 1 file changed, 82 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/10f1acf9/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 21e3a10..0e64aa5 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -46,80 +46,6 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 	 */
 	private boolean writeTimestampToKafka = false;
 
-	// ---------------------- "Constructors" for timestamp writing ------------------
-
-	/**
-	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
-	 * the topic.
-	 *
-	 * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
-	 *
-	 * @param inStream The stream to write to Kafka
-	 * @param topicId ID of the Kafka topic.
-	 * @param serializationSchema User defined serialization schema supporting key/value messages
-	 * @param producerConfig Properties with the producer configuration.
-	 *
-	 * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)}
-	 * and call {@link #setWriteTimestampToKafka(boolean)}.
-	 */
-	@Deprecated
-	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
-																					String topicId,
-																					KeyedSerializationSchema<T> serializationSchema,
-																					Properties producerConfig) {
-		return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
-	}
-
-	/**
-	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
-	 * the topic.
-	 *
-	 * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
-	 *
-	 * @param inStream The stream to write to Kafka
-	 * @param topicId ID of the Kafka topic.
-	 * @param serializationSchema User defined (keyless) serialization schema.
-	 * @param producerConfig Properties with the producer configuration.
-	 *
-	 * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)}
-	 * and call {@link #setWriteTimestampToKafka(boolean)}.
-	 */
-	@Deprecated
-	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
-																					String topicId,
-																					SerializationSchema<T> serializationSchema,
-																					Properties producerConfig) {
-		return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<T>());
-	}
-
-	/**
-	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
-	 * the topic.
-	 *
-	 * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
-	 *
-	 *  @param inStream The stream to write to Kafka
-	 *  @param topicId The name of the target topic
-	 *  @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
-	 *  @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 *  @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
-	 *
-	 * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)}
-	 * and call {@link #setWriteTimestampToKafka(boolean)}.
-	 */
-	@Deprecated
-	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
-																					String topicId,
-																					KeyedSerializationSchema<T> serializationSchema,
-																					Properties producerConfig,
-																					FlinkKafkaPartitioner<T> customPartitioner) {
-
-		FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner);
-		DataStreamSink<T> streamSink = inStream.addSink(kafkaProducer);
-		return new FlinkKafkaProducer010Configuration<>(streamSink, inStream, kafkaProducer);
-
-	}
-
 	// ---------------------- Regular constructors------------------
 
 	/**
@@ -267,6 +193,18 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}
 
+	// ------------------- User configuration ----------------------
+
+	/**
+	 * If set to true, Flink will write the (event time) timestamp attached to each record into Kafka.
+	 * Timestamps must be positive for Kafka to accept them.
+	 *
+	 * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka.
+	 */
+	public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
+		this.writeTimestampToKafka = writeTimestampToKafka;
+	}
+
 	// ----------------------------- Deprecated constructors / factory methods  ---------------------------
 
 	/**
@@ -275,6 +213,76 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 	 *
 	 * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
 	 *
+	 * @param inStream The stream to write to Kafka
+	 * @param topicId ID of the Kafka topic.
+	 * @param serializationSchema User defined serialization schema supporting key/value messages
+	 * @param producerConfig Properties with the producer configuration.
+	 *
+	 * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)}
+	 * and call {@link #setWriteTimestampToKafka(boolean)}.
+	 */
+	@Deprecated
+	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+																					String topicId,
+																					KeyedSerializationSchema<T> serializationSchema,
+																					Properties producerConfig) {
+		return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
+	 * the topic.
+	 *
+	 * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
+	 *
+	 * @param inStream The stream to write to Kafka
+	 * @param topicId ID of the Kafka topic.
+	 * @param serializationSchema User defined (keyless) serialization schema.
+	 * @param producerConfig Properties with the producer configuration.
+	 *
+	 * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)}
+	 * and call {@link #setWriteTimestampToKafka(boolean)}.
+	 */
+	@Deprecated
+	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+																					String topicId,
+																					SerializationSchema<T> serializationSchema,
+																					Properties producerConfig) {
+		return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<T>());
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
+	 *
+	 *  @param inStream The stream to write to Kafka
+	 *  @param topicId The name of the target topic
+	 *  @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+	 *  @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 *  @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+	 *
+	 * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)}
+	 * and call {@link #setWriteTimestampToKafka(boolean)}.
+	 */
+	@Deprecated
+	public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+																					String topicId,
+																					KeyedSerializationSchema<T> serializationSchema,
+																					Properties producerConfig,
+																					FlinkKafkaPartitioner<T> customPartitioner) {
+		FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner);
+		DataStreamSink<T> streamSink = inStream.addSink(kafkaProducer);
+		return new FlinkKafkaProducer010Configuration<>(streamSink, inStream, kafkaProducer);
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+	 * the topic.
+	 *
+	 * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
+	 *
 	 *  @param inStream The stream to write to Kafka
 	 *  @param topicId The name of the target topic
 	 *  @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
@@ -332,17 +340,6 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 		super(topicId, serializationSchema, producerConfig, customPartitioner);
 	}
 
-	/**
-	 * If set to true, Flink will write the (event time) timestamp attached to each record into Kafka.
-	 * Timestamps must be positive for Kafka to accept them.
-	 *
-	 * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka.
-	 */
-	public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
-		this.writeTimestampToKafka = writeTimestampToKafka;
-	}
-
-
 	// ----------------------------- Generic element processing  ---------------------------
 
 	@Override