You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/06/08 14:03:13 UTC

[flink] 02/02: [FLINK-18075] Wrap the SerializationSchema in KafkaSerializationSchema in Kafka connector

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 74fd2912d146cc2db103825f0aee2282a99b5774
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Jun 4 13:17:27 2020 +0200

    [FLINK-18075] Wrap the SerializationSchema in KafkaSerializationSchema in Kafka connector
---
 .../connectors/kafka/FlinkKafkaProducerTest.java   |  72 +++++++++++++
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  19 +++-
 .../connectors/kafka/FlinkKafkaProducerTest.java   |  72 +++++++++++++
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  33 +++++-
 .../internals/KafkaSerializationSchemaWrapper.java |  95 +++++++++++++++++
 .../connectors/kafka/KafkaConsumerTestBase.java    |  23 ++--
 .../connectors/kafka/KafkaProducerTestBase.java    |  11 +-
 .../kafka/KafkaShortRetentionTestBase.java         |   3 +-
 .../connectors/kafka/KafkaTestEnvironment.java     |  24 ++++-
 .../connectors/kafka/testutils/DataGenerators.java |   7 +-
 .../connectors/kafka/FlinkKafkaProducer.java       |  28 ++---
 .../connectors/kafka/FlinkKafkaProducerTest.java   | 118 +++++++++++++++++++++
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  27 ++++-
 .../connectors/kafka/table/KafkaTableITCase.java   |   5 +-
 14 files changed, 479 insertions(+), 58 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
new file mode 100644
index 0000000..2bd54f1
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link FlinkKafkaProducer010}.
+ */
+public class FlinkKafkaProducerTest {
+	@Test
+	public void testOpenProducer() throws Exception {
+
+		OpenTestingSerializationSchema schema = new OpenTestingSerializationSchema();
+		FlinkKafkaProducer010<Integer> kafkaProducer = new FlinkKafkaProducer010<>(
+			"localhost:9092",
+			"test-topic",
+			schema
+		);
+
+		OneInputStreamOperatorTestHarness<Integer, Object> testHarness = new OneInputStreamOperatorTestHarness<>(
+			new StreamSink<>(kafkaProducer),
+			1,
+			1,
+			0,
+			IntSerializer.INSTANCE,
+			new OperatorID(1, 1));
+
+		testHarness.open();
+
+		assertThat(schema.openCalled, equalTo(true));
+	}
+
+	private static class OpenTestingSerializationSchema implements SerializationSchema<Integer> {
+		private boolean openCalled;
+
+		@Override
+		public void open(InitializationContext context) throws Exception {
+			openCalled = true;
+		}
+
+		@Override
+		public byte[] serialize(Integer element) {
+			return new byte[0];
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 9ae751b..33abb05 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.networking.NetworkFailuresProxy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -154,7 +155,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
+	public <T> StreamSink<T> getProducerSink(
+			String topic,
+			SerializationSchema<T> serSchema,
+			Properties props,
+			FlinkKafkaPartitioner<T> partitioner) {
 		FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
 		prod.setFlushOnCheckpoint(true);
 		return new StreamSink<>(prod);
@@ -168,6 +173,18 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
+	public <T> DataStreamSink<T> produceIntoKafka(
+			DataStream<T> stream,
+			String topic,
+			SerializationSchema<T> serSchema,
+			Properties props,
+			FlinkKafkaPartitioner<T> partitioner) {
+		FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
+		prod.setFlushOnCheckpoint(true);
+		return stream.addSink(prod);
+	}
+
+	@Override
 	public KafkaOffsetHandler createOffsetHandler() {
 		return new KafkaOffsetHandlerImpl();
 	}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
new file mode 100644
index 0000000..6001245
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link FlinkKafkaProducer011}.
+ */
+public class FlinkKafkaProducerTest {
+	@Test
+	public void testOpenProducer() throws Exception {
+
+		OpenTestingSerializationSchema schema = new OpenTestingSerializationSchema();
+		FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>(
+			"localhost:9092",
+			"test-topic",
+			schema
+		);
+
+		OneInputStreamOperatorTestHarness<Integer, Object> testHarness = new OneInputStreamOperatorTestHarness<>(
+			new StreamSink<>(kafkaProducer),
+			1,
+			1,
+			0,
+			IntSerializer.INSTANCE,
+			new OperatorID(1, 1));
+
+		testHarness.open();
+
+		assertThat(schema.openCalled, equalTo(true));
+	}
+
+	private static class OpenTestingSerializationSchema implements SerializationSchema<Integer> {
+		private boolean openCalled;
+
+		@Override
+		public void open(InitializationContext context) throws Exception {
+			openCalled = true;
+		}
+
+		@Override
+		public byte[] serialize(Integer element) {
+			return new byte[0];
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index c846f16..db82233 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -17,10 +17,12 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.networking.NetworkFailuresProxy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
@@ -267,10 +269,14 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
+	public <T> StreamSink<T> getProducerSink(
+			String topic,
+			SerializationSchema<T> serSchema,
+			Properties props,
+			FlinkKafkaPartitioner<T> partitioner) {
 		return new StreamSink<>(new FlinkKafkaProducer011<>(
 			topic,
-			serSchema,
+			new KeyedSerializationSchemaWrapper<>(serSchema),
 			props,
 			Optional.ofNullable(partitioner),
 			producerSemantic,
@@ -278,7 +284,12 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
+	public <T> DataStreamSink<T> produceIntoKafka(
+			DataStream<T> stream,
+			String topic,
+			KeyedSerializationSchema<T> serSchema,
+			Properties props,
+			FlinkKafkaPartitioner<T> partitioner) {
 		return stream.addSink(new FlinkKafkaProducer011<>(
 			topic,
 			serSchema,
@@ -289,6 +300,22 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
+	public <T> DataStreamSink<T> produceIntoKafka(
+			DataStream<T> stream,
+			String topic,
+			SerializationSchema<T> serSchema,
+			Properties props,
+			FlinkKafkaPartitioner<T> partitioner) {
+		return stream.addSink(new FlinkKafkaProducer011<>(
+			topic,
+			new KeyedSerializationSchemaWrapper<>(serSchema),
+			props,
+			Optional.ofNullable(partitioner),
+			producerSemantic,
+			FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
+	}
+
+	@Override
 	public KafkaOffsetHandler createOffsetHandler() {
 		return new KafkaOffsetHandlerImpl();
 	}
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java
new file mode 100644
index 0000000..3cd0fc7
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
+import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * An adapter from old style interfaces such as {@link org.apache.flink.api.common.serialization.SerializationSchema},
+ * {@link org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner} to the
+ * {@link KafkaSerializationSchema}.
+ */
+public class KafkaSerializationSchemaWrapper<T> implements KafkaSerializationSchema<T>, KafkaContextAware<T> {
+
+	private final FlinkKafkaPartitioner<T> partitioner;
+	private final SerializationSchema<T> serializationSchema;
+	private final String topic;
+	private boolean writeTimestamp;
+
+	private int[] partitions;
+
+	public KafkaSerializationSchemaWrapper(
+			String topic,
+			FlinkKafkaPartitioner<T> partitioner,
+			boolean writeTimestamp,
+			SerializationSchema<T> serializationSchema) {
+		this.partitioner = partitioner;
+		this.serializationSchema = serializationSchema;
+		this.topic = topic;
+		this.writeTimestamp = writeTimestamp;
+	}
+
+	@Override
+	public void open(SerializationSchema.InitializationContext context) throws Exception {
+		serializationSchema.open(context);
+	}
+
+	@Override
+	public ProducerRecord<byte[], byte[]> serialize(
+			T element,
+			@Nullable Long timestamp) {
+		byte[] serialized = serializationSchema.serialize(element);
+		final Integer partition;
+		if (partitioner != null) {
+			partition = partitioner.partition(element, null, serialized, topic, partitions);
+		} else {
+			partition = null;
+		}
+
+		final Long timestampToWrite;
+		if (writeTimestamp) {
+			timestampToWrite = timestamp;
+		} else {
+			timestampToWrite = null;
+		}
+
+		return new ProducerRecord<>(topic, partition, timestampToWrite, null, serialized);
+	}
+
+	@Override
+	public String getTargetTopic(T element) {
+		return topic;
+	}
+
+	@Override
+	public void setPartitions(int[] partitions) {
+		this.partitions = partitions;
+	}
+
+	public void setWriteTimestamp(boolean writeTimestamp) {
+		this.writeTimestamp = writeTimestamp;
+	}
+}
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index d99f8d9..8406f70 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -64,7 +65,6 @@ import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
 import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
@@ -418,9 +418,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 		final TypeInformation<Tuple2<Integer, Integer>> resultType =
 			TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
 
-		final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema =
-			new KeyedSerializationSchemaWrapper<>(
-				new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
+		final SerializationSchema<Tuple2<Integer, Integer>> serSchema =
+				new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig());
 
 		final KafkaDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
 			new KafkaDeserializationSchemaWrapper<>(
@@ -747,7 +746,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 		Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
 		producerProperties.setProperty("retries", "3");
 		producerProperties.putAll(secureProps);
-		kafkaServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null);
+		kafkaServer.produceIntoKafka(stream, topic, sinkSchema, producerProperties, null);
 
 		// ----------- add consumer dataflow ----------
 
@@ -1302,7 +1301,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 			}
 		});
 
-		kafkaServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(serSchema), producerProps, null);
+		kafkaServer.produceIntoKafka(stream, topic, serSchema, producerProps, null);
 
 		tryExecute(env, "big topology test");
 		deleteTestTopic(topic);
@@ -1651,7 +1650,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 			}
 		});
 
-		kafkaServer.produceIntoKafka(fromGen, topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null);
+		kafkaServer.produceIntoKafka(fromGen, topic, schema, standardProps, null);
 
 		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env1.getStreamGraph());
 		final JobID jobId = jobGraph.getJobID();
@@ -1939,9 +1938,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 		final TypeInformation<Tuple2<Integer, Integer>> resultType =
 				TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
 
-		final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema =
-				new KeyedSerializationSchemaWrapper<>(
-						new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
+		final SerializationSchema<Tuple2<Integer, Integer>> serSchema =
+					new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig());
 
 		final KafkaDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
 				new KafkaDeserializationSchemaWrapper<>(
@@ -2035,9 +2033,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 		final TypeInformation<Tuple2<Integer, Integer>> resultType =
 			TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
 
-		final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema =
-			new KeyedSerializationSchemaWrapper<>(
-				new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
+		final SerializationSchema<Tuple2<Integer, Integer>> serSchema =
+				new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig());
 
 		final KafkaDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
 			new KafkaDeserializationSchemaWrapper<>(
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 2b586fe..59804d7 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -42,7 +42,6 @@ import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationS
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.IntegerSource;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.util.Preconditions;
@@ -231,7 +230,6 @@ public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink {
 		createTestTopic(topic, 1, 1);
 
 		TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-		KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper(schema);
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.enableCheckpointing(500);
@@ -262,7 +260,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink {
 			.addSource(new InfiniteIntegerSource())
 			.map(new BrokerRestartingMapper<>(failAfterElements));
 
-		StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
+		StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, schema, properties, new FlinkKafkaPartitioner<Integer>() {
 			@Override
 			public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
 				return partition;
@@ -273,7 +271,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink {
 			inputStream.addSink(kafkaSink.getUserFunction());
 		}
 		else {
-			kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
+			kafkaServer.produceIntoKafka(inputStream, topic, schema, properties, new FlinkKafkaPartitioner<Integer>() {
 				@Override
 				public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
 					return partition;
@@ -332,7 +330,6 @@ public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink {
 		}
 
 		TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-		KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper<>(schema);
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.enableCheckpointing(500);
@@ -359,10 +356,10 @@ public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink {
 			};
 
 			if (regularSink) {
-				StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic + i, keyedSerializationSchema, properties, partitioner);
+				StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic + i, schema, properties, partitioner);
 				inputStream.addSink(kafkaSink.getUserFunction());
 			} else {
-				kafkaServer.produceIntoKafka(inputStream, topic + i, keyedSerializationSchema, properties, partitioner);
+				kafkaServer.produceIntoKafka(inputStream, topic + i, schema, properties, partitioner);
 			}
 		}
 
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 2eef689..816ad30 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -30,7 +30,6 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.InstantiationUtil;
 
@@ -170,7 +169,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 		Properties props = new Properties();
 		props.putAll(standardProps);
 		props.putAll(secureProps);
-		kafkaServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props, null);
+		kafkaServer.produceIntoKafka(stream, topic, new SimpleStringSchema(), props, null);
 
 		// ----------- add consumer dataflow ----------
 
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 37132e7..0a40e92 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.networking.NetworkFailuresProxy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -155,13 +156,26 @@ public abstract class KafkaTestEnvironment {
 			int partition,
 			long timeout);
 
-	public abstract <T> StreamSink<T> getProducerSink(String topic,
-			KeyedSerializationSchema<T> serSchema, Properties props,
+	public abstract <T> StreamSink<T> getProducerSink(
+			String topic,
+			SerializationSchema<T> serSchema,
+			Properties props,
 			FlinkKafkaPartitioner<T> partitioner);
 
-	public abstract <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic,
-														KeyedSerializationSchema<T> serSchema, Properties props,
-														FlinkKafkaPartitioner<T> partitioner);
+	@Deprecated
+	public abstract <T> DataStreamSink<T> produceIntoKafka(
+		DataStream<T> stream,
+		String topic,
+		KeyedSerializationSchema<T> serSchema,
+		Properties props,
+		FlinkKafkaPartitioner<T> partitioner);
+
+	public abstract <T> DataStreamSink<T> produceIntoKafka(
+		DataStream<T> stream,
+		String topic,
+		SerializationSchema<T> serSchema,
+		Properties props,
+		FlinkKafkaPartitioner<T> partitioner);
 
 	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic,
 			KafkaSerializationSchema<T> serSchema, Properties props) {
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index 73d1ae1..9186ca1 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -32,7 +32,6 @@ import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
 import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
-import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -109,7 +108,7 @@ public class DataGenerators {
 
 		stream = stream.rebalance();
 		testServer.produceIntoKafka(stream, topic,
-				new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig())),
+				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig()),
 				props,
 				new FlinkKafkaPartitioner<Integer>() {
 					@Override
@@ -154,9 +153,9 @@ public class DataGenerators {
 
 				StreamSink<String> sink = server.getProducerSink(
 						topic,
-						new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
+						new SimpleStringSchema(),
 						producerProperties,
-						new FlinkFixedPartitioner<String>());
+						new FlinkFixedPartitioner<>());
 
 				OneInputStreamOperatorTestHarness<String, Object> testHarness =
 						new OneInputStreamOperatorTestHarness<>(sink);
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 3bda1fe..c2193e5 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -42,7 +42,7 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer;
 import org.apache.flink.streaming.connectors.kafka.internal.TransactionalIdsGenerator;
 import org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMutableWrapper;
-import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaSerializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
@@ -378,9 +378,9 @@ public class FlinkKafkaProducer<IN>
 			int kafkaProducersPoolSize) {
 		this(
 			topicId,
-			new KeyedSerializationSchemaWrapper<>(serializationSchema),
-			customPartitioner,
 			null,
+			null,
+			new KafkaSerializationSchemaWrapper<>(topicId, customPartitioner, false, serializationSchema),
 			producerConfig,
 			semantic,
 			kafkaProducersPoolSize
@@ -471,10 +471,10 @@ public class FlinkKafkaProducer<IN>
 	 */
 	@Deprecated
 	public FlinkKafkaProducer(
-		String topicId,
-		KeyedSerializationSchema<IN> serializationSchema,
-		Properties producerConfig,
-		FlinkKafkaProducer.Semantic semantic) {
+			String topicId,
+			KeyedSerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			FlinkKafkaProducer.Semantic semantic) {
 		this(topicId,
 			serializationSchema,
 			producerConfig,
@@ -728,6 +728,9 @@ public class FlinkKafkaProducer<IN>
 	 */
 	public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
 		this.writeTimestampToKafka = writeTimestampToKafka;
+		if (kafkaSchema instanceof KafkaSerializationSchemaWrapper) { // the wrapper has its own timestamp setting, so forward the new value
+			((KafkaSerializationSchemaWrapper<IN>) kafkaSchema).setWriteTimestamp(writeTimestampToKafka);
+		}
 	}
 
 	/**
@@ -789,21 +792,12 @@ public class FlinkKafkaProducer<IN>
 		}
 
 		if (kafkaSchema != null) {
-			kafkaSchema.open(createSerializationInitContext());
-		}
-
-		if (keyedSchema != null && keyedSchema instanceof KeyedSerializationSchemaWrapper) {
-			((KeyedSerializationSchemaWrapper<IN>) keyedSchema).getSerializationSchema()
-				.open(createSerializationInitContext());
+			kafkaSchema.open(() -> getRuntimeContext().getMetricGroup().addGroup("user"));
 		}
 
 		super.open(configuration);
 	}
 
-	private SerializationSchema.InitializationContext createSerializationInitContext() {
-		return () -> getRuntimeContext().getMetricGroup().addGroup("user");
-	}
-
 	@Override
 	public void invoke(FlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context) throws FlinkKafkaException {
 		checkErroneous();
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
new file mode 100644
index 0000000..a166c0e
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests that opening a {@link FlinkKafkaProducer} sink also opens the serialization schema it was given.
+ */
+public class FlinkKafkaProducerTest {
+	@Test
+	public void testOpenSerializationSchemaProducer() throws Exception {
+		// Variant 1: producer built from a broker address, a topic and a plain SerializationSchema.
+		OpenTestingSerializationSchema schema = new OpenTestingSerializationSchema();
+		FlinkKafkaProducer<Integer> kafkaProducer = new FlinkKafkaProducer<>(
+			"localhost:9092",
+			"test-topic",
+			schema
+		);
+
+		OneInputStreamOperatorTestHarness<Integer, Object> testHarness = new OneInputStreamOperatorTestHarness<>(
+			new StreamSink<>(kafkaProducer),
+			1,
+			1,
+			0,
+			IntSerializer.INSTANCE,
+			new OperatorID(1, 1));
+
+		testHarness.open(); // opening the operator is the only action; no elements are processed
+
+		assertThat(schema.openCalled, equalTo(true));
+	}
+
+	@Test
+	public void testOpenKafkaSerializationSchemaProducer() throws Exception {
+		OpenTestingKafkaSerializationSchema schema = new OpenTestingKafkaSerializationSchema();
+		Properties properties = new Properties();
+		properties.put("bootstrap.servers", "localhost:9092"); // same address the SerializationSchema test passes directly
+		FlinkKafkaProducer<Integer> kafkaProducer = new FlinkKafkaProducer<>(
+			"test-topic",
+			schema,
+			properties,
+			FlinkKafkaProducer.Semantic.AT_LEAST_ONCE
+		);
+
+		OneInputStreamOperatorTestHarness<Integer, Object> testHarness = new OneInputStreamOperatorTestHarness<>(
+			new StreamSink<>(kafkaProducer),
+			1,
+			1,
+			0,
+			IntSerializer.INSTANCE,
+			new OperatorID(1, 1));
+
+		testHarness.open(); // opening the operator is the only action; no elements are processed
+
+		assertThat(schema.openCalled, equalTo(true));
+	}
+
+	private static class OpenTestingKafkaSerializationSchema implements KafkaSerializationSchema<Integer> {
+		private boolean openCalled; // set to true by open(); read by the test's assertion
+
+		@Override
+		public void open(SerializationSchema.InitializationContext context) throws Exception {
+			openCalled = true;
+		}
+
+		@Override
+		public ProducerRecord<byte[], byte[]> serialize(
+			Integer element,
+			@Nullable Long timestamp) {
+			return null; // placeholder: not exercised by the tests in this class, which only open the sink
+		}
+	}
+
+	private static class OpenTestingSerializationSchema implements SerializationSchema<Integer> {
+		private boolean openCalled; // set to true by open(); read by the test's assertion
+
+		@Override
+		public void open(SerializationSchema.InitializationContext context) throws Exception {
+			openCalled = true;
+		}
+
+		@Override
+		public byte[] serialize(Integer element) {
+			return new byte[0]; // placeholder: not exercised by the tests in this class, which only open the sink
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 16cb724..f2c1767 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.networking.NetworkFailuresProxy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -261,12 +262,16 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
-		return new StreamSink<>(new FlinkKafkaProducer<T>(
+	public <T> StreamSink<T> getProducerSink(
+			String topic,
+			SerializationSchema<T> serSchema,
+			Properties props,
+			FlinkKafkaPartitioner<T> partitioner) {
+		return new StreamSink<>(new FlinkKafkaProducer<>( // producer is wrapped in a StreamSink operator, not attached to a stream
 			topic,
 			serSchema,
 			props,
-			Optional.ofNullable(partitioner),
+			partitioner, // handed over directly, not wrapped in an Optional
 			producerSemantic,
 			FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
 	}
@@ -283,6 +288,22 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
+	public <T> DataStreamSink<T> produceIntoKafka(
+			DataStream<T> stream,
+			String topic,
+			SerializationSchema<T> serSchema,
+			Properties props,
+			FlinkKafkaPartitioner<T> partitioner) {
+		return stream.addSink(new FlinkKafkaProducer<T>( // same producer arguments as getProducerSink, but added to the stream as a sink
+			topic,
+			serSchema,
+			props,
+			partitioner,
+			producerSemantic,
+			FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
+	}
+
+	@Override
 	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KafkaSerializationSchema<T> serSchema, Properties props) {
 		return stream.addSink(new FlinkKafkaProducer<T>(
 				topic,
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
index 06c73d5..17fbe48 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
@@ -18,13 +18,12 @@
 
 package org.apache.flink.streaming.connectors.kafka.table;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
-import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.table.descriptors.KafkaValidator;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.table.planner.runtime.utils.TableEnvUtil;
@@ -69,7 +68,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
 		// ---------- Write the Debezium json into Kafka -------------------
 		List<String> lines = readLines("debezium-data-schema-exclude.txt");
 		DataStreamSource<String> stream = env.fromCollection(lines);
-		KeyedSerializationSchema<String> serSchema = new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema());
+		SerializationSchema<String> serSchema = new SimpleStringSchema();
 		FlinkKafkaPartitioner<String> partitioner = new FlinkFixedPartitioner<>();
 
 		// the producer must not produce duplicates