You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/01/06 06:49:47 UTC
[5/9] flink git commit: [FLINK-8260] [kafka] Reorder deprecated /
regular constructors of FlinkKafkaProducer010
[FLINK-8260] [kafka] Reorder deprecated / regular constructors of FlinkKafkaProducer010
This commit moves deprecated factory methods of the
FlinkKafkaProducer010 behind regular constructors, for better navigation
and readability of the code.
This closes #5179.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10f1acf9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10f1acf9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10f1acf9
Branch: refs/heads/release-1.4
Commit: 10f1acf92313cde7bf4ac8aa1403b19405d2ed25
Parents: 7197489
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Dec 18 20:29:38 2017 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jan 5 22:03:40 2018 -0800
----------------------------------------------------------------------
.../connectors/kafka/FlinkKafkaProducer010.java | 167 +++++++++----------
1 file changed, 82 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/10f1acf9/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 21e3a10..0e64aa5 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -46,80 +46,6 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
*/
private boolean writeTimestampToKafka = false;
- // ---------------------- "Constructors" for timestamp writing ------------------
-
- /**
- * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
- * the topic.
- *
- * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
- *
- * @param inStream The stream to write to Kafka
- * @param topicId ID of the Kafka topic.
- * @param serializationSchema User defined serialization schema supporting key/value messages
- * @param producerConfig Properties with the producer configuration.
- *
- * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)}
- * and call {@link #setWriteTimestampToKafka(boolean)}.
- */
- @Deprecated
- public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
- String topicId,
- KeyedSerializationSchema<T> serializationSchema,
- Properties producerConfig) {
- return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
- }
-
- /**
- * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
- * the topic.
- *
- * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
- *
- * @param inStream The stream to write to Kafka
- * @param topicId ID of the Kafka topic.
- * @param serializationSchema User defined (keyless) serialization schema.
- * @param producerConfig Properties with the producer configuration.
- *
- * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)}
- * and call {@link #setWriteTimestampToKafka(boolean)}.
- */
- @Deprecated
- public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
- String topicId,
- SerializationSchema<T> serializationSchema,
- Properties producerConfig) {
- return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<T>());
- }
-
- /**
- * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
- * the topic.
- *
- * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
- *
- * @param inStream The stream to write to Kafka
- * @param topicId The name of the target topic
- * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
- * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
- * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
- *
- * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)}
- * and call {@link #setWriteTimestampToKafka(boolean)}.
- */
- @Deprecated
- public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
- String topicId,
- KeyedSerializationSchema<T> serializationSchema,
- Properties producerConfig,
- FlinkKafkaPartitioner<T> customPartitioner) {
-
- FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner);
- DataStreamSink<T> streamSink = inStream.addSink(kafkaProducer);
- return new FlinkKafkaProducer010Configuration<>(streamSink, inStream, kafkaProducer);
-
- }
-
// ---------------------- Regular constructors------------------
/**
@@ -267,6 +193,18 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
super(topicId, serializationSchema, producerConfig, customPartitioner);
}
+ // ------------------- User configuration ----------------------
+
+ /**
+ * If set to true, Flink will write the (event time) timestamp attached to each record into Kafka.
+ * Timestamps must be positive for Kafka to accept them.
+ *
+ * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka.
+ */
+ public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
+ this.writeTimestampToKafka = writeTimestampToKafka;
+ }
+
// ----------------------------- Deprecated constructors / factory methods ---------------------------
/**
@@ -275,6 +213,76 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
*
* <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
*
+ * @param inStream The stream to write to Kafka
+ * @param topicId ID of the Kafka topic.
+ * @param serializationSchema User defined serialization schema supporting key/value messages
+ * @param producerConfig Properties with the producer configuration.
+ *
+ * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)}
+ * and call {@link #setWriteTimestampToKafka(boolean)}.
+ */
+ @Deprecated
+ public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+ String topicId,
+ KeyedSerializationSchema<T> serializationSchema,
+ Properties producerConfig) {
+ return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
+ * the topic.
+ *
+ * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
+ *
+ * @param inStream The stream to write to Kafka
+ * @param topicId ID of the Kafka topic.
+ * @param serializationSchema User defined (keyless) serialization schema.
+ * @param producerConfig Properties with the producer configuration.
+ *
+ * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)}
+ * and call {@link #setWriteTimestampToKafka(boolean)}.
+ */
+ @Deprecated
+ public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+ String topicId,
+ SerializationSchema<T> serializationSchema,
+ Properties producerConfig) {
+ return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<T>());
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+ * the topic.
+ *
+ * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
+ *
+ * @param inStream The stream to write to Kafka
+ * @param topicId The name of the target topic
+ * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+ *
+ * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)}
+ * and call {@link #setWriteTimestampToKafka(boolean)}.
+ */
+ @Deprecated
+ public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+ String topicId,
+ KeyedSerializationSchema<T> serializationSchema,
+ Properties producerConfig,
+ FlinkKafkaPartitioner<T> customPartitioner) {
+ FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner);
+ DataStreamSink<T> streamSink = inStream.addSink(kafkaProducer);
+ return new FlinkKafkaProducer010Configuration<>(streamSink, inStream, kafkaProducer);
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+ * the topic.
+ *
+ * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
+ *
* @param inStream The stream to write to Kafka
* @param topicId The name of the target topic
* @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
@@ -332,17 +340,6 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
super(topicId, serializationSchema, producerConfig, customPartitioner);
}
- /**
- * If set to true, Flink will write the (event time) timestamp attached to each record into Kafka.
- * Timestamps must be positive for Kafka to accept them.
- *
- * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka.
- */
- public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
- this.writeTimestampToKafka = writeTimestampToKafka;
- }
-
-
// ----------------------------- Generic element processing ---------------------------
@Override