You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/06/08 14:03:13 UTC
[flink] 02/02: [FLINK-18075] Wrap the SerializationSchema in
KafkaSerializationSchema in Kafka connector
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 74fd2912d146cc2db103825f0aee2282a99b5774
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Jun 4 13:17:27 2020 +0200
[FLINK-18075] Wrap the SerializationSchema in KafkaSerializationSchema in Kafka connector
---
.../connectors/kafka/FlinkKafkaProducerTest.java | 72 +++++++++++++
.../connectors/kafka/KafkaTestEnvironmentImpl.java | 19 +++-
.../connectors/kafka/FlinkKafkaProducerTest.java | 72 +++++++++++++
.../connectors/kafka/KafkaTestEnvironmentImpl.java | 33 +++++-
.../internals/KafkaSerializationSchemaWrapper.java | 95 +++++++++++++++++
.../connectors/kafka/KafkaConsumerTestBase.java | 23 ++--
.../connectors/kafka/KafkaProducerTestBase.java | 11 +-
.../kafka/KafkaShortRetentionTestBase.java | 3 +-
.../connectors/kafka/KafkaTestEnvironment.java | 24 ++++-
.../connectors/kafka/testutils/DataGenerators.java | 7 +-
.../connectors/kafka/FlinkKafkaProducer.java | 28 ++---
.../connectors/kafka/FlinkKafkaProducerTest.java | 118 +++++++++++++++++++++
.../connectors/kafka/KafkaTestEnvironmentImpl.java | 27 ++++-
.../connectors/kafka/table/KafkaTableITCase.java | 5 +-
14 files changed, 479 insertions(+), 58 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
new file mode 100644
index 0000000..2bd54f1
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link FlinkKafkaProducer010}.
+ */
+public class FlinkKafkaProducerTest {
+ /** Verifies that opening the producer also opens the user-provided {@link SerializationSchema}. */
+ @Test
+ public void testOpenProducer() throws Exception {
+
+ OpenTestingSerializationSchema schema = new OpenTestingSerializationSchema();
+ FlinkKafkaProducer010<Integer> kafkaProducer = new FlinkKafkaProducer010<>(
+ "localhost:9092",
+ "test-topic",
+ schema
+ );
+
+ // Close the harness when done so resources acquired when the sink is opened (presumably a
+ // Kafka producer client) are released instead of leaking past this test.
+ try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = new OneInputStreamOperatorTestHarness<>(
+ new StreamSink<>(kafkaProducer),
+ 1,
+ 1,
+ 0,
+ IntSerializer.INSTANCE,
+ new OperatorID(1, 1))) {
+
+ testHarness.open();
+
+ assertThat(schema.openCalled, equalTo(true));
+ }
+ }
+
+ /** Serialization schema that only records whether its {@code open} method was invoked. */
+ private static class OpenTestingSerializationSchema implements SerializationSchema<Integer> {
+ private boolean openCalled;
+
+ @Override
+ public void open(InitializationContext context) throws Exception {
+ openCalled = true;
+ }
+
+ @Override
+ public byte[] serialize(Integer element) {
+ return new byte[0];
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 9ae751b..33abb05 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.networking.NetworkFailuresProxy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -154,7 +155,11 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
- public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
+ public <T> StreamSink<T> getProducerSink(
+ String topic,
+ SerializationSchema<T> serSchema,
+ Properties props,
+ FlinkKafkaPartitioner<T> partitioner) {
FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
prod.setFlushOnCheckpoint(true);
return new StreamSink<>(prod);
@@ -168,6 +173,18 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
+ public <T> DataStreamSink<T> produceIntoKafka(
+ DataStream<T> stream,
+ String topic,
+ SerializationSchema<T> serSchema,
+ Properties props,
+ FlinkKafkaPartitioner<T> partitioner) {
+ final FlinkKafkaProducer010<T> sinkFunction = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
+ sinkFunction.setFlushOnCheckpoint(true); // same flush-on-checkpoint setup as getProducerSink
+ return stream.addSink(sinkFunction);
+ }
+
+ @Override
public KafkaOffsetHandler createOffsetHandler() {
return new KafkaOffsetHandlerImpl();
}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
new file mode 100644
index 0000000..6001245
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link FlinkKafkaProducer011}.
+ */
+public class FlinkKafkaProducerTest {
+ /** Verifies that opening the producer also opens the user-provided {@link SerializationSchema}. */
+ @Test
+ public void testOpenProducer() throws Exception {
+
+ OpenTestingSerializationSchema schema = new OpenTestingSerializationSchema();
+ FlinkKafkaProducer011<Integer> kafkaProducer = new FlinkKafkaProducer011<>(
+ "localhost:9092",
+ "test-topic",
+ schema
+ );
+
+ // Close the harness when done so resources acquired when the sink is opened (presumably a
+ // Kafka producer client) are released instead of leaking past this test.
+ try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = new OneInputStreamOperatorTestHarness<>(
+ new StreamSink<>(kafkaProducer),
+ 1,
+ 1,
+ 0,
+ IntSerializer.INSTANCE,
+ new OperatorID(1, 1))) {
+
+ testHarness.open();
+
+ assertThat(schema.openCalled, equalTo(true));
+ }
+ }
+
+ /** Serialization schema that only records whether its {@code open} method was invoked. */
+ private static class OpenTestingSerializationSchema implements SerializationSchema<Integer> {
+ private boolean openCalled;
+
+ @Override
+ public void open(InitializationContext context) throws Exception {
+ openCalled = true;
+ }
+
+ @Override
+ public byte[] serialize(Integer element) {
+ return new byte[0];
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index c846f16..db82233 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -17,10 +17,12 @@
package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.networking.NetworkFailuresProxy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
@@ -267,10 +269,14 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
- public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
+ public <T> StreamSink<T> getProducerSink(
+ String topic,
+ SerializationSchema<T> serSchema,
+ Properties props,
+ FlinkKafkaPartitioner<T> partitioner) {
return new StreamSink<>(new FlinkKafkaProducer011<>(
topic,
- serSchema,
+ new KeyedSerializationSchemaWrapper<>(serSchema),
props,
Optional.ofNullable(partitioner),
producerSemantic,
@@ -278,7 +284,12 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
- public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
+ public <T> DataStreamSink<T> produceIntoKafka(
+ DataStream<T> stream,
+ String topic,
+ KeyedSerializationSchema<T> serSchema,
+ Properties props,
+ FlinkKafkaPartitioner<T> partitioner) {
return stream.addSink(new FlinkKafkaProducer011<>(
topic,
serSchema,
@@ -289,6 +300,22 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
+ public <T> DataStreamSink<T> produceIntoKafka(
+ DataStream<T> stream,
+ String topic,
+ SerializationSchema<T> serSchema,
+ Properties props,
+ FlinkKafkaPartitioner<T> partitioner) {
+ // the FlinkKafkaProducer011 constructor used here takes a keyed schema, so adapt the plain one
+ final FlinkKafkaProducer011<T> sinkFunction = new FlinkKafkaProducer011<>(
+ topic,
+ new KeyedSerializationSchemaWrapper<>(serSchema),
+ props, Optional.ofNullable(partitioner),
+ producerSemantic, FlinkKafkaProducer011.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+ return stream.addSink(sinkFunction);
+ }
+
+ @Override
public KafkaOffsetHandler createOffsetHandler() {
return new KafkaOffsetHandlerImpl();
}
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java
new file mode 100644
index 0000000..3cd0fc7
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaSerializationSchemaWrapper.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
+import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * An adapter from old style interfaces such as {@link org.apache.flink.api.common.serialization.SerializationSchema},
+ * {@link org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner} to the
+ * {@link KafkaSerializationSchema}.
+ *
+ * <p>Every record is sent to the topic given at construction time, with a {@code null} key. If a
+ * partitioner is configured, it is opened together with the wrapped schema, using the subtask index
+ * and parallelism received through the {@link KafkaContextAware} setters.
+ */
+public class KafkaSerializationSchemaWrapper<T> implements KafkaSerializationSchema<T>, KafkaContextAware<T> {
+
+ private final FlinkKafkaPartitioner<T> partitioner;
+ private final SerializationSchema<T> serializationSchema;
+ private final String topic;
+ private boolean writeTimestamp;
+
+ /** Partitions of the target topic, as last handed over via {@link #setPartitions(int[])}. */
+ private int[] partitions;
+
+ /** Index of the parallel subtask, forwarded to the partitioner when it is opened. */
+ private int parallelInstanceId;
+
+ /** Total number of parallel subtasks, forwarded to the partitioner when it is opened. */
+ private int numParallelInstances;
+
+ /**
+ * Creates a wrapper that writes every record to a single, fixed topic.
+ *
+ * @param topic the topic every record is written to
+ * @param partitioner optional partitioner choosing the target partition; if {@code null}, no
+ * partition is set on the produced {@link ProducerRecord}
+ * @param writeTimestamp whether the timestamp passed to {@link #serialize(Object, Long)} is attached
+ * to the produced record
+ * @param serializationSchema schema that turns each element into the record value
+ */
+ public KafkaSerializationSchemaWrapper(
+ String topic,
+ FlinkKafkaPartitioner<T> partitioner,
+ boolean writeTimestamp,
+ SerializationSchema<T> serializationSchema) {
+ this.partitioner = partitioner;
+ this.serializationSchema = serializationSchema;
+ this.topic = topic;
+ this.writeTimestamp = writeTimestamp;
+ }
+
+ /**
+ * Opens the wrapped schema and, if one is configured, the partitioner.
+ *
+ * <p>The partitioner has to be opened here: the producer only sees this wrapper, so it cannot open
+ * a partitioner hidden inside it. The subtask index and parallelism are expected to have been set
+ * through the {@link KafkaContextAware} setters before this method is called; otherwise both are 0.
+ */
+ @Override
+ public void open(SerializationSchema.InitializationContext context) throws Exception {
+ serializationSchema.open(context);
+ if (partitioner != null) {
+ partitioner.open(parallelInstanceId, numParallelInstances);
+ }
+ }
+
+ /**
+ * Serializes {@code element} into a record without key for the configured topic.
+ *
+ * @param element the element to serialize
+ * @param timestamp timestamp of the element, may be {@code null}; only attached to the record if
+ * writing timestamps is enabled
+ * @return the record to send to Kafka
+ */
+ @Override
+ public ProducerRecord<byte[], byte[]> serialize(
+ T element,
+ @Nullable Long timestamp) {
+ byte[] serialized = serializationSchema.serialize(element);
+ final Integer partition;
+ if (partitioner != null) {
+ partition = partitioner.partition(element, null, serialized, topic, partitions);
+ } else {
+ partition = null;
+ }
+
+ final Long timestampToWrite;
+ if (writeTimestamp) {
+ timestampToWrite = timestamp;
+ } else {
+ timestampToWrite = null;
+ }
+
+ return new ProducerRecord<>(topic, partition, timestampToWrite, null, serialized);
+ }
+
+ @Override
+ public String getTargetTopic(T element) {
+ return topic;
+ }
+
+ @Override
+ public void setPartitions(int[] partitions) {
+ this.partitions = partitions;
+ }
+
+ @Override
+ public void setParallelInstanceId(int parallelInstanceId) {
+ this.parallelInstanceId = parallelInstanceId;
+ }
+
+ @Override
+ public void setNumParallelInstances(int numParallelInstances) {
+ this.numParallelInstances = numParallelInstances;
+ }
+
+ /** Enables or disables attaching the element timestamp in {@link #serialize(Object, Long)}. */
+ public void setWriteTimestamp(boolean writeTimestamp) {
+ this.writeTimestamp = writeTimestamp;
+ }
+}
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index d99f8d9..8406f70 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -64,7 +65,6 @@ import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators;
import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
@@ -418,9 +418,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
final TypeInformation<Tuple2<Integer, Integer>> resultType =
TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
- final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema =
- new KeyedSerializationSchemaWrapper<>(
- new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
+ final SerializationSchema<Tuple2<Integer, Integer>> serSchema =
+ new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig());
final KafkaDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
new KafkaDeserializationSchemaWrapper<>(
@@ -747,7 +746,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
producerProperties.setProperty("retries", "3");
producerProperties.putAll(secureProps);
- kafkaServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(sinkSchema), producerProperties, null);
+ kafkaServer.produceIntoKafka(stream, topic, sinkSchema, producerProperties, null);
// ----------- add consumer dataflow ----------
@@ -1302,7 +1301,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
}
});
- kafkaServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(serSchema), producerProps, null);
+ kafkaServer.produceIntoKafka(stream, topic, serSchema, producerProps, null);
tryExecute(env, "big topology test");
deleteTestTopic(topic);
@@ -1651,7 +1650,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
}
});
- kafkaServer.produceIntoKafka(fromGen, topic, new KeyedSerializationSchemaWrapper<>(schema), standardProps, null);
+ kafkaServer.produceIntoKafka(fromGen, topic, schema, standardProps, null);
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env1.getStreamGraph());
final JobID jobId = jobGraph.getJobID();
@@ -1939,9 +1938,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
final TypeInformation<Tuple2<Integer, Integer>> resultType =
TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
- final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema =
- new KeyedSerializationSchemaWrapper<>(
- new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
+ final SerializationSchema<Tuple2<Integer, Integer>> serSchema =
+ new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig());
final KafkaDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
new KafkaDeserializationSchemaWrapper<>(
@@ -2035,9 +2033,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
final TypeInformation<Tuple2<Integer, Integer>> resultType =
TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
- final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema =
- new KeyedSerializationSchemaWrapper<>(
- new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
+ final SerializationSchema<Tuple2<Integer, Integer>> serSchema =
+ new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig());
final KafkaDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
new KafkaDeserializationSchemaWrapper<>(
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 2b586fe..59804d7 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -42,7 +42,6 @@ import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationS
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper;
import org.apache.flink.streaming.connectors.kafka.testutils.IntegerSource;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.Preconditions;
@@ -231,7 +230,6 @@ public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink {
createTestTopic(topic, 1, 1);
TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
- KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper(schema);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(500);
@@ -262,7 +260,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink {
.addSource(new InfiniteIntegerSource())
.map(new BrokerRestartingMapper<>(failAfterElements));
- StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
+ StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, schema, properties, new FlinkKafkaPartitioner<Integer>() {
@Override
public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
return partition;
@@ -273,7 +271,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink {
inputStream.addSink(kafkaSink.getUserFunction());
}
else {
- kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
+ kafkaServer.produceIntoKafka(inputStream, topic, schema, properties, new FlinkKafkaPartitioner<Integer>() {
@Override
public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
return partition;
@@ -332,7 +330,6 @@ public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink {
}
TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
- KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper<>(schema);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(500);
@@ -359,10 +356,10 @@ public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink {
};
if (regularSink) {
- StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic + i, keyedSerializationSchema, properties, partitioner);
+ StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic + i, schema, properties, partitioner);
inputStream.addSink(kafkaSink.getUserFunction());
} else {
- kafkaServer.produceIntoKafka(inputStream, topic + i, keyedSerializationSchema, properties, partitioner);
+ kafkaServer.produceIntoKafka(inputStream, topic + i, schema, properties, partitioner);
}
}
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 2eef689..816ad30 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -30,7 +30,6 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.InstantiationUtil;
@@ -170,7 +169,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
Properties props = new Properties();
props.putAll(standardProps);
props.putAll(secureProps);
- kafkaServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props, null);
+ kafkaServer.produceIntoKafka(stream, topic, new SimpleStringSchema(), props, null);
// ----------- add consumer dataflow ----------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 37132e7..0a40e92 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.networking.NetworkFailuresProxy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -155,13 +156,26 @@ public abstract class KafkaTestEnvironment {
int partition,
long timeout);
- public abstract <T> StreamSink<T> getProducerSink(String topic,
- KeyedSerializationSchema<T> serSchema, Properties props,
+ public abstract <T> StreamSink<T> getProducerSink(
+ String topic,
+ SerializationSchema<T> serSchema,
+ Properties props,
FlinkKafkaPartitioner<T> partitioner);
- public abstract <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic,
- KeyedSerializationSchema<T> serSchema, Properties props,
- FlinkKafkaPartitioner<T> partitioner);
+ @Deprecated
+ public abstract <T> DataStreamSink<T> produceIntoKafka(
+ DataStream<T> stream,
+ String topic,
+ KeyedSerializationSchema<T> serSchema,
+ Properties props,
+ FlinkKafkaPartitioner<T> partitioner);
+
+ public abstract <T> DataStreamSink<T> produceIntoKafka(
+ DataStream<T> stream,
+ String topic,
+ SerializationSchema<T> serSchema,
+ Properties props,
+ FlinkKafkaPartitioner<T> partitioner);
public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic,
KafkaSerializationSchema<T> serSchema, Properties props) {
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index 73d1ae1..9186ca1 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -32,7 +32,6 @@ import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
-import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -109,7 +108,7 @@ public class DataGenerators {
stream = stream.rebalance();
testServer.produceIntoKafka(stream, topic,
- new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig())),
+ new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig()),
props,
new FlinkKafkaPartitioner<Integer>() {
@Override
@@ -154,9 +153,9 @@ public class DataGenerators {
StreamSink<String> sink = server.getProducerSink(
topic,
- new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
+ new SimpleStringSchema(),
producerProperties,
- new FlinkFixedPartitioner<String>());
+ new FlinkFixedPartitioner<>());
OneInputStreamOperatorTestHarness<String, Object> testHarness =
new OneInputStreamOperatorTestHarness<>(sink);
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 3bda1fe..c2193e5 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -42,7 +42,7 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer;
import org.apache.flink.streaming.connectors.kafka.internal.TransactionalIdsGenerator;
import org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMutableWrapper;
-import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
@@ -378,9 +378,9 @@ public class FlinkKafkaProducer<IN>
int kafkaProducersPoolSize) {
this(
topicId,
- new KeyedSerializationSchemaWrapper<>(serializationSchema),
- customPartitioner,
null,
+ null,
+ new KafkaSerializationSchemaWrapper<>(topicId, customPartitioner, false, serializationSchema),
producerConfig,
semantic,
kafkaProducersPoolSize
@@ -471,10 +471,10 @@ public class FlinkKafkaProducer<IN>
*/
@Deprecated
public FlinkKafkaProducer(
- String topicId,
- KeyedSerializationSchema<IN> serializationSchema,
- Properties producerConfig,
- FlinkKafkaProducer.Semantic semantic) {
+ String topicId,
+ KeyedSerializationSchema<IN> serializationSchema,
+ Properties producerConfig,
+ FlinkKafkaProducer.Semantic semantic) {
this(topicId,
serializationSchema,
producerConfig,
@@ -728,6 +728,9 @@ public class FlinkKafkaProducer<IN>
*/
public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
this.writeTimestampToKafka = writeTimestampToKafka;
+ if (kafkaSchema instanceof KafkaSerializationSchemaWrapper) {
+ ((KafkaSerializationSchemaWrapper<IN>) kafkaSchema).setWriteTimestamp(writeTimestampToKafka);
+ }
}
/**
@@ -789,21 +792,12 @@ public class FlinkKafkaProducer<IN>
}
if (kafkaSchema != null) {
- kafkaSchema.open(createSerializationInitContext());
- }
-
- if (keyedSchema != null && keyedSchema instanceof KeyedSerializationSchemaWrapper) {
- ((KeyedSerializationSchemaWrapper<IN>) keyedSchema).getSerializationSchema()
- .open(createSerializationInitContext());
+ kafkaSchema.open(() -> getRuntimeContext().getMetricGroup().addGroup("user"));
}
super.open(configuration);
}
- private SerializationSchema.InitializationContext createSerializationInitContext() {
- return () -> getRuntimeContext().getMetricGroup().addGroup("user");
- }
-
@Override
public void invoke(FlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context) throws FlinkKafkaException {
checkErroneous();
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
new file mode 100644
index 0000000..a166c0e
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link FlinkKafkaProducer}.
+ */
+public class FlinkKafkaProducerTest {
+
+	/**
+	 * Verifies that a plain {@link SerializationSchema} passed to the producer is opened
+	 * when the sink operator is opened.
+	 */
+	@Test
+	public void testOpenSerializationSchemaProducer() throws Exception {
+		OpenTestingSerializationSchema schema = new OpenTestingSerializationSchema();
+		FlinkKafkaProducer<Integer> kafkaProducer = new FlinkKafkaProducer<>(
+			"localhost:9092",
+			"test-topic",
+			schema
+		);
+
+		try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(kafkaProducer)) {
+			testHarness.open();
+
+			assertThat(schema.openCalled, equalTo(true));
+		}
+	}
+
+	/**
+	 * Verifies that a {@link KafkaSerializationSchema} passed to the producer is opened
+	 * when the sink operator is opened.
+	 */
+	@Test
+	public void testOpenKafkaSerializationSchemaProducer() throws Exception {
+		OpenTestingKafkaSerializationSchema schema = new OpenTestingKafkaSerializationSchema();
+		Properties properties = new Properties();
+		properties.put("bootstrap.servers", "localhost:9092");
+		FlinkKafkaProducer<Integer> kafkaProducer = new FlinkKafkaProducer<>(
+			"test-topic",
+			schema,
+			properties,
+			FlinkKafkaProducer.Semantic.AT_LEAST_ONCE
+		);
+
+		try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(kafkaProducer)) {
+			testHarness.open();
+
+			assertThat(schema.openCalled, equalTo(true));
+		}
+	}
+
+	/**
+	 * Creates an operator test harness that runs the given producer as a {@link StreamSink}.
+	 *
+	 * <p>Callers should close the returned harness (e.g. via try-with-resources) so that
+	 * whatever the producer acquires while being opened is released again.
+	 */
+	private static OneInputStreamOperatorTestHarness<Integer, Object> createTestHarness(
+			FlinkKafkaProducer<Integer> kafkaProducer) throws Exception {
+		return new OneInputStreamOperatorTestHarness<>(
+			new StreamSink<>(kafkaProducer),
+			1,
+			1,
+			0,
+			IntSerializer.INSTANCE,
+			new OperatorID(1, 1));
+	}
+
+	/**
+	 * {@link KafkaSerializationSchema} that records whether {@link #open} was called.
+	 */
+	private static class OpenTestingKafkaSerializationSchema implements KafkaSerializationSchema<Integer> {
+		private boolean openCalled;
+
+		@Override
+		public void open(SerializationSchema.InitializationContext context) throws Exception {
+			openCalled = true;
+		}
+
+		@Override
+		public ProducerRecord<byte[], byte[]> serialize(
+				Integer element,
+				@Nullable Long timestamp) {
+			// Not exercised: the tests in this class only open the operator and never send records.
+			return null;
+		}
+	}
+
+	/**
+	 * {@link SerializationSchema} that records whether {@link #open} was called.
+	 */
+	private static class OpenTestingSerializationSchema implements SerializationSchema<Integer> {
+		private boolean openCalled;
+
+		@Override
+		public void open(SerializationSchema.InitializationContext context) throws Exception {
+			openCalled = true;
+		}
+
+		@Override
+		public byte[] serialize(Integer element) {
+			return new byte[0];
+		}
+	}
+}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 16cb724..f2c1767 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.connectors.kafka;
+import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.networking.NetworkFailuresProxy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -261,12 +262,16 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
- public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
- return new StreamSink<>(new FlinkKafkaProducer<T>(
+ public <T> StreamSink<T> getProducerSink(
+ String topic,
+ SerializationSchema<T> serSchema,
+ Properties props,
+ FlinkKafkaPartitioner<T> partitioner) {
+ return new StreamSink<>(new FlinkKafkaProducer<>(
topic,
serSchema,
props,
- Optional.ofNullable(partitioner),
+ partitioner,
producerSemantic,
FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
}
@@ -283,6 +288,22 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
+ public <T> DataStreamSink<T> produceIntoKafka(
+ DataStream<T> stream,
+ String topic,
+ SerializationSchema<T> serSchema,
+ Properties props,
+ FlinkKafkaPartitioner<T> partitioner) {
+ return stream.addSink(new FlinkKafkaProducer<T>(
+ topic,
+ serSchema,
+ props,
+ partitioner,
+ producerSemantic,
+ FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE));
+ }
+
+ @Override
public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KafkaSerializationSchema<T> serSchema, Properties props) {
return stream.addSink(new FlinkKafkaProducer<T>(
topic,
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
index 06c73d5..17fbe48 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
@@ -18,13 +18,12 @@
package org.apache.flink.streaming.connectors.kafka.table;
+import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
-import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.table.descriptors.KafkaValidator;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.TableEnvUtil;
@@ -69,7 +68,7 @@ public class KafkaTableITCase extends KafkaTableTestBase {
// ---------- Write the Debezium json into Kafka -------------------
List<String> lines = readLines("debezium-data-schema-exclude.txt");
DataStreamSource<String> stream = env.fromCollection(lines);
- KeyedSerializationSchema<String> serSchema = new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema());
+ SerializationSchema<String> serSchema = new SimpleStringSchema();
FlinkKafkaPartitioner<String> partitioner = new FlinkFixedPartitioner<>();
// the producer must not produce duplicates