You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/03 13:38:11 UTC
[flink] branch release-1.11 updated: [FLINK-18075][kafka] Call open
method of SerializationSchema in Kafka producer
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 2bdc6ec [FLINK-18075][kafka] Call open method of SerializationSchema in Kafka producer
2bdc6ec is described below
commit 2bdc6ec563705f220e14ca13d1f7e2a1bb47b444
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Jun 3 15:31:20 2020 +0200
[FLINK-18075][kafka] Call open method of SerializationSchema in Kafka producer
The open method of SerializationSchema was not called in the universal
Kafka producer.
This closes #12450
---
.../flink/streaming/connectors/kafka/FlinkKafkaProducer.java | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index b095195..25b359c 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -759,12 +759,21 @@ public class FlinkKafkaProducer<IN>
}
if (kafkaSchema != null) {
- kafkaSchema.open(() -> getRuntimeContext().getMetricGroup().addGroup("user"));
+ kafkaSchema.open(createSerializationInitContext());
+ }
+
+ if (keyedSchema != null && keyedSchema instanceof KeyedSerializationSchemaWrapper) {
+ ((KeyedSerializationSchemaWrapper<IN>) keyedSchema).getSerializationSchema()
+ .open(createSerializationInitContext());
}
super.open(configuration);
}
+ private SerializationSchema.InitializationContext createSerializationInitContext() {
+ return () -> getRuntimeContext().getMetricGroup().addGroup("user");
+ }
+
@Override
public void invoke(FlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context) throws FlinkKafkaException {
checkErroneous();