You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/06/08 18:05:09 UTC
[flink] branch release-1.11 updated: [FLINK-18075][hotfix] Use
DeserializationSchema instead of KeyedDeserializationSchema in
DynamicTableSink
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new f323a79 [FLINK-18075][hotfix] Use DeserializationSchema instead of KeyedDeserializationSchema in DynamicTableSink
f323a79 is described below
commit f323a793b9776c56178446ee08445aaf99f8231c
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Mon Jun 8 20:02:24 2020 +0200
[FLINK-18075][hotfix] Use DeserializationSchema instead of KeyedDeserializationSchema in DynamicTableSink
---
.../org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java | 3 +--
.../flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java | 3 +--
.../org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java | 3 +--
.../flink/streaming/connectors/kafka/table/KafkaDynamicSink.java | 3 +--
4 files changed, 4 insertions(+), 8 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
index cb42bd7..8abc2a2 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
@@ -57,7 +56,7 @@ public class Kafka011TableSink extends KafkaTableSinkBase {
Optional<FlinkKafkaPartitioner<Row>> partitioner) {
return new FlinkKafkaProducer011<>(
topic,
- new KeyedSerializationSchemaWrapper<>(serializationSchema),
+ serializationSchema,
properties,
partitioner);
}
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
index 110dd9a..d632928 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
-import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -60,7 +59,7 @@ public class Kafka011DynamicSink extends KafkaDynamicSinkBase {
Optional<FlinkKafkaPartitioner<RowData>> partitioner) {
return new FlinkKafkaProducer011<>(
topic,
- new KeyedSerializationSchemaWrapper<>(serializationSchema),
+ serializationSchema,
properties,
partitioner);
}
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index 50e4381..861e5d7 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
@@ -52,7 +51,7 @@ public class KafkaTableSink extends KafkaTableSinkBase {
Optional<FlinkKafkaPartitioner<Row>> partitioner) {
return new FlinkKafkaProducer<>(
topic,
- new KeyedSerializationSchemaWrapper<>(serializationSchema),
+ serializationSchema,
properties,
partitioner);
}
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
index f3d6c1e..aea837f 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
@@ -21,7 +21,6 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
-import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -59,7 +58,7 @@ public class KafkaDynamicSink extends KafkaDynamicSinkBase {
Optional<FlinkKafkaPartitioner<RowData>> partitioner) {
return new FlinkKafkaProducer<>(
topic,
- new KeyedSerializationSchemaWrapper<>(serializationSchema),
+ serializationSchema,
properties,
partitioner);
}