You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/06/08 18:04:31 UTC

[flink] branch master updated: [FLINK-18075][hotfix] Use DeserializationSchema instead of KeyedDeserializationSchema in DynamicTableSink

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 87d6a76  [FLINK-18075][hotfix] Use DeserializationSchema instead of KeyedDeserializationSchema in DynamicTableSink
87d6a76 is described below

commit 87d6a76923511f6b47ea2d5d2bba365e21cd3b96
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Mon Jun 8 20:02:24 2020 +0200

    [FLINK-18075][hotfix] Use DeserializationSchema instead of KeyedDeserializationSchema in DynamicTableSink
---
 .../org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java | 3 +--
 .../flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java    | 3 +--
 .../org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java    | 3 +--
 .../flink/streaming/connectors/kafka/table/KafkaDynamicSink.java       | 3 +--
 4 files changed, 4 insertions(+), 8 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
index cb42bd7..8abc2a2 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.types.Row;
@@ -57,7 +56,7 @@ public class Kafka011TableSink extends KafkaTableSinkBase {
 			Optional<FlinkKafkaPartitioner<Row>> partitioner) {
 		return new FlinkKafkaProducer011<>(
 			topic,
-			new KeyedSerializationSchemaWrapper<>(serializationSchema),
+			serializationSchema,
 			properties,
 			partitioner);
 	}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
index 110dd9a..d632928 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
-import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -60,7 +59,7 @@ public class Kafka011DynamicSink extends KafkaDynamicSinkBase {
 			Optional<FlinkKafkaPartitioner<RowData>> partitioner) {
 		return new FlinkKafkaProducer011<>(
 				topic,
-				new KeyedSerializationSchemaWrapper<>(serializationSchema),
+				serializationSchema,
 				properties,
 				partitioner);
 	}
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index 50e4381..861e5d7 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.types.Row;
@@ -52,7 +51,7 @@ public class KafkaTableSink extends KafkaTableSinkBase {
 		Optional<FlinkKafkaPartitioner<Row>> partitioner) {
 		return new FlinkKafkaProducer<>(
 			topic,
-			new KeyedSerializationSchemaWrapper<>(serializationSchema),
+			serializationSchema,
 			properties,
 			partitioner);
 	}
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
index f3d6c1e..aea837f 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
@@ -21,7 +21,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
-import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -59,7 +58,7 @@ public class KafkaDynamicSink extends KafkaDynamicSinkBase {
 			Optional<FlinkKafkaPartitioner<RowData>> partitioner) {
 		return new FlinkKafkaProducer<>(
 				topic,
-				new KeyedSerializationSchemaWrapper<>(serializationSchema),
+				serializationSchema,
 				properties,
 				partitioner);
 	}