You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/03 13:38:11 UTC

[flink] branch release-1.11 updated: [FLINK-18075][kafka] Call open method of SerializationSchema in Kafka producer

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 2bdc6ec  [FLINK-18075][kafka] Call open method of SerializationSchema in Kafka producer
2bdc6ec is described below

commit 2bdc6ec563705f220e14ca13d1f7e2a1bb47b444
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Jun 3 15:31:20 2020 +0200

    [FLINK-18075][kafka] Call open method of SerializationSchema in Kafka producer
    
    The open method of SerializationSchema was not called in the universal
    Kafka producer.
    
    This closes #12450
---
 .../flink/streaming/connectors/kafka/FlinkKafkaProducer.java  | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index b095195..25b359c 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -759,12 +759,21 @@ public class FlinkKafkaProducer<IN>
 		}
 
 		if (kafkaSchema != null) {
-			kafkaSchema.open(() -> getRuntimeContext().getMetricGroup().addGroup("user"));
+			kafkaSchema.open(createSerializationInitContext());
+		}
+
+		if (keyedSchema != null && keyedSchema instanceof KeyedSerializationSchemaWrapper) {
+			((KeyedSerializationSchemaWrapper<IN>) keyedSchema).getSerializationSchema()
+				.open(createSerializationInitContext());
 		}
 
 		super.open(configuration);
 	}
 
+	private SerializationSchema.InitializationContext createSerializationInitContext() {
+		return () -> getRuntimeContext().getMetricGroup().addGroup("user");
+	}
+
 	@Override
 	public void invoke(FlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context) throws FlinkKafkaException {
 		checkErroneous();