You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/11/02 15:27:35 UTC

flink git commit: [FLINK-7764][kafka]FlinkKafkaProducer010 does not accept name, uid, or parallelism

Repository: flink
Updated Branches:
  refs/heads/master 8198967ea -> 786a6cbb3


[FLINK-7764][kafka]FlinkKafkaProducer010 does not accept name, uid, or parallelism


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/786a6cbb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/786a6cbb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/786a6cbb

Branch: refs/heads/master
Commit: 786a6cbb37d34ca918d178c36118dd2e142eacda
Parents: 8198967
Author: Xingcan Cui <xi...@gmail.com>
Authored: Tue Oct 10 21:15:02 2017 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Nov 2 16:25:41 2017 +0100

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer010.java | 57 ++++++++++++++++++--
 1 file changed, 54 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/786a6cbb/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 3b43a7e..8575268 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
@@ -320,18 +322,23 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 	 * Configuration object returned by the writeToKafkaWithTimestamps() call.
 	 *
 	 * <p>This is only kept because it's part of the public API. It is not necessary anymore, now
-	 * that the {@link SinkFunction} interface provides timestamps.
+	 * that the {@link SinkFunction} interface provides timestamps.</p>
+	 *
+	 * <p>To enable the settings, this fake sink must override all the public methods
+	 * in {@link DataStreamSink}.</p>
 	 */
 	public static class FlinkKafkaProducer010Configuration<T> extends DataStreamSink<T> {
 
 		private final FlinkKafkaProducer010 producer;
+		private final SinkTransformation<T> transformation;
 
 		private FlinkKafkaProducer010Configuration(
-				DataStreamSink originalSink,
+				DataStreamSink<T> originalSink,
 				DataStream<T> inputStream,
 				FlinkKafkaProducer010<T> producer) {
 			//noinspection unchecked
 			super(inputStream, originalSink.getTransformation().getOperator());
+			this.transformation = originalSink.getTransformation();
 			this.producer = producer;
 		}
 
@@ -367,6 +374,50 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 		public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
 			producer.writeTimestampToKafka = writeTimestampToKafka;
 		}
-	}
 
+		// *************************************************************************
+		//  Override methods to use the transformation in this class.
+		// *************************************************************************
+
+		@Override
+		public SinkTransformation<T> getTransformation() {
+			return transformation;
+		}
+
+		@Override
+		public DataStreamSink<T> name(String name) {
+			transformation.setName(name);
+			return this;
+		}
+
+		@Override
+		public DataStreamSink<T> uid(String uid) {
+			transformation.setUid(uid);
+			return this;
+		}
+
+		@Override
+		public DataStreamSink<T> setUidHash(String uidHash) {
+			transformation.setUidHash(uidHash);
+			return this;
+		}
+
+		@Override
+		public DataStreamSink<T> setParallelism(int parallelism) {
+			transformation.setParallelism(parallelism);
+			return this;
+		}
+
+		@Override
+		public DataStreamSink<T> disableChaining() {
+			this.transformation.setChainingStrategy(ChainingStrategy.NEVER);
+			return this;
+		}
+
+		@Override
+		public DataStreamSink<T> slotSharingGroup(String slotSharingGroup) {
+			transformation.setSlotSharingGroup(slotSharingGroup);
+			return this;
+		}
+	}
 }