You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/11/02 15:31:08 UTC
flink git commit: [FLINK-7764] [kafka] Enable the operator settings for FlinkKafkaProducer010
Repository: flink
Updated Branches:
refs/heads/release-1.3 168378d98 -> 6714f4a39
[FLINK-7764] [kafka] Enable the operator settings for FlinkKafkaProducer010
[hotfix] [kafka] Fix the config parameter names in KafkaTestBase.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6714f4a3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6714f4a3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6714f4a3
Branch: refs/heads/release-1.3
Commit: 6714f4a3917bc1fd29529c34115011c71f0686e3
Parents: 168378d
Author: Xingcan Cui <xi...@gmail.com>
Authored: Wed Oct 11 23:42:29 2017 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Nov 2 16:29:12 2017 +0100
----------------------------------------------------------------------
.../connectors/kafka/FlinkKafkaProducer010.java | 63 ++++++++++++++++++--
1 file changed, 57 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6714f4a3/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 7909ba6..1019c09 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -31,7 +31,10 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
@@ -135,8 +138,9 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner);
- SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
- return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
+ SingleOutputStreamOperator<Object> streamOperator = inStream.transform
+ ("FlinkKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
+ return new FlinkKafkaProducer010Configuration<>(streamOperator, kafkaProducer);
}
// ---------------------- Regular constructors w/o timestamp support ------------------
@@ -255,8 +259,9 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
FlinkKafkaProducer010<T> kafkaProducer =
new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner));
- SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
- return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
+ SingleOutputStreamOperator<Object> streamOperator = inStream.transform
+ ("FlinkKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
+ return new FlinkKafkaProducer010Configuration<>(streamOperator, kafkaProducer);
}
/**
@@ -440,12 +445,14 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
private final FlinkKafkaProducerBase wrappedProducerBase;
private final FlinkKafkaProducer010 producer;
+ private final StreamTransformation transformation;
private FlinkKafkaProducer010Configuration(DataStream stream, FlinkKafkaProducer010<T> producer) {
//noinspection unchecked
super(stream, producer);
- this.producer = producer;
this.wrappedProducerBase = (FlinkKafkaProducerBase) producer.userFunction;
+ this.producer = producer;
+ this.transformation = stream.getTransformation();
}
/**
@@ -480,7 +487,51 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
this.producer.writeTimestampToKafka = writeTimestampToKafka;
}
- }
+ // *************************************************************************
+ // Override methods to use the transformation in this class.
+ // *************************************************************************
+
+ @Override
+ public SinkTransformation<T> getTransformation() {
+ throw new UnsupportedOperationException("The SinkTransformation is not accessible " +
+ "from " + this.getClass().getSimpleName());
+ }
+
+ @Override
+ public DataStreamSink<T> name(String name) {
+ transformation.setName(name);
+ return this;
+ }
+
+ @Override
+ public DataStreamSink<T> uid(String uid) {
+ transformation.setUid(uid);
+ return this;
+ }
+ @Override
+ public DataStreamSink<T> setUidHash(String uidHash) {
+ transformation.setUidHash(uidHash);
+ return this;
+ }
+
+ @Override
+ public DataStreamSink<T> setParallelism(int parallelism) {
+ transformation.setParallelism(parallelism);
+ return this;
+ }
+
+ @Override
+ public DataStreamSink<T> disableChaining() {
+ this.transformation.setChainingStrategy(ChainingStrategy.NEVER);
+ return this;
+ }
+
+ @Override
+ public DataStreamSink<T> slotSharingGroup(String slotSharingGroup) {
+ transformation.setSlotSharingGroup(slotSharingGroup);
+ return this;
+ }
+ }
}