Posted to commits@flink.apache.org by dw...@apache.org on 2020/06/08 14:03:11 UTC

[flink] branch master updated (0358292 -> 74fd291)

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 0358292  [FLINK-16572][e2e][pubsub] Acknowledge message in previous test
     new bebe503  [FLINK-18075] Remove deprecation of Kafka producer ctors that take SerializationSchema
     new 74fd291  [FLINK-18075] Wrap the SerializationSchema in KafkaSerializationSchema in Kafka connector

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../connectors/kafka/FlinkKafkaProducerTest.java   |  72 +++++++++++++
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  19 +++-
 .../connectors/kafka/FlinkKafkaProducerTest.java   |  72 +++++++++++++
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  33 +++++-
 .../internals/KafkaSerializationSchemaWrapper.java |  95 +++++++++++++++++
 .../connectors/kafka/KafkaConsumerTestBase.java    |  23 ++--
 .../connectors/kafka/KafkaProducerTestBase.java    |  11 +-
 .../kafka/KafkaShortRetentionTestBase.java         |   3 +-
 .../connectors/kafka/KafkaTestEnvironment.java     |  24 ++++-
 .../connectors/kafka/testutils/DataGenerators.java |   7 +-
 .../connectors/kafka/FlinkKafkaProducer.java       | 108 +++++++++++--------
 .../connectors/kafka/FlinkKafkaProducerTest.java   | 118 +++++++++++++++++++++
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  27 ++++-
 .../connectors/kafka/table/KafkaTableITCase.java   |   5 +-
 14 files changed, 534 insertions(+), 83 deletions(-)
 create mode 100644 flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
 create mode 100644 flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
 create mode 100644 flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java
 create mode 100644 flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java


[flink] 02/02: [FLINK-18075] Wrap the SerializationSchema in KafkaSerializationSchema in Kafka connector

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 74fd2912d146cc2db103825f0aee2282a99b5774
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Jun 4 13:17:27 2020 +0200

    [FLINK-18075] Wrap the SerializationSchema in KafkaSerializationSchema in Kafka connector
---
 .../connectors/kafka/FlinkKafkaProducerTest.java   |  72 +++++++++++++
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  19 +++-
 .../connectors/kafka/FlinkKafkaProducerTest.java   |  72 +++++++++++++
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  33 +++++-
 .../internals/KafkaSerializationSchemaWrapper.java |  95 +++++++++++++++++
 .../connectors/kafka/KafkaConsumerTestBase.java    |  23 ++--
 .../connectors/kafka/KafkaProducerTestBase.java    |  11 +-
 .../kafka/KafkaShortRetentionTestBase.java         |   3 +-
 .../connectors/kafka/KafkaTestEnvironment.java     |  24 ++++-
 .../connectors/kafka/testutils/DataGenerators.java |   7 +-
 .../connectors/kafka/FlinkKafkaProducer.java       |  28 ++---
 .../connectors/kafka/FlinkKafkaProducerTest.java   | 118 +++++++++++++++++++++
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  27 ++++-
 .../connectors/kafka/table/KafkaTableITCase.java   |   5 +-
 14 files changed, 479 insertions(+), 58 deletions(-)
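
In caller code this change is invisible: a producer built from a plain
SerializationSchema is now backed internally by the new
KafkaSerializationSchemaWrapper, so the schema's open() method is invoked when
the sink opens. A minimal sketch, assuming a placeholder broker list and topic
(neither is part of this commit):

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    public class KeylessProducerSketch {
        public static void main(String[] args) {
            // Same ctor as before; internally the schema is now wrapped in a
            // KafkaSerializationSchemaWrapper instead of a KeyedSerializationSchemaWrapper.
            FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                "localhost:9092",           // placeholder broker list
                "demo-topic",               // placeholder topic
                new SimpleStringSchema());  // SerializationSchema#open is now called when the sink opens
        }
    }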

diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
new file mode 100644
index 0000000..2bd54f1
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link FlinkKafkaProducer010}.
+ */
+public class FlinkKafkaProducerTest {
+	@Test
+	public void testOpenProducer() throws Exception {
+
+		OpenTestingSerializationSchema schema = new OpenTestingSerializationSchema();
+		FlinkKafkaProducer010<Integer> kafkaProducer = new FlinkKafkaProducer010<>(
+			"localhost:9092",
+			"test-topic",
+			schema
+		);
+
+		OneInputStreamOperatorTestHarness<Integer, Object> testHarness = new OneInputStreamOperatorTestHarness<>(
+			new StreamSink<>(kafkaProducer),
+			1,
+			1,
+			0,
+			IntSerializer.INSTANCE,
+			new OperatorID(1, 1));
+
+		testHarness.open();
+
+		assertThat(schema.openCalled, equalTo(true));
+	}
+
+	private static class OpenTestingSerializationSchema implements SerializationSchema<Integer> {
+		private boolean openCalled;
+
+		@Override
+		public void open(InitializationContext context) throws Exception {
+			openCalled = true;
+		}
+
+		@Override
+		public byte[] serialize(Integer element) {
+			return new byte[0];
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 9ae751b..33abb05 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.networking.NetworkFailuresProxy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -154,7 +155,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
+	public <T> StreamSink<T> getProducerSink(
+			String topic,
+			SerializationSchema<T> serSchema,
+			Properties props,
+			FlinkKafkaPartitioner<T> partitioner) {
 		FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
 		prod.setFlushOnCheckpoint(true);
 		return new StreamSink<>(prod);
@@ -168,6 +173,18 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
+	public <T> DataStreamSink<T> produceIntoKafka(
+			DataStream<T> stream,
+			String topic,
+			SerializationSchema<T> serSchema,
+			Properties props,
+			FlinkKafkaPartitioner<T> partitioner) {
+		FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
+		prod.setFlushOnCheckpoint(true);
+		return stream.addSink(prod);
+	}
+
+	@Override
 	public KafkaOffsetHandler createOffsetHandler() {
 		return new KafkaOffsetHandlerImpl();
 	}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
new file mode 100644
index 0000000..6001245
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link FlinkKafkaProducer011}.
+ */
+public class FlinkKafkaProducerTest {
+	@Test
+	public void testOpenProducer() throws Exception {
+
+		OpenTestingSerializationSchema schema = new OpenTestingSerializationSchema();
+		FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>(
+			"localhost:9092",
+			"test-topic",
+			schema
+		);
+
+		OneInputStreamOperatorTestHarness<Integer, Object> testHarness = new OneInputStreamOperatorTestHarness<>(
+			new StreamSink<>(kafkaProducer),
+			1,
+			1,
+			0,
+			IntSerializer.INSTANCE,
+			new OperatorID(1, 1));
+
+		testHarness.open();
+
+		assertThat(schema.openCalled, equalTo(true));
+	}
+
+	private static class OpenTestingSerializationSchema implements SerializationSchema<Integer> {
+		private boolean openCalled;
+
+		@Override
+		public void open(InitializationContext context) throws Exception {
+			openCalled = true;
+		}
+
+		@Override
+		public byte[] serialize(Integer element) {
+			return new byte[0];
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index c846f16..db82233 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -17,10 +17,12 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.networking.NetworkFailuresProxy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
@@ -267,10 +269,14 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
+	public <T> StreamSink<T> getProducerSink(
+			String topic,
+			SerializationSchema<T> serSchema,
+			Properties props,
+			FlinkKafkaPartitioner<T> partitioner) {
 		return new StreamSink<>(new FlinkKafkaProducer011<>(
 			topic,
-			serSchema,
+			new KeyedSerializationSchemaWrapper<>(serSchema),
 			props,
 			Optional.ofNullable(partitioner),
 			producerSemantic,
@@ -278,7 +284,12 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
+	public <T> DataStreamSink<T> produceIntoKafka(
+			DataStream<T> stream,
+			String topic,
+			KeyedSerializationSchema<T> serSchema,
+			Properties props,
+			FlinkKafkaPartitioner<T> partitioner) {
 		return stream.addSink(new FlinkKafkaProducer011<>(
 			topic,
 			serSchema,
@@ -289,6 +300,22 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
+	public <T> DataStreamSink<T> produceIntoKafka(
+			DataStream<T> stream,
+			String topic,
+			SerializationSchema<T> serSchema,
+			Properties props,
+			FlinkKafkaPartitioner<T> partitioner) {
+		return stream.addSink(new FlinkKafkaProducer011<>(
+			topic,
+			new KeyedSerializationSchemaWrapper<>(serSchema),
+			props,
+			Optional.ofNullable(partitioner),
+			producerSemantic,
+			FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
+	}
+
+	@Override
 	public KafkaOffsetHandler createOffsetHandler() {
 		return new KafkaOffsetHandlerImpl();
 	}
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java
new file mode 100644
index 0000000..3cd0fc7
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
+import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * An adapter from old style interfaces such as {@link org.apache.flink.api.common.serialization.SerializationSchema},
+ * {@link org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner} to the
+ * {@link KafkaSerializationSchema}.
+ */
+public class KafkaSerializationSchemaWrapper<T> implements KafkaSerializationSchema<T>, KafkaContextAware<T> {
+
+	private final FlinkKafkaPartitioner<T> partitioner;
+	private final SerializationSchema<T> serializationSchema;
+	private final String topic;
+	private boolean writeTimestamp;
+
+	private int[] partitions;
+
+	public KafkaSerializationSchemaWrapper(
+			String topic,
+			FlinkKafkaPartitioner<T> partitioner,
+			boolean writeTimestamp,
+			SerializationSchema<T> serializationSchema) {
+		this.partitioner = partitioner;
+		this.serializationSchema = serializationSchema;
+		this.topic = topic;
+		this.writeTimestamp = writeTimestamp;
+	}
+
+	@Override
+	public void open(SerializationSchema.InitializationContext context) throws Exception {
+		serializationSchema.open(context);
+	}
+
+	@Override
+	public ProducerRecord<byte[], byte[]> serialize(
+			T element,
+			@Nullable Long timestamp) {
+		byte[] serialized = serializationSchema.serialize(element);
+		final Integer partition;
+		if (partitioner != null) {
+			partition = partitioner.partition(element, null, serialized, topic, partitions);
+		} else {
+			partition = null;
+		}
+
+		final Long timestampToWrite;
+		if (writeTimestamp) {
+			timestampToWrite = timestamp;
+		} else {
+			timestampToWrite = null;
+		}
+
+		return new ProducerRecord<>(topic, partition, timestampToWrite, null, serialized);
+	}
+
+	@Override
+	public String getTargetTopic(T element) {
+		return topic;
+	}
+
+	@Override
+	public void setPartitions(int[] partitions) {
+		this.partitions = partitions;
+	}
+
+	public void setWriteTimestamp(boolean writeTimestamp) {
+		this.writeTimestamp = writeTimestamp;
+	}
+}
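
To make the adapter's behaviour concrete, a minimal sketch of invoking it
directly (hypothetical topic; a null partitioner leaves the partition to Kafka,
and writeTimestamp == false suppresses the record timestamp, matching the two
branches in serialize() above):

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.internals.KafkaSerializationSchemaWrapper;

    import org.apache.kafka.clients.producer.ProducerRecord;

    public class WrapperSketch {
        public static void main(String[] args) {
            KafkaSerializationSchemaWrapper<String> wrapper = new KafkaSerializationSchemaWrapper<>(
                "demo-topic",               // hypothetical topic
                null,                       // no custom partitioner: partition stays null
                false,                      // writeTimestamp == false: timestamp stays null
                new SimpleStringSchema());

            ProducerRecord<byte[], byte[]> record = wrapper.serialize("hello", null);
            // record.topic() is "demo-topic"; partition and timestamp are both null.
        }
    }
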
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index d99f8d9..8406f70 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -64,7 +65,6 @@ import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
 import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
@@ -418,9 +418,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 		final TypeInformation<Tuple2<Integer, Integer>> resultType =
 			TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
 
-		final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema =
-			new KeyedSerializationSchemaWrapper<>(
-				new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
+		final SerializationSchema<Tuple2<Integer, Integer>> serSchema =
+				new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig());
 
 		final KafkaDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
 			new KafkaDeserializationSchemaWrapper<>(
@@ -747,7 +746,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 		Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
 		producerProperties.setProperty("retries", "3");
 		producerProperties.putAll(secureProps);
-		kafkaServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null);
+		kafkaServer.produceIntoKafka(stream, topic, sinkSchema, producerProperties, null);
 
 		// ----------- add consumer dataflow ----------
 
@@ -1302,7 +1301,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 			}
 		});
 
-		kafkaServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(serSchema), producerProps, null);
+		kafkaServer.produceIntoKafka(stream, topic, serSchema, producerProps, null);
 
 		tryExecute(env, "big topology test");
 		deleteTestTopic(topic);
@@ -1651,7 +1650,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 			}
 		});
 
-		kafkaServer.produceIntoKafka(fromGen, topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null);
+		kafkaServer.produceIntoKafka(fromGen, topic, schema, standardProps, null);
 
 		JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env1.getStreamGraph());
 		final JobID jobId = jobGraph.getJobID();
@@ -1939,9 +1938,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 		final TypeInformation<Tuple2<Integer, Integer>> resultType =
 				TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
 
-		final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema =
-				new KeyedSerializationSchemaWrapper<>(
-						new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
+		final SerializationSchema<Tuple2<Integer, Integer>> serSchema =
+					new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig());
 
 		final KafkaDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
 				new KafkaDeserializationSchemaWrapper<>(
@@ -2035,9 +2033,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
 		final TypeInformation<Tuple2<Integer, Integer>> resultType =
 			TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
 
-		final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema =
-			new KeyedSerializationSchemaWrapper<>(
-				new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
+		final SerializationSchema<Tuple2<Integer, Integer>> serSchema =
+				new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig());
 
 		final KafkaDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
 			new KafkaDeserializationSchemaWrapper<>(
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 2b586fe..59804d7 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -42,7 +42,6 @@ import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationS
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.IntegerSource;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.util.Preconditions;
@@ -231,7 +230,6 @@ public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink {
 		createTestTopic(topic, 1, 1);
 
 		TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-		KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper(schema);
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.enableCheckpointing(500);
@@ -262,7 +260,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink {
 			.addSource(new InfiniteIntegerSource())
 			.map(new BrokerRestartingMapper<>(failAfterElements));
 
-		StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
+		StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, schema, properties, new FlinkKafkaPartitioner<Integer>() {
 			@Override
 			public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
 				return partition;
@@ -273,7 +271,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink {
 			inputStream.addSink(kafkaSink.getUserFunction());
 		}
 		else {
-			kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
+			kafkaServer.produceIntoKafka(inputStream, topic, schema, properties, new FlinkKafkaPartitioner<Integer>() {
 				@Override
 				public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
 					return partition;
@@ -332,7 +330,6 @@ public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink {
 		}
 
 		TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
-		KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper<>(schema);
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.enableCheckpointing(500);
@@ -359,10 +356,10 @@ public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink {
 			};
 
 			if (regularSink) {
-				StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic + i, keyedSerializationSchema, properties, partitioner);
+				StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic + i, schema, properties, partitioner);
 				inputStream.addSink(kafkaSink.getUserFunction());
 			} else {
-				kafkaServer.produceIntoKafka(inputStream, topic + i, keyedSerializationSchema, properties, partitioner);
+				kafkaServer.produceIntoKafka(inputStream, topic + i, schema, properties, partitioner);
 			}
 		}
 
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 2eef689..816ad30 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -30,7 +30,6 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.InstantiationUtil;
 
@@ -170,7 +169,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 		Properties props = new Properties();
 		props.putAll(standardProps);
 		props.putAll(secureProps);
-		kafkaServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props, null);
+		kafkaServer.produceIntoKafka(stream, topic, new SimpleStringSchema(), props, null);
 
 		// ----------- add consumer dataflow ----------
 
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 37132e7..0a40e92 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.networking.NetworkFailuresProxy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -155,13 +156,26 @@ public abstract class KafkaTestEnvironment {
 			int partition,
 			long timeout);
 
-	public abstract <T> StreamSink<T> getProducerSink(String topic,
-			KeyedSerializationSchema<T> serSchema, Properties props,
+	public abstract <T> StreamSink<T> getProducerSink(
+			String topic,
+			SerializationSchema<T> serSchema,
+			Properties props,
 			FlinkKafkaPartitioner<T> partitioner);
 
-	public abstract <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic,
-														KeyedSerializationSchema<T> serSchema, Properties props,
-														FlinkKafkaPartitioner<T> partitioner);
+	@Deprecated
+	public abstract <T> DataStreamSink<T> produceIntoKafka(
+		DataStream<T> stream,
+		String topic,
+		KeyedSerializationSchema<T> serSchema,
+		Properties props,
+		FlinkKafkaPartitioner<T> partitioner);
+
+	public abstract <T> DataStreamSink<T> produceIntoKafka(
+		DataStream<T> stream,
+		String topic,
+		SerializationSchema<T> serSchema,
+		Properties props,
+		FlinkKafkaPartitioner<T> partitioner);
 
 	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic,
 			KafkaSerializationSchema<T> serSchema, Properties props) {
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index 73d1ae1..9186ca1 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -32,7 +32,6 @@ import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
 import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
-import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -109,7 +108,7 @@ public class DataGenerators {
 
 		stream = stream.rebalance();
 		testServer.produceIntoKafka(stream, topic,
-				new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig())),
+				new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig()),
 				props,
 				new FlinkKafkaPartitioner<Integer>() {
 					@Override
@@ -154,9 +153,9 @@ public class DataGenerators {
 
 				StreamSink<String> sink = server.getProducerSink(
 						topic,
-						new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
+						new SimpleStringSchema(),
 						producerProperties,
-						new FlinkFixedPartitioner<String>());
+						new FlinkFixedPartitioner<>());
 
 				OneInputStreamOperatorTestHarness<String, Object> testHarness =
 						new OneInputStreamOperatorTestHarness<>(sink);
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 3bda1fe..c2193e5 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -42,7 +42,7 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer;
 import org.apache.flink.streaming.connectors.kafka.internal.TransactionalIdsGenerator;
 import org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMutableWrapper;
-import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaSerializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
@@ -378,9 +378,9 @@ public class FlinkKafkaProducer<IN>
 			int kafkaProducersPoolSize) {
 		this(
 			topicId,
-			new KeyedSerializationSchemaWrapper<>(serializationSchema),
-			customPartitioner,
 			null,
+			null,
+			new KafkaSerializationSchemaWrapper<>(topicId, customPartitioner, false, serializationSchema),
 			producerConfig,
 			semantic,
 			kafkaProducersPoolSize
@@ -471,10 +471,10 @@ public class FlinkKafkaProducer<IN>
 	 */
 	@Deprecated
 	public FlinkKafkaProducer(
-		String topicId,
-		KeyedSerializationSchema<IN> serializationSchema,
-		Properties producerConfig,
-		FlinkKafkaProducer.Semantic semantic) {
+			String topicId,
+			KeyedSerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			FlinkKafkaProducer.Semantic semantic) {
 		this(topicId,
 			serializationSchema,
 			producerConfig,
@@ -728,6 +728,9 @@ public class FlinkKafkaProducer<IN>
 	 */
 	public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
 		this.writeTimestampToKafka = writeTimestampToKafka;
+		if (kafkaSchema instanceof KafkaSerializationSchemaWrapper) {
+			((KafkaSerializationSchemaWrapper<IN>) kafkaSchema).setWriteTimestamp(writeTimestampToKafka);
+		}
 	}
 
 	/**
@@ -789,21 +792,12 @@ public class FlinkKafkaProducer<IN>
 		}
 
 		if (kafkaSchema != null) {
-			kafkaSchema.open(createSerializationInitContext());
-		}
-
-		if (keyedSchema != null && keyedSchema instanceof KeyedSerializationSchemaWrapper) {
-			((KeyedSerializationSchemaWrapper<IN>) keyedSchema).getSerializationSchema()
-				.open(createSerializationInitContext());
+			kafkaSchema.open(() -> getRuntimeContext().getMetricGroup().addGroup("user"));
 		}
 
 		super.open(configuration);
 	}
 
-	private SerializationSchema.InitializationContext createSerializationInitContext() {
-		return () -> getRuntimeContext().getMetricGroup().addGroup("user");
-	}
-
 	@Override
 	public void invoke(FlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context) throws FlinkKafkaException {
 		checkErroneous();
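
The setWriteTimestampToKafka() change above means the flag now also reaches a
wrapped SerializationSchema, not only the keyed path. A hedged one-liner (the
kafkaProducer variable is hypothetical, e.g. a producer created from a plain
SerializationSchema):

    // Flips writeTimestamp on the internal KafkaSerializationSchemaWrapper as well,
    // so subsequent serialize() calls attach the record timestamp.
    kafkaProducer.setWriteTimestampToKafka(true);
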
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
new file mode 100644
index 0000000..a166c0e
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link FlinkKafkaProducer}.
+ */
+public class FlinkKafkaProducerTest {
+	@Test
+	public void testOpenSerializationSchemaProducer() throws Exception {
+
+		OpenTestingSerializationSchema schema = new OpenTestingSerializationSchema();
+		FlinkKafkaProducer<Integer> kafkaProducer = new FlinkKafkaProducer<>(
+			"localhost:9092",
+			"test-topic",
+			schema
+		);
+
+		OneInputStreamOperatorTestHarness<Integer, Object> testHarness = new OneInputStreamOperatorTestHarness<>(
+			new StreamSink<>(kafkaProducer),
+			1,
+			1,
+			0,
+			IntSerializer.INSTANCE,
+			new OperatorID(1, 1));
+
+		testHarness.open();
+
+		assertThat(schema.openCalled, equalTo(true));
+	}
+
+	@Test
+	public void testOpenKafkaSerializationSchemaProducer() throws Exception {
+		OpenTestingKafkaSerializationSchema schema = new OpenTestingKafkaSerializationSchema();
+		Properties properties = new Properties();
+		properties.put("bootstrap.servers", "localhost:9092");
+		FlinkKafkaProducer<Integer> kafkaProducer = new FlinkKafkaProducer<>(
+			"test-topic",
+			schema,
+			properties,
+			FlinkKafkaProducer.Semantic.AT_LEAST_ONCE
+		);
+
+		OneInputStreamOperatorTestHarness<Integer, Object> testHarness = new OneInputStreamOperatorTestHarness<>(
+			new StreamSink<>(kafkaProducer),
+			1,
+			1,
+			0,
+			IntSerializer.INSTANCE,
+			new OperatorID(1, 1));
+
+		testHarness.open();
+
+		assertThat(schema.openCalled, equalTo(true));
+	}
+
+	private static class OpenTestingKafkaSerializationSchema implements KafkaSerializationSchema<Integer> {
+		private boolean openCalled;
+
+		@Override
+		public void open(SerializationSchema.InitializationContext context) throws Exception {
+			openCalled = true;
+		}
+
+		@Override
+		public ProducerRecord<byte[], byte[]> serialize(
+			Integer element,
+			@Nullable Long timestamp) {
+			return null;
+		}
+	}
+
+	private static class OpenTestingSerializationSchema implements SerializationSchema<Integer> {
+		private boolean openCalled;
+
+		@Override
+		public void open(SerializationSchema.InitializationContext context) throws Exception {
+			openCalled = true;
+		}
+
+		@Override
+		public byte[] serialize(Integer element) {
+			return new byte[0];
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 16cb724..f2c1767 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.networking.NetworkFailuresProxy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -261,12 +262,16 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
-	public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
-		return new StreamSink<>(new FlinkKafkaProducer<T>(
+	public <T> StreamSink<T> getProducerSink(
+			String topic,
+			SerializationSchema<T> serSchema,
+			Properties props,
+			FlinkKafkaPartitioner<T> partitioner) {
+		return new StreamSink<>(new FlinkKafkaProducer<>(
 			topic,
 			serSchema,
 			props,
-			Optional.ofNullable(partitioner),
+			partitioner,
 			producerSemantic,
 			FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
 	}
@@ -283,6 +288,22 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
+	public <T> DataStreamSink<T> produceIntoKafka(
+			DataStream<T> stream,
+			String topic,
+			SerializationSchema<T> serSchema,
+			Properties props,
+			FlinkKafkaPartitioner<T> partitioner) {
+		return stream.addSink(new FlinkKafkaProducer<T>(
+			topic,
+			serSchema,
+			props,
+			partitioner,
+			producerSemantic,
+			FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
+	}
+
+	@Override
 	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KafkaSerializationSchema<T> serSchema, Properties props) {
 		return stream.addSink(new FlinkKafkaProducer<T>(
 				topic,
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
index 06c73d5..17fbe48 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
@@ -18,13 +18,12 @@
 
 package org.apache.flink.streaming.connectors.kafka.table;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
-import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.table.descriptors.KafkaValidator;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.table.planner.runtime.utils.TableEnvUtil;
@@ -69,7 +68,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
 		// ---------- Write the Debezium json into Kafka -------------------
 		List<String> lines = readLines("debezium-data-schema-exclude.txt");
 		DataStreamSource<String> stream = env.fromCollection(lines);
-		KeyedSerializationSchema<String> serSchema = new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema());
+		SerializationSchema<String> serSchema = new SimpleStringSchema();
 		FlinkKafkaPartitioner<String> partitioner = new FlinkFixedPartitioner<>();
 
 		// the producer must not produce duplicates


[flink] 01/02: [FLINK-18075] Remove deprecation of Kafka producer ctors that take SerializationSchema

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bebe50346d9485fa2d962c5ed1d00da2778c8feb
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Jun 4 13:22:56 2020 +0200

    [FLINK-18075] Remove deprecation of Kafka producer ctors that take
    SerializationSchema
    
    SerializationSchema is an important interface that is widely used in
    other components, such as the Table API. It is also the most common
    interface implemented by reusable serialization formats. Therefore we
    should support it long term in our connectors. This commit removes the
    deprecation of the ctors that take this interface.
    
    Moreover, it adds the most general ctor, which takes all producer
    configuration options along with a SerializationSchema. This makes the
    SerializationSchema path feature-equivalent with KafkaSerializationSchema
    with respect to configuring the producer.
---
 .../connectors/kafka/FlinkKafkaProducer.java       | 84 +++++++++++++++-------
 1 file changed, 57 insertions(+), 27 deletions(-)
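
A usage sketch of the new most-general ctor (connection details are
placeholders; the partitioner argument is @Nullable, and per the javadoc
records are distributed round-robin when no partitioner is provided):

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;

    public class GeneralCtorSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker

            FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                "demo-topic",                   // placeholder topic
                new SimpleStringSchema(),       // plain SerializationSchema
                props,
                new FlinkFixedPartitioner<>(),  // may be null for the round-robin-style default
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
                FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
        }
    }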

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 25b359c..3bda1fe 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -288,16 +288,9 @@ public class FlinkKafkaProducer<IN>
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
 	 * 			User defined (keyless) serialization schema.
-	 *
-	 * @deprecated use {@link #FlinkKafkaProducer(String, KafkaSerializationSchema, Properties, FlinkKafkaProducer.Semantic)}
 	 */
-	@Deprecated
 	public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
-		this(
-			topicId,
-			new KeyedSerializationSchemaWrapper<>(serializationSchema),
-			getPropertiesFromBrokerList(brokerList),
-			Optional.of(new FlinkFixedPartitioner<IN>()));
+		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList));
 	}
 
 	/**
@@ -318,16 +311,41 @@ public class FlinkKafkaProducer<IN>
 	 * 			User defined key-less serialization schema.
 	 * @param producerConfig
 	 * 			Properties with the producer configuration.
-	 *
-	 * @deprecated use {@link #FlinkKafkaProducer(String, KafkaSerializationSchema, Properties, FlinkKafkaProducer.Semantic)}
 	 */
-	@Deprecated
 	public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
+		this(topicId, serializationSchema, producerConfig, Optional.of(new FlinkFixedPartitioner<>()));
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a key-less {@link SerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+	 *
+	 * <p>Since a key-less {@link SerializationSchema} is used, all records sent to Kafka will not have an
+	 * attached key. Therefore, if a partitioner is also not provided, records will be distributed to Kafka
+	 * partitions in a round-robin fashion.
+	 *
+	 * @param topicId
+	 *          The topic to write data to
+	 * @param serializationSchema
+	 *          A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param producerConfig
+	 *          Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+	 * @param customPartitioner
+	 *          A serializable partitioner for assigning messages to Kafka partitions. If a partitioner is not
+	 *          provided, records will be distributed to Kafka partitions in a round-robin fashion.
+	 */
+	public FlinkKafkaProducer(
+			String topicId,
+			SerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
 		this(
 			topicId,
-			new KeyedSerializationSchemaWrapper<>(serializationSchema),
+			serializationSchema,
 			producerConfig,
-			Optional.of(new FlinkFixedPartitioner<IN>()));
+			customPartitioner.orElse(null),
+			Semantic.AT_LEAST_ONCE,
+			DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
 	}
 
 	/**
@@ -339,22 +357,34 @@ public class FlinkKafkaProducer<IN>
 	 * partitions in a round-robin fashion.
 	 *
 	 * @param topicId The topic to write data to
-	 * @param serializationSchema A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
-	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
-	 *                          If a partitioner is not provided, records will be distributed to Kafka partitions
-	 *                          in a round-robin fashion.
-	 *
-	 * @deprecated use {@link #FlinkKafkaProducer(String, KafkaSerializationSchema, Properties, FlinkKafkaProducer.Semantic)}
+	 * @param serializationSchema
+	 *          A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param producerConfig
+	 *          Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+	 * @param customPartitioner
+	 *          A serializable partitioner for assigning messages to Kafka partitions. If a partitioner is not
+	 *          provided, records will be distributed to Kafka partitions in a round-robin fashion.
+	 * @param semantic
+	 *          Defines semantic that will be used by this producer (see {@link FlinkKafkaProducer.Semantic}).
+	 * @param kafkaProducersPoolSize
+	 *          Overwrite default KafkaProducers pool size (see {@link FlinkKafkaProducer.Semantic#EXACTLY_ONCE}).
 	 */
-	@Deprecated
 	public FlinkKafkaProducer(
-		String topicId,
-		SerializationSchema<IN> serializationSchema,
-		Properties producerConfig,
-		Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
-
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+			String topicId,
+			SerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			@Nullable FlinkKafkaPartitioner<IN> customPartitioner,
+			FlinkKafkaProducer.Semantic semantic,
+			int kafkaProducersPoolSize) {
+		this(
+			topicId,
+			new KeyedSerializationSchemaWrapper<>(serializationSchema),
+			customPartitioner,
+			null,
+			producerConfig,
+			semantic,
+			kafkaProducersPoolSize
+		);
 	}
 
 	// ------------------- Key/Value serialization schema constructors ----------------------