You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by hx...@apache.org on 2022/07/27 10:54:38 UTC

[flink] branch master updated: [FLINK-28510][python][connector] Support using new KafkaSink

This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c796a78671c [FLINK-28510][python][connector] Support using new KafkaSink
c796a78671c is described below

commit c796a78671c8f201be4a298c7a7b934c70ea4398
Author: Juntao Hu <ma...@gmail.com>
AuthorDate: Tue Jul 12 17:41:58 2022 +0800

    [FLINK-28510][python][connector] Support using new KafkaSink
    
    This closes #20263.
---
 .../docs/connectors/datastream/formats/avro.md     |   2 +-
 .../docs/connectors/datastream/formats/parquet.md  |   2 +-
 .../content.zh/docs/connectors/datastream/kafka.md | 165 +++++++++-
 .../docs/connectors/datastream/formats/avro.md     |   2 +-
 .../docs/connectors/datastream/formats/parquet.md  |   2 +-
 docs/content/docs/connectors/datastream/kafka.md   | 162 ++++++++-
 docs/layouts/shortcodes/py_download_link.html      |  62 +++-
 flink-python/pyflink/common/types.py               |  33 ++
 flink-python/pyflink/datastream/__init__.py        |   4 +
 .../pyflink/datastream/connectors/__init__.py      |  26 +-
 flink-python/pyflink/datastream/connectors/base.py |  19 ++
 .../pyflink/datastream/connectors/kafka.py         | 364 ++++++++++++++++++++-
 .../datastream/connectors/tests/test_kafka.py      | 293 +++++++++++++++--
 flink-python/pyflink/datastream/data_stream.py     |  10 +-
 .../flink/python/util/PythonConnectorUtils.java    |  97 ++++++
 15 files changed, 1185 insertions(+), 58 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/formats/avro.md b/docs/content.zh/docs/connectors/datastream/formats/avro.md
index f07d0c4597b..a43afbcb323 100644
--- a/docs/content.zh/docs/connectors/datastream/formats/avro.md
+++ b/docs/content.zh/docs/connectors/datastream/formats/avro.md
@@ -40,7 +40,7 @@ Flink 的序列化框架可以处理基于 Avro schemas 生成的类。为了能
 </dependency>
 ```
 
-{{< py_download_link "avro" "flink-sql-avro.jar" >}}
+{{< py_download_link "avro" >}}
 
 如果读取 Avro 文件数据,你必须指定 `AvroInputFormat`。
 
diff --git a/docs/content.zh/docs/connectors/datastream/formats/parquet.md b/docs/content.zh/docs/connectors/datastream/formats/parquet.md
index 13b949e06dc..b8182fac55f 100644
--- a/docs/content.zh/docs/connectors/datastream/formats/parquet.md
+++ b/docs/content.zh/docs/connectors/datastream/formats/parquet.md
@@ -62,7 +62,7 @@ Flink 支持读取 [Parquet](https://parquet.apache.org/) 文件并生成 {{< ja
 </dependency>
 ```
 
-{{< py_download_link "parquet" "flink-sql-parquet.jar" >}}
+{{< py_download_link "parquet" >}}
 
 此格式与新的 Source 兼容,可以同时在批和流模式下使用。
 因此,你可使用此格式处理以下两类数据:
diff --git a/docs/content.zh/docs/connectors/datastream/kafka.md b/docs/content.zh/docs/connectors/datastream/kafka.md
index 7ec1d2e1c2c..3a4c448615e 100644
--- a/docs/content.zh/docs/connectors/datastream/kafka.md
+++ b/docs/content.zh/docs/connectors/datastream/kafka.md
@@ -45,6 +45,8 @@ Apache Flink 集成了通用的 Kafka 连接器,它会尽力与 Kafka client 
 Flink 目前的流连接器还不是二进制发行版的一部分。
 [在此处]({{< ref "docs/dev/configuration/overview" >}})可以了解到如何链接它们,从而在集群中运行。
 
+{{< py_download_link "kafka" >}}
+
 ## Kafka Source
 {{< hint info >}}
 该文档描述的是基于[新数据源 API]({{< ref "docs/dev/datastream/sources.md" >}}) 的 Kafka Source。
@@ -53,6 +55,9 @@ Flink 目前的流连接器还不是二进制发行版的一部分。
 ### 使用方法
 Kafka Source 提供了构建类来创建 ```KafkaSource``` 的实例。以下代码片段展示了如何构建 ```KafkaSource```
 来消费 “input-topic” 最早位点的数据, 使用消费组 “my-group”,并且将 Kafka 消息体反序列化为字符串:
+
+{{< tabs "KafkaSource" >}}
+{{< tab "Java" >}}
 ```java
 KafkaSource<String> source = KafkaSource.<String>builder()
     .setBootstrapServers(brokers)
@@ -64,6 +69,22 @@ KafkaSource<String> source = KafkaSource.<String>builder()
 
 env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+source = KafkaSource.builder() \
+    .set_bootstrap_servers(brokers) \
+    .set_topics("input-topic") \
+    .set_group_id("my-group") \
+    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
+    .set_value_only_deserializer(SimpleStringSchema()) \
+    .build()
+
+env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
+```
+{{< /tab >}}
+{{< /tabs >}}
+
 以下属性在构建 KafkaSource 时是必须指定的:
 - Bootstrap server,通过 ```setBootstrapServers(String)``` 方法配置
 - 消费者组 ID,通过 ```setGroupId(String)``` 配置
@@ -73,20 +94,52 @@ env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
 ### Topic / Partition 订阅
 Kafka Source 提供了 3 种 Topic / Partition 的订阅方式:
 - Topic 列表,订阅 Topic 列表中所有 Partition 的消息:
+  {{< tabs "KafkaSource#setTopics" >}}
+  {{< tab "Java" >}}
   ```java
   KafkaSource.builder().setTopics("topic-a", "topic-b");
   ```
+  {{< /tab >}}
+  {{< tab "Python" >}}
+  ```python
+  KafkaSource.builder().set_topics("topic-a", "topic-b")
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
 - 正则表达式匹配,订阅与正则表达式所匹配的 Topic 下的所有 Partition:
+  {{< tabs "KafkaSource#setTopicPattern" >}}
+  {{< tab "Java" >}}
   ```java
   KafkaSource.builder().setTopicPattern("topic.*");
   ```
+  {{< /tab >}}
+  {{< tab "Python" >}}
+  ```python
+  KafkaSource.builder().set_topic_pattern("topic.*")
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
 - Partition 列表,订阅指定的 Partition:
+  {{< tabs "KafkaSource#setPartitions" >}}
+  {{< tab "Java" >}}
   ```java
   final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
           new TopicPartition("topic-a", 0),    // Partition 0 of topic "topic-a"
           new TopicPartition("topic-b", 5)));  // Partition 5 of topic "topic-b"
   KafkaSource.builder().setPartitions(partitionSet);
   ```
+  {{< /tab >}}
+  {{< tab "Python" >}}
+  ```python
+  partition_set = {
+      KafkaTopicPartition("topic-a", 0),
+      KafkaTopicPartition("topic-b", 5)
+  }
+  KafkaSource.builder().set_partitions(partition_set)
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
+
 ### 消息解析
 代码中需要提供一个反序列化器(Deserializer)来对 Kafka 的消息进行解析。
 反序列化器通过 ```setDeserializer(KafkaRecordDeserializationSchema)``` 来指定,其中 ```KafkaRecordDeserializationSchema``` 
@@ -105,9 +158,16 @@ KafkaSource.<String>builder()
         .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
 ```
 
+目前 PyFlink 只支持 ```set_value_only_deserializer``` 来自定义 Kafka 消息中值的反序列化.
+```python
+KafkaSource.builder().set_value_only_deserializer(SimpleStringSchema())
+```
+
 ### 起始消费位点
 Kafka source 能够通过位点初始化器(```OffsetsInitializer```)来指定从不同的偏移量开始消费 。内置的位点初始化器包括:
 
+{{< tabs "KafkaSource#setStartingOffsets" >}}
+{{< tab "Java" >}}
 ```java
 KafkaSource.builder()
     // 从消费组提交的位点开始消费,不指定位点重置策略
@@ -121,7 +181,25 @@ KafkaSource.builder()
     // 从最末尾位点开始消费
     .setStartingOffsets(OffsetsInitializer.latest());
 ```
-如果内置的初始化器不能满足需求,也可以实现自定义的位点初始化器(```OffsetsInitializer```)。
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+KafkaSource.builder()
+    # 从消费组提交的位点开始消费,不指定位点重置策略
+    .set_starting_offsets(KafkaOffsetsInitializer.committed_offsets()) \
+    # 从消费组提交的位点开始消费,如果提交位点不存在,使用最早位点
+    .set_starting_offsets(KafkaOffsetsInitializer.committed_offsets(KafkaOffsetResetStrategy.EARLIEST)) \
+    # 从时间戳大于等于指定时间戳(毫秒)的数据开始消费
+    .set_starting_offsets(KafkaOffsetsInitializer.timestamp(1657256176000)) \
+    # 从最早位点开始消费
+    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
+    # 从最末尾位点开始消费
+    .set_starting_offsets(KafkaOffsetsInitializer.latest())
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+如果内置的初始化器不能满足需求,也可以实现自定义的位点初始化器(```OffsetsInitializer```)。( PyFlink 不支持)
 
 如果未指定位点初始化器,将默认使用 ```OffsetsInitializer.earliest()```。
 
@@ -153,10 +231,22 @@ Kafka consumer 的配置可以参考 [Apache Kafka 文档](http://kafka.apache.o
 为了在不重启 Flink 作业的情况下处理 Topic 扩容或新建 Topic 等场景,可以将 Kafka Source 配置为在提供的 Topic / Partition 
 订阅模式下定期检查新分区。要启用动态分区检查,请将 ```partition.discovery.interval.ms``` 设置为非负值:
 
+
+{{< tabs "KafkaSource#PartitionDiscovery" >}}
+{{< tab "Java" >}}
 ```java
 KafkaSource.builder()
     .setProperty("partition.discovery.interval.ms", "10000"); // 每 10 秒检查一次新分区
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+KafkaSource.builder() \
+    .set_property("partition.discovery.interval.ms", "10000")  # 每 10 秒检查一次新分区
+```
+{{< /tab >}}
+{{< /tabs >}}
+
 {{< hint warning >}}
 分区检查功能默认**不开启**。需要显式地设置分区检查间隔才能启用此功能。
 {{< /hint >}}
@@ -167,7 +257,7 @@ KafkaSource.builder()
 ```java
 env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy");
 ```
-[这篇文档]({{< ref "docs/dev/datastream/event-time/generating_watermarks.md" >}})描述了如何自定义水印策略(```WatermarkStrategy```)。
+[这篇文档]({{< ref "docs/dev/datastream/event-time/generating_watermarks.md" >}})描述了如何自定义水印策略(```WatermarkStrategy```)。( PyFlink 不支持)
 
 ### 空闲
 如果并行度高于分区数,Kafka Source 不会自动进入空闲状态。您将需要降低并行度或向水印策略添加空闲超时。如果在这段时间内没有记录在流的分区中流动,则该分区被视为“空闲”并且不会阻止下游操作符中水印的进度。
@@ -266,24 +356,37 @@ Kafka consumer 的所有指标都注册在指标组 ```KafkaSourceReader.KafkaCo
 要启用加密和认证相关的安全配置,只需将安全配置作为其他属性配置在 Kafka source 上即可。下面的代码片段展示了如何配置 Kafka source 以使用
 PLAIN 作为 SASL 机制并提供 JAAS 配置:
 
+{{< tabs "KafkaSource#SecurityPlain" >}}
+{{< tab "Java" >}}
 ```java
 KafkaSource.builder()
     .setProperty("security.protocol", "SASL_PLAINTEXT")
     .setProperty("sasl.mechanism", "PLAIN")
     .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+KafkaSource.builder() \
+    .set_property("security.protocol", "SASL_PLAINTEXT") \
+    .set_property("sasl.mechanism", "PLAIN") \
+    .set_property("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 另一个更复杂的例子,使用 SASL_SSL 作为安全协议并使用 SCRAM-SHA-256 作为 SASL 机制:
+
+{{< tabs "KafkaSource#SecuritySASL" >}}
+{{< tab "Java" >}}
 ```java
 KafkaSource.builder()
     .setProperty("security.protocol", "SASL_SSL")
     // SSL 配置
     // 配置服务端提供的 truststore (CA 证书) 的路径
-    // Configure the path of truststore (CA) provided by the server
     .setProperty("ssl.truststore.location", "/path/to/kafka.client.truststore.jks")
     .setProperty("ssl.truststore.password", "test1234")
     // 如果要求客户端认证,则需要配置 keystore (私钥) 的路径
-    // Configure the path of keystore (private key) if client authentication is required
     .setProperty("ssl.keystore.location", "/path/to/kafka.client.keystore.jks")
     .setProperty("ssl.keystore.password", "test1234")
     // SASL 配置
@@ -292,6 +395,26 @@ KafkaSource.builder()
     // 配置 JAAS
     .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";");
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+KafkaSource.builder() \
+    .set_property("security.protocol", "SASL_SSL") \
+    # SSL 配置
+    # 配置服务端提供的 truststore (CA 证书) 的路径
+    .set_property("ssl.truststore.location", "/path/to/kafka.client.truststore.jks") \
+    .set_property("ssl.truststore.password", "test1234") \
+    # 如果要求客户端认证,则需要配置 keystore (私钥) 的路径
+    .set_property("ssl.keystore.location", "/path/to/kafka.client.keystore.jks") \
+    .set_property("ssl.keystore.password", "test1234") \
+    # SASL 配置
+    # 将 SASL 机制配置为 SCRAM-SHA-256
+    .set_property("sasl.mechanism", "SCRAM-SHA-256") \
+    # 配置 JAAS
+    .set_property("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";")
+```
+{{< /tab >}}
+{{< /tabs >}}
 如果在作业 JAR 中 Kafka 客户端依赖的类路径被重置了(relocate class),登录模块(login module)的类路径可能会不同,因此请根据登录模块在
 JAR 中实际的类路径来改写以上配置。
 
@@ -341,6 +464,8 @@ Kafka source 的源读取器扩展了 ```SourceReaderBase```,并使用单线
 Kafka sink 提供了构建类来创建 ```KafkaSink``` 的实例。以下代码片段展示了如何将字符串数据按照至少一次(at lease once)的语义保证写入 Kafka
 topic:
 
+{{< tabs "KafkaSink" >}}
+{{< tab "Java" >}}
 ```java
 DataStream<String> stream = ...;
         
@@ -356,6 +481,24 @@ KafkaSink<String> sink = KafkaSink.<String>builder()
         
 stream.sinkTo(sink);
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+sink = KafkaSink.builder() \
+    .set_bootstrap_servers(brokers) \
+    .set_record_serializer(
+        KafkaRecordSerializationSchema.builder()
+            .set_topic("topic-name")
+            .set_value_serialization_schema(SimpleStringSchema())
+            .build()
+    ) \
+    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
+    .build()
+
+stream.sink_to(sink)
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 以下属性在构建 KafkaSink 时是必须指定的:
 
@@ -368,6 +511,8 @@ stream.sinkTo(sink);
 构建时需要提供 ```KafkaRecordSerializationSchema``` 来将输入数据转换为 Kafka 的 ```ProducerRecord```。Flink 提供了 schema 构建器
 以提供一些通用的组件,例如消息键(key)/消息体(value)序列化、topic 选择、消息分区,同样也可以通过实现对应的接口来进行更丰富的控制。
 
+{{< tabs "KafkaSink#Serializer" >}}
+{{< tab "Java" >}}
 ```java
 KafkaRecordSerializationSchema.builder()
     .setTopicSelector((element) -> {<your-topic-selection-logic>})
@@ -376,6 +521,18 @@ KafkaRecordSerializationSchema.builder()
     .setPartitioner(new FlinkFixedPartitioner())
     .build();
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+KafkaRecordSerializationSchema.builder() \
+    .set_topic_selector(lambda element: <your-topic-selection-logic>) \
+    .set_value_serialization_schema(SimpleStringSchema()) \
+    .set_key_serialization_schema(SimpleStringSchema()) \
+    # set partitioner is not supported in PyFlink
+    .build()
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 其中消息体(value)序列化方法和 topic 的选择方法是必须指定的,此外也可以通过 ```setKafkaKeySerializer(Serializer)``` 或
 ```setKafkaValueSerializer(Serializer)``` 来使用 Kafka 提供而非 Flink 提供的序列化器。
diff --git a/docs/content/docs/connectors/datastream/formats/avro.md b/docs/content/docs/connectors/datastream/formats/avro.md
index 15e564172d0..2784519d961 100644
--- a/docs/content/docs/connectors/datastream/formats/avro.md
+++ b/docs/content/docs/connectors/datastream/formats/avro.md
@@ -39,7 +39,7 @@ The serialization framework of Flink is able to handle classes generated from Av
 </dependency>
 ```
 
-{{< py_download_link "avro" "flink-sql-avro.jar" >}}
+{{< py_download_link "avro" >}}
 
 In order to read data from an Avro file, you have to specify an `AvroInputFormat`.
 
diff --git a/docs/content/docs/connectors/datastream/formats/parquet.md b/docs/content/docs/connectors/datastream/formats/parquet.md
index 3364cbc8432..217a88fc107 100644
--- a/docs/content/docs/connectors/datastream/formats/parquet.md
+++ b/docs/content/docs/connectors/datastream/formats/parquet.md
@@ -61,7 +61,7 @@ To read Avro records, you will need to add the `parquet-avro` dependency:
 </dependency>
 ```
 
-{{< py_download_link "parquet" "flink-sql-parquet.jar" >}}
+{{< py_download_link "parquet" >}}
 
 This format is compatible with the new Source that can be used in both batch and streaming execution modes.
 Thus, you can use this format for two kinds of data:
diff --git a/docs/content/docs/connectors/datastream/kafka.md b/docs/content/docs/connectors/datastream/kafka.md
index ff3f37ade25..82bcc063bd6 100644
--- a/docs/content/docs/connectors/datastream/kafka.md
+++ b/docs/content/docs/connectors/datastream/kafka.md
@@ -41,6 +41,8 @@ For details on Kafka compatibility, please refer to the official [Kafka document
 Flink's streaming connectors are not part of the binary distribution.
 See how to link with them for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
 
+{{< py_download_link "kafka" >}}
+
 ## Kafka Source
 {{< hint info >}}
 This part describes the Kafka source based on the new 
@@ -51,6 +53,9 @@ This part describes the Kafka source based on the new
 Kafka source provides a builder class for constructing instance of KafkaSource. The code snippet
 below shows how to build a KafkaSource to consume messages from the earliest offset of topic
 "input-topic", with consumer group "my-group" and deserialize only the value of message as string.
+
+{{< tabs "KafkaSource" >}}
+{{< tab "Java" >}}
 ```java
 KafkaSource<String> source = KafkaSource.<String>builder()
     .setBootstrapServers(brokers)
@@ -62,6 +67,22 @@ KafkaSource<String> source = KafkaSource.<String>builder()
 
 env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+source = KafkaSource.builder() \
+    .set_bootstrap_servers(brokers) \
+    .set_topics("input-topic") \
+    .set_group_id("my-group") \
+    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
+    .set_value_only_deserializer(SimpleStringSchema()) \
+    .build()
+
+env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
+```
+{{< /tab >}}
+{{< /tabs >}}
+
 The following properties are **required** for building a KafkaSource:
 - Bootstrap servers, configured by ```setBootstrapServers(String)```
 - Topics / partitions to subscribe, see the following
@@ -72,21 +93,53 @@ The following properties are **required** for building a KafkaSource:
 ### Topic-partition Subscription
 Kafka source provide 3 ways of topic-partition subscription:
 - Topic list, subscribing messages from all partitions in a list of topics. For example:
+  {{< tabs "KafkaSource#setTopics" >}}
+  {{< tab "Java" >}}
   ```java
   KafkaSource.builder().setTopics("topic-a", "topic-b");
   ```
+  {{< /tab >}}
+  {{< tab "Python" >}}
+  ```python
+  KafkaSource.builder().set_topics("topic-a", "topic-b")
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
 - Topic pattern, subscribing messages from all topics whose name matches the provided regular
   expression. For example:
+  {{< tabs "KafkaSource#setTopicPattern" >}}
+  {{< tab "Java" >}}
   ```java
   KafkaSource.builder().setTopicPattern("topic.*");
   ```
+  {{< /tab >}}
+  {{< tab "Python" >}}
+  ```python
+  KafkaSource.builder().set_topic_pattern("topic.*")
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
 - Partition set, subscribing partitions in the provided partition set. For example:
+  {{< tabs "KafkaSource#setPartitions" >}}
+  {{< tab "Java" >}}
   ```java
   final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
           new TopicPartition("topic-a", 0),    // Partition 0 of topic "topic-a"
           new TopicPartition("topic-b", 5)));  // Partition 5 of topic "topic-b"
   KafkaSource.builder().setPartitions(partitionSet);
   ```
+  {{< /tab >}}
+  {{< tab "Python" >}}
+  ```python
+  partition_set = {
+      KafkaTopicPartition("topic-a", 0),
+      KafkaTopicPartition("topic-b", 5)
+  }
+  KafkaSource.builder().set_partitions(partition_set)
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
+
 ### Deserializer
 A deserializer is required for parsing Kafka messages. Deserializer (Deserialization schema) can be
 configured by ```setDeserializer(KafkaRecordDeserializationSchema)```, where
@@ -106,10 +159,17 @@ KafkaSource.<String>builder()
         .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
 ```
 
+Currently, PyFlink only supports ```set_value_only_deserializer``` to customize deserialization of the value of a Kafka record.
+```python
+KafkaSource.builder().set_value_only_deserializer(SimpleStringSchema())
+```
+
 ### Starting Offset
 Kafka source is able to consume messages starting from different offsets by specifying
 ```OffsetsInitializer```. Built-in initializers include:
 
+{{< tabs "KafkaSource#setStartingOffsets" >}}
+{{< tab "Java" >}}
 ```java
 KafkaSource.builder()
     // Start from committed offset of the consuming group, without reset strategy
@@ -123,9 +183,26 @@ KafkaSource.builder()
     // Start from latest offset
     .setStartingOffsets(OffsetsInitializer.latest());
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+KafkaSource.builder() \
+    # Start from committed offset of the consuming group, without reset strategy
+    .set_starting_offsets(KafkaOffsetsInitializer.committed_offsets()) \
+    # Start from committed offset, also use EARLIEST as reset strategy if committed offset doesn't exist
+    .set_starting_offsets(KafkaOffsetsInitializer.committed_offsets(KafkaOffsetResetStrategy.EARLIEST)) \
+    # Start from the first record whose timestamp is greater than or equals a timestamp (milliseconds)
+    .set_starting_offsets(KafkaOffsetsInitializer.timestamp(1657256176000)) \
+    # Start from the earliest offset
+    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
+    # Start from the latest offset
+    .set_starting_offsets(KafkaOffsetsInitializer.latest())
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 You can also implement a custom offsets initializer if built-in initializers above cannot fulfill
-your requirement.
+your requirement. (Not supported in PyFlink)
 
 If offsets initializer is not specified, **OffsetsInitializer.earliest()** will be
 used by default.
@@ -170,10 +247,22 @@ In order to handle scenarios like topic scaling-out or topic creation without re
 job, Kafka source can be configured to periodically discover new partitions under provided 
 topic-partition subscribing pattern. To enable partition discovery, set a non-negative value for 
 property ```partition.discovery.interval.ms```:
+
+{{< tabs "KafkaSource#PartitionDiscovery" >}}
+{{< tab "Java" >}}
 ```java
 KafkaSource.builder()
     .setProperty("partition.discovery.interval.ms", "10000"); // discover new partitions per 10 seconds
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+KafkaSource.builder() \
+    .set_property("partition.discovery.interval.ms", "10000")  # discover new partitions per 10 seconds
+```
+{{< /tab >}}
+{{< /tabs >}}
+
 {{< hint warning >}}
 Partition discovery is **disabled** by default. You need to explicitly set the partition discovery
 interval to enable this feature.
@@ -187,7 +276,7 @@ and emit watermark downstream:
 env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy");
 ```
 [This documentation]({{< ref "docs/dev/datastream/event-time/generating_watermarks.md" >}}) describes
-details about how to define a ```WatermarkStrategy```.
+details about how to define a ```WatermarkStrategy```. (Not supported in PyFlink)
 
 ### Idleness
 The Kafka Source does not go automatically in an idle state if the parallelism is higher than the
@@ -311,14 +400,29 @@ In order to enable security configurations including encryption and authenticati
 configurations as additional properties to the Kafka source. The code snippet below shows configuring Kafka source to
 use PLAIN as SASL mechanism and provide JAAS configuration:
 
+{{< tabs "KafkaSource#SecurityPlain" >}}
+{{< tab "Java" >}}
 ```java
 KafkaSource.builder()
     .setProperty("security.protocol", "SASL_PLAINTEXT")
     .setProperty("sasl.mechanism", "PLAIN")
     .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+KafkaSource.builder() \
+    .set_property("security.protocol", "SASL_PLAINTEXT") \
+    .set_property("sasl.mechanism", "PLAIN") \
+    .set_property("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 For a more complex example, use SASL_SSL as the security protocol and use SCRAM-SHA-256 as SASL mechanism:
+
+{{< tabs "KafkaSource#SecuritySASL" >}}
+{{< tab "Java" >}}
 ```java
 KafkaSource.builder()
     .setProperty("security.protocol", "SASL_SSL")
@@ -335,6 +439,26 @@ KafkaSource.builder()
     // Set JAAS configurations
     .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";");
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+KafkaSource.builder() \
+    .set_property("security.protocol", "SASL_SSL") \
+    # SSL configurations
+    # Configure the path of truststore (CA) provided by the server
+    .set_property("ssl.truststore.location", "/path/to/kafka.client.truststore.jks") \
+    .set_property("ssl.truststore.password", "test1234") \
+    # Configure the path of keystore (private key) if client authentication is required
+    .set_property("ssl.keystore.location", "/path/to/kafka.client.keystore.jks") \
+    .set_property("ssl.keystore.password", "test1234") \
+    # SASL configurations
+    # Set SASL mechanism as SCRAM-SHA-256
+    .set_property("sasl.mechanism", "SCRAM-SHA-256") \
+    # Set JAAS configurations
+    .set_property("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";")
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 Please note that the class path of the login module in `sasl.jaas.config` might be different if you relocate Kafka
 client dependencies in the job JAR, so you may need to rewrite it with the actual class path of the module in the JAR.
@@ -395,6 +519,8 @@ For older references you can look at the Flink 1.13 <a href="https://nightlies.a
 Kafka sink provides a builder class to construct an instance of a KafkaSink. The code snippet below
 shows how to write String records to a Kafka topic with a delivery guarantee of at least once.
 
+{{< tabs "KafkaSink" >}}
+{{< tab "Java" >}}
 ```java
 DataStream<String> stream = ...;
         
@@ -410,6 +536,24 @@ KafkaSink<String> sink = KafkaSink.<String>builder()
         
 stream.sinkTo(sink);
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+sink = KafkaSink.builder() \
+    .set_bootstrap_servers(brokers) \
+    .set_record_serializer(
+        KafkaRecordSerializationSchema.builder()
+            .set_topic("topic-name")
+            .set_value_serialization_schema(SimpleStringSchema())
+            .build()
+    ) \
+    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
+    .build()
+
+stream.sink_to(sink)
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 The following properties are **required** to build a KafkaSink:
 
@@ -425,6 +569,8 @@ the data stream to Kafka producer records.
 Flink offers a schema builder to provide some common building blocks i.e. key/value serialization, topic
 selection, partitioning. You can also implement the interface on your own to exert more control.
 
+{{< tabs "KafkaSink#Serializer" >}}
+{{< tab "Java" >}}
 ```java
 KafkaRecordSerializationSchema.builder()
     .setTopicSelector((element) -> {<your-topic-selection-logic>})
@@ -433,6 +579,18 @@ KafkaRecordSerializationSchema.builder()
     .setPartitioner(new FlinkFixedPartitioner())
     .build();
 ```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+KafkaRecordSerializationSchema.builder() \
+    .set_topic_selector(lambda element: <your-topic-selection-logic>) \
+    .set_value_serialization_schema(SimpleStringSchema()) \
+    .set_key_serialization_schema(SimpleStringSchema()) \
+    # set partitioner is not supported in PyFlink
+    .build()
+```
+{{< /tab >}}
+{{< /tabs >}}
 
 It is **required** to always set a value serialization method and a topic (selection method).
 Moreover, it is also possible to use Kafka serializers instead of Flink serializer by using 
diff --git a/docs/layouts/shortcodes/py_download_link.html b/docs/layouts/shortcodes/py_download_link.html
index 9644b336bc9..b5f57402edd 100644
--- a/docs/layouts/shortcodes/py_download_link.html
+++ b/docs/layouts/shortcodes/py_download_link.html
@@ -19,12 +19,68 @@ under the License.
 Generates the PyFlink download connector page.
 */}}
 {{ $name := .Get 0 }}
-{{ $text := .Get 1 }}
 {{ $connectors := .Site.Data.sql_connectors }}
 {{ $connector := index $connectors $name }}
 
-<p>In order to use the {{ $connector.name }} {{ $connector.category }} in PyFlink jobs, the following
-dependencies are required: <a href="{{- partial "docs/interpolate" $connector.sql_url -}}">{{ $text }}</a>.
+<p>
+{{ if eq $.Site.Language.Lang "en" }}
+In order to use the {{ $connector.name }} {{ $connector.category }} in PyFlink jobs, the following
+dependencies are required:
+{{ else if eq $.Site.Language.Lang "zh" }}
+为了在 PyFlink 作业中使用 {{ $connector.name }} {{ $connector.category }} ,需要添加下列依赖:
+{{ end }}
+{{ if eq $connector.versions nil }}
+<table>
+    <thead>
+    <th style="text-align:left">PyFlink JAR</th>
+    </thead>
+    <tbody>
+    <tr>
+        {{ if eq $connector.builtin true }}
+        <td style="text-align: left">Built-in</td>
+        {{ else if $.Site.Params.IsStable }}
+        {{ if eq $connector.sql_url nil }}
+        <td style="text-align:left">There is no sql jar available yet.</td>
+        {{ else }}
+        <td style="text-align: left"><a href="{{- partial "docs/interpolate" $connector.sql_url -}}">Download</a></td>
+        {{ end }}
+        {{ else }}
+        <td style="text-align: left">Only available for stable releases.</td>
+        {{ end }}
+    </tr>
+    </tbody>
+</table>
+{{ else }}
+<table>
+    <thead>
+    <th style="text-align: left">{{ $connector.name }} version</th>
+    <th style="text-align: left">PyFlink JAR</th>
+    </thead>
+    <tbody>
+    {{ range $connector.versions }}
+    <tr>
+        <td style="text-align: left">{{ .version }}</td>
+        {{ if eq .builtin true }}
+        <td style="text-align: left">Built-in</td>
+        {{ else if eq .nosqljar true}}
+        {{ else if $.Site.Params.IsStable }}
+        {{ if eq .sql_url nil}}
+        <td style="text-align:left">There is no sql jar available yet.</td>
+        {{ else }}
+        <td style="text-align:left"><a href="{{- partial "docs/interpolate" .sql_url -}}">Download</a></td>
+        {{ end }}
+        {{ else }}
+        <td>Only available for stable releases.</td>
+        {{ end }}
+    </tr>
+    {{ end }}
+    </tbody>
+</table>
+{{ end }}
+{{ if eq .Site.Language.Lang "en" }}
 See <a href="{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/dev/python/dependency_management/#jar-dependencies">Python dependency management</a>
 for more details on how to use JARs in PyFlink.
+{{ else if eq .Site.Language.Lang "zh" }}
+在 PyFlink 中如何添加 JAR 包依赖参见 <a href="{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/dev/python/dependency_management/#jar-dependencies">Python 依赖管理</a>。
+{{ end }}
 </p>
diff --git a/flink-python/pyflink/common/types.py b/flink-python/pyflink/common/types.py
index 1711d7d2b7a..be07e68f4cd 100644
--- a/flink-python/pyflink/common/types.py
+++ b/flink-python/pyflink/common/types.py
@@ -22,6 +22,8 @@ from typing import List
 
 __all__ = ['Row', 'RowKind']
 
+from pyflink.java_gateway import get_gateway
+
 
 class RowKind(Enum):
     INSERT = 0
@@ -39,6 +41,10 @@ class RowKind(Enum):
         else:
             return '-D'
 
+    def to_j_row_kind(self):
+        JRowKind = get_gateway().jvm.org.apache.flink.types.RowKind
+        return getattr(JRowKind, self.name)
+
 
 def _create_row(fields, values, row_kind: RowKind = None):
     row = Row(*values)
@@ -277,3 +283,30 @@ class Row(object):
 
     def __len__(self):
         return len(self._values)
+
+
+def to_java_data_structure(value):
+    jvm = get_gateway().jvm
+    if isinstance(value, (int, float, str)):
+        return value
+    elif isinstance(value, (list, tuple)):
+        j_list = jvm.java.util.ArrayList()
+        for item in value:
+            j_list.add(to_java_data_structure(item))
+        return j_list
+    elif isinstance(value, map):
+        j_map = jvm.java.util.HashMap()
+        for k, v in value:
+            j_map.put(to_java_data_structure(k), to_java_data_structure(v))
+        return j_map
+    elif isinstance(value, Row):
+        j_row = jvm.org.apache.flink.types.Row(value.get_row_kind().to_j_row_kind(), len(value))
+        if hasattr(value, '_fields'):
+            for field_name, value in zip(value._fields, value._values):
+                j_row.setField(field_name, to_java_data_structure(value))
+        else:
+            for idx, value in enumerate(value._values):
+                j_row.setField(idx, to_java_data_structure(value))
+        return j_row
+    else:
+        raise TypeError('value must be a vanilla Python object')
diff --git a/flink-python/pyflink/datastream/__init__.py b/flink-python/pyflink/datastream/__init__.py
index 3c6c5242531..cfa3dd710eb 100644
--- a/flink-python/pyflink/datastream/__init__.py
+++ b/flink-python/pyflink/datastream/__init__.py
@@ -169,6 +169,10 @@ Classes to define source & sink:
       A streaming data source that pulls a parallel data stream from Apache Kafka.
     - :class:`connectors.FlinkKafkaProducer`:
       A streaming data sink to produce data into a Kafka topic.
+    - :class:`connectors.KafkaSource`:
+      The new API to read data in parallel from Apache Kafka.
+    - :class:`connectors.KafkaSink`:
+      The new API to write data into to Apache Kafka topics.
     - :class:`connectors.FileSource`:
       A unified data source that reads files - both in batch and in streaming mode.
       This source supports all (distributed) file systems and object stores that can be accessed via
diff --git a/flink-python/pyflink/datastream/connectors/__init__.py b/flink-python/pyflink/datastream/connectors/__init__.py
index 0e052b75ddc..653507d4db2 100644
--- a/flink-python/pyflink/datastream/connectors/__init__.py
+++ b/flink-python/pyflink/datastream/connectors/__init__.py
@@ -24,8 +24,21 @@ from pyflink.datastream.connectors.file_system import (FileEnumeratorProvider, F
                                                        RollingPolicy,
                                                        StreamFormat, StreamingFileSink, BulkFormat)
 from pyflink.datastream.connectors.jdbc import JdbcSink, JdbcConnectionOptions, JdbcExecutionOptions
-from pyflink.datastream.connectors.kafka import (FlinkKafkaConsumer, FlinkKafkaProducer, Semantic,
-                                                 KafkaSource)
+from pyflink.datastream.connectors.kafka import (
+    FlinkKafkaConsumer,
+    FlinkKafkaProducer,
+    Semantic,
+    KafkaSource,
+    KafkaSourceBuilder,
+    KafkaTopicPartition,
+    KafkaOffsetsInitializer,
+    KafkaOffsetResetStrategy,
+    KafkaSink,
+    KafkaSinkBuilder,
+    KafkaRecordSerializationSchema,
+    KafkaRecordSerializationSchemaBuilder,
+    KafkaTopicSelector,
+)
 from pyflink.datastream.connectors.number_seq import NumberSequenceSource
 from pyflink.datastream.connectors.pulsar import PulsarDeserializationSchema, PulsarSource, \
     PulsarSourceBuilder, SubscriptionType, StartCursor, StopCursor, PulsarSerializationSchema, \
@@ -50,6 +63,15 @@ __all__ = [
     'FlinkKafkaProducer',
     'Semantic',
     'KafkaSource',
+    'KafkaSourceBuilder',
+    'KafkaTopicPartition',
+    'KafkaOffsetsInitializer',
+    'KafkaOffsetResetStrategy',
+    'KafkaSink',
+    'KafkaSinkBuilder',
+    'KafkaRecordSerializationSchema',
+    'KafkaRecordSerializationSchemaBuilder',
+    'KafkaTopicSelector',
     'JdbcSink',
     'JdbcConnectionOptions',
     'JdbcExecutionOptions',
diff --git a/flink-python/pyflink/datastream/connectors/base.py b/flink-python/pyflink/datastream/connectors/base.py
index 2bbcffc1f50..a229155bd79 100644
--- a/flink-python/pyflink/datastream/connectors/base.py
+++ b/flink-python/pyflink/datastream/connectors/base.py
@@ -15,6 +15,7 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
+from abc import ABC, abstractmethod
 from enum import Enum
 from typing import Union
 
@@ -52,6 +53,24 @@ class Sink(JavaFunctionWrapper):
         super(Sink, self).__init__(sink)
 
 
+class TransformAppender(ABC):
+
+    @abstractmethod
+    def apply(self, ds):
+        pass
+
+
+class SupportPreprocessing(ABC):
+
+    @abstractmethod
+    def need_preprocessing(self) -> bool:
+        pass
+
+    @abstractmethod
+    def get_preprocessing(self) -> 'TransformAppender':
+        pass
+
+
 class DeliveryGuarantee(Enum):
     """
     DeliverGuarantees that can be chosen. In general your pipeline can only offer the lowest
diff --git a/flink-python/pyflink/datastream/connectors/kafka.py b/flink-python/pyflink/datastream/connectors/kafka.py
index 6605bc48594..226d3585125 100644
--- a/flink-python/pyflink/datastream/connectors/kafka.py
+++ b/flink-python/pyflink/datastream/connectors/kafka.py
@@ -15,32 +15,42 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-import abc
 import warnings
+from abc import ABC, abstractmethod
 from enum import Enum
-from typing import Dict, Union, List, Set
+from typing import Dict, Union, List, Set, Callable, Any, Optional
 
 from py4j.java_gateway import JavaObject, get_java_class
 
-from pyflink.common import DeserializationSchema, TypeInformation, typeinfo, SerializationSchema
-from pyflink.datastream.connectors import Source
+from pyflink.common import DeserializationSchema, TypeInformation, typeinfo, SerializationSchema, \
+    Types, Row
+from pyflink.datastream.connectors import Source, Sink
+from pyflink.datastream.connectors.base import DeliveryGuarantee, SupportPreprocessing, \
+    TransformAppender
 from pyflink.datastream.functions import SinkFunction, SourceFunction
 from pyflink.java_gateway import get_gateway
-from pyflink.util.java_utils import to_jarray
-
+from pyflink.util.java_utils import to_jarray, get_field, get_field_value
 
 __all__ = [
-    'KafkaOffsetResetStrategy',
-    'KafkaOffsetsInitializer',
+    'FlinkKafkaConsumer',
+    'FlinkKafkaProducer',
+    'Semantic',
     'KafkaSource',
     'KafkaSourceBuilder',
     'KafkaTopicPartition',
+    'KafkaOffsetsInitializer',
+    'KafkaOffsetResetStrategy',
+    'KafkaSink',
+    'KafkaSinkBuilder',
+    'KafkaRecordSerializationSchema',
+    'KafkaRecordSerializationSchemaBuilder',
+    'KafkaTopicSelector',
 ]
 
 
 # ---- FlinkKafkaConsumer ----
 
-class FlinkKafkaConsumerBase(SourceFunction, abc.ABC):
+class FlinkKafkaConsumerBase(SourceFunction, ABC):
     """
     Base class of all Flink Kafka Consumer data sources. This implements the common behavior across
     all kafka versions.
@@ -241,7 +251,7 @@ class Semantic(Enum):
         return getattr(JSemantic, self.name)
 
 
-class FlinkKafkaProducerBase(SinkFunction, abc.ABC):
+class FlinkKafkaProducerBase(SinkFunction, ABC):
     """
     Flink Sink to produce data into a Kafka topic.
 
@@ -816,3 +826,337 @@ class KafkaOffsetsInitializer(object):
             enumerator.initializer.OffsetsInitializer
         return KafkaOffsetsInitializer(JOffsetsInitializer.offsets(
             j_map_wrapper.asMap(), offset_reset_strategy._to_j_offset_reset_strategy()))
+
+
+class KafkaSink(Sink, SupportPreprocessing):
+    """
+    Flink Sink to produce data into a Kafka topic. The sink supports all delivery guarantees
+    described by :class:`DeliveryGuarantee`.
+
+    * :attr:`DeliveryGuarantee.NONE` does not provide any guarantees: messages may be lost in case
+      of issues on the Kafka broker and messages may be duplicated in case of a Flink failure.
+    * :attr:`DeliveryGuarantee.AT_LEAST_ONCE` the sink will wait for all outstanding records in the
+      Kafka buffers to be acknowledged by the Kafka producer on a checkpoint. No messages will be
+      lost in case of any issue with the Kafka brokers but messages may be duplicated when Flink
+      restarts.
+    * :attr:`DeliveryGuarantee.EXACTLY_ONCE`: In this mode the KafkaSink will write all messages in
+      a Kafka transaction that will be committed to Kafka on a checkpoint. Thus, if the consumer
+      reads only committed data (see Kafka consumer config ``isolation.level``), no duplicates
+      will be seen in case of a Flink restart. However, this delays record writing effectively
+      until a checkpoint is written, so adjust the checkpoint duration accordingly. Please ensure
+      that you use unique transactional id prefixes across your applications running on the same
+      Kafka cluster such that multiple running jobs do not interfere in their transactions!
+      Additionally, it is highly recommended to tweak Kafka transaction timeout (link) >> maximum
+      checkpoint duration + maximum restart duration or data loss may happen when Kafka expires an
+      uncommitted transaction.
+
+    .. versionadded:: 1.16.0
+    """
+
+    def __init__(self, j_kafka_sink, preprocessing: TransformAppender = None):
+        super().__init__(j_kafka_sink)
+        self._preprocessing = preprocessing
+
+    @staticmethod
+    def builder() -> 'KafkaSinkBuilder':
+        """
+        Create a :class:`KafkaSinkBuilder` to construct :class:`KafkaSink`.
+        """
+        return KafkaSinkBuilder()
+
+    def need_preprocessing(self) -> bool:
+        return self._preprocessing is not None
+
+    def get_preprocessing(self) -> TransformAppender:
+        return self._preprocessing
+
+
+class KafkaSinkBuilder(object):
+    """
+    Builder to construct :class:`KafkaSink`.
+
+    The following example shows the minimum setup to create a KafkaSink that writes String values
+    to a Kafka topic.
+
+    ::
+
+        >>> record_serializer = KafkaRecordSerializationSchema.builder() \\
+        ...     .set_topic(MY_SINK_TOPIC) \\
+        ...     .set_value_serialization_schema(SimpleStringSchema()) \\
+        ...     .build()
+        >>> sink = KafkaSink.builder() \\
+        ...     .set_bootstrap_servers(MY_BOOTSTRAP_SERVERS) \\
+        ...     .set_record_serializer(record_serializer) \\
+        ...     .build()
+
+    One can also configure different :class:`DeliveryGuarantee` by using
+    :meth:`set_delivery_guarantee` but keep in mind when using
+    :attr:`DeliveryGuarantee.EXACTLY_ONCE`, one must set the transactional id prefix
+    :meth:`set_transactional_id_prefix`.
+
+    .. versionadded:: 1.16.0
+    """
+
+    def __init__(self):
+        jvm = get_gateway().jvm
+        self._j_builder = jvm.org.apache.flink.connector.kafka.sink.KafkaSink.builder()
+        self._preprocessing = None
+
+    def build(self) -> 'KafkaSink':
+        """
+        Constructs the :class:`KafkaSink` with the configured properties.
+        """
+        return KafkaSink(self._j_builder.build(), self._preprocessing)
+
+    def set_bootstrap_servers(self, bootstrap_servers: str) -> 'KafkaSinkBuilder':
+        """
+        Sets the Kafka bootstrap servers.
+
+        :param bootstrap_servers: A comma separated list of valid URIs to reach the Kafka broker.
+        """
+        self._j_builder.setBootstrapServers(bootstrap_servers)
+        return self
+
+    def set_delivery_guarantee(self, delivery_guarantee: DeliveryGuarantee) -> 'KafkaSinkBuilder':
+        """
+        Sets the wanted :class:`DeliveryGuarantee`. The default delivery guarantee is
+        :attr:`DeliveryGuarantee.NONE`.
+
+        :param delivery_guarantee: The wanted :class:`DeliveryGuarantee`.
+        """
+        self._j_builder.setDeliveryGuarantee(delivery_guarantee._to_j_delivery_guarantee())
+        return self
+
+    def set_transactional_id_prefix(self, transactional_id_prefix: str) -> 'KafkaSinkBuilder':
+        """
+        Sets the prefix for all created transactionalIds if :attr:`DeliveryGuarantee.EXACTLY_ONCE`
+        is configured.
+
+        It is mandatory to always set this value with :attr:`DeliveryGuarantee.EXACTLY_ONCE` to
+        prevent corrupted transactions if multiple jobs using the KafkaSink run against the same
+        Kafka Cluster. The default prefix is ``"kafka-sink"``.
+
+        The size of the prefix is capped by MAXIMUM_PREFIX_BYTES (6400) formatted with UTF-8.
+
+        It is important to keep the prefix stable across application restarts. If the prefix changes
+        it might happen that lingering transactions are not correctly aborted and newly written
+        messages are not immediately consumable until transactions timeout.
+
+        :param transactional_id_prefix: The transactional id prefix.
+        """
+        self._j_builder.setTransactionalIdPrefix(transactional_id_prefix)
+        return self
+
+    def set_record_serializer(self, record_serializer: 'KafkaRecordSerializationSchema') \
+            -> 'KafkaSinkBuilder':
+        """
+        Sets the :class:`KafkaRecordSerializationSchema` that transforms incoming records to kafka
+        producer records.
+
+        :param record_serializer: The :class:`KafkaRecordSerializationSchema`.
+        """
+        # NOTE: If topic selector is a generated first-column selector, do extra preprocessing
+        j_topic_selector = get_field_value(record_serializer._j_serialization_schema,
+                                           'topicSelector')
+        if (
+            j_topic_selector.getClass().getCanonicalName() ==
+            'org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaBuilder.'
+            'CachingTopicSelector'
+        ) and (
+            get_field_value(j_topic_selector, 'topicSelector').getClass().getCanonicalName()
+                .startswith('com.sun.proxy')
+        ):
+            record_serializer._wrap_serialization_schema()
+            self._preprocessing = record_serializer._build_preprocessing()
+
+        self._j_builder.setRecordSerializer(record_serializer._j_serialization_schema)
+        return self
+
+    def set_property(self, key: str, value: str) -> 'KafkaSinkBuilder':
+        """
+        Sets kafka producer config.
+
+        :param key: Kafka producer config key.
+        :param value: Kafka producer config value.
+        """
+        self._j_builder.setProperty(key, value)
+        return self
+
+
+class KafkaRecordSerializationSchema(SerializationSchema):
+    """
+    A serialization schema which defines how to convert the stream record to kafka producer record.
+
+    .. versionadded:: 1.16.0
+    """
+
+    def __init__(self, j_serialization_schema,
+                 topic_selector: Optional['KafkaTopicSelector'] = None):
+        super().__init__(j_serialization_schema)
+        self._topic_selector = topic_selector
+
+    @staticmethod
+    def builder() -> 'KafkaRecordSerializationSchemaBuilder':
+        """
+        Creates a default schema builder to provide common building blocks i.e. key serialization,
+        value serialization, topic selection.
+        """
+        return KafkaRecordSerializationSchemaBuilder()
+
+    def _wrap_serialization_schema(self):
+        jvm = get_gateway().jvm
+
+        def _wrap_schema(field_name):
+            j_schema_field = get_field(self._j_serialization_schema.getClass(), field_name)
+            if j_schema_field.get(self._j_serialization_schema) is not None:
+                j_schema_field.set(
+                    self._j_serialization_schema,
+                    jvm.org.apache.flink.python.util.PythonConnectorUtils
+                    .SecondColumnSerializationSchema(
+                        j_schema_field.get(self._j_serialization_schema)
+                    )
+                )
+
+        _wrap_schema('keySerializationSchema')
+        _wrap_schema('valueSerializationSchema')
+
+    def _build_preprocessing(self) -> TransformAppender:
+        class TopicSelectorTransformAppender(TransformAppender):
+
+            def __init__(self, topic_selector: KafkaTopicSelector):
+                self._topic_selector = topic_selector
+
+            def apply(self, ds):
+                output_type = Types.ROW([Types.STRING(), ds.get_type()])
+                return ds.map(lambda v: Row(self._topic_selector.apply(v), v),
+                              output_type=output_type)
+
+        return TopicSelectorTransformAppender(self._topic_selector)
+
+
+class KafkaRecordSerializationSchemaBuilder(object):
+    """
+    Builder to construct :class:`KafkaRecordSerializationSchema`.
+
+    Example:
+    ::
+
+        >>> KafkaRecordSerializationSchema.builder() \\
+        ...     .set_topic('topic') \\
+        ...     .set_key_serialization_schema(SimpleStringSchema()) \\
+        ...     .set_value_serialization_schema(SimpleStringSchema()) \\
+        ...     .build()
+
+    And the sink topic can be calculated dynamically from each record:
+    ::
+
+        >>> KafkaRecordSerializationSchema.builder() \\
+        ...     .set_topic_selector(lambda row: 'topic-' + row['category']) \\
+        ...     .set_value_serialization_schema(
+        ...         JsonRowSerializationSchema.builder().with_type_info(ROW_TYPE).build()) \\
+        ...     .build()
+
+    It is necessary to configure exactly one serialization method for the value and a topic.
+
+    .. versionadded:: 1.16.0
+    """
+
+    def __init__(self):
+        jvm = get_gateway().jvm
+        self._j_builder = jvm.org.apache.flink.connector.kafka.sink \
+            .KafkaRecordSerializationSchemaBuilder()
+        self._fixed_topic = True  # type: bool
+        self._topic_selector = None  # type: Optional[KafkaTopicSelector]
+        self._key_serialization_schema = None  # type: Optional[SerializationSchema]
+        self._value_serialization_schema = None  # type: Optional[SerializationSchema]
+
+    def build(self) -> 'KafkaRecordSerializationSchema':
+        """
+        Constructs the :class:`KafkaRecordSerializationSchemaBuilder` with the configured
+        properties.
+        """
+        if self._fixed_topic:
+            return KafkaRecordSerializationSchema(self._j_builder.build())
+        else:
+            return KafkaRecordSerializationSchema(self._j_builder.build(), self._topic_selector)
+
+    def set_topic(self, topic: str) -> 'KafkaRecordSerializationSchemaBuilder':
+        """
+        Sets a fixed topic which used as destination for all records.
+
+        :param topic: The fixed topic.
+        """
+        self._j_builder.setTopic(topic)
+        self._fixed_topic = True
+        return self
+
+    def set_topic_selector(self, topic_selector: Union[Callable[[Any], str], 'KafkaTopicSelector'])\
+            -> 'KafkaRecordSerializationSchemaBuilder':
+        """
+        Sets a topic selector which computes the target topic for every incoming record.
+
+        :param topic_selector: A :class:`KafkaTopicSelector` implementation or a function that
+            consumes each incoming record and return the topic string.
+        """
+        if not isinstance(topic_selector, KafkaTopicSelector) and not callable(topic_selector):
+            raise TypeError('topic_selector must be KafkaTopicSelector or a callable')
+        if not isinstance(topic_selector, KafkaTopicSelector):
+            class TopicSelectorFunctionAdapter(KafkaTopicSelector):
+
+                def __init__(self, f: Callable[[Any], str]):
+                    self._f = f
+
+                def apply(self, data) -> str:
+                    return self._f(data)
+
+            topic_selector = TopicSelectorFunctionAdapter(topic_selector)
+
+        jvm = get_gateway().jvm
+        self._j_builder.setTopicSelector(
+            jvm.org.apache.flink.python.util.PythonConnectorUtils.createFirstColumnTopicSelector(
+                get_java_class(jvm.org.apache.flink.connector.kafka.sink.TopicSelector)
+            )
+        )
+        self._fixed_topic = False
+        self._topic_selector = topic_selector
+        return self
+
+    def set_key_serialization_schema(self, key_serialization_schema: SerializationSchema) \
+            -> 'KafkaRecordSerializationSchemaBuilder':
+        """
+        Sets a :class:`SerializationSchema` which is used to serialize the incoming element to the
+        key of the producer record. The key serialization is optional, if not set, the key of the
+        producer record will be null.
+
+        :param key_serialization_schema: The :class:`SerializationSchema` to serialize each incoming
+            record as the key of producer record.
+        """
+        self._key_serialization_schema = key_serialization_schema
+        self._j_builder.setKeySerializationSchema(key_serialization_schema._j_serialization_schema)
+        return self
+
+    def set_value_serialization_schema(self, value_serialization_schema: SerializationSchema) \
+            -> 'KafkaRecordSerializationSchemaBuilder':
+        """
+        Sets a :class:`SerializationSchema` which is used to serialize the incoming element to the
+        value of the producer record. The value serialization is required.
+
+        :param value_serialization_schema: The :class:`SerializationSchema` to serialize each data
+            record as the key of producer record.
+        """
+        self._value_serialization_schema = value_serialization_schema
+        self._j_builder.setValueSerializationSchema(
+            value_serialization_schema._j_serialization_schema)
+        return self
+
+
+class KafkaTopicSelector(ABC):
+    """
+    Select topic for an incoming record
+
+    .. versionadded:: 1.16.0
+    """
+
+    @abstractmethod
+    def apply(self, data) -> str:
+        pass
diff --git a/flink-python/pyflink/datastream/connectors/tests/test_kafka.py b/flink-python/pyflink/datastream/connectors/tests/test_kafka.py
index ee61f17f55d..033b5643f67 100644
--- a/flink-python/pyflink/datastream/connectors/tests/test_kafka.py
+++ b/flink-python/pyflink/datastream/connectors/tests/test_kafka.py
@@ -15,21 +15,27 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
+import json
 from typing import Dict
 
+import pyflink.datastream.data_stream as data_stream
+
 from pyflink.common.configuration import Configuration
 from pyflink.common.serialization import SimpleStringSchema, DeserializationSchema, \
-    JsonRowDeserializationSchema, CsvRowDeserializationSchema, AvroRowDeserializationSchema
+    JsonRowDeserializationSchema, CsvRowDeserializationSchema, AvroRowDeserializationSchema, \
+    JsonRowSerializationSchema, CsvRowSerializationSchema, AvroRowSerializationSchema
 from pyflink.common.typeinfo import Types
+from pyflink.common.types import Row, to_java_data_structure
 from pyflink.common.watermark_strategy import WatermarkStrategy
+from pyflink.datastream.connectors.base import DeliveryGuarantee
 from pyflink.datastream.connectors.kafka import KafkaSource, KafkaTopicPartition, \
-    KafkaOffsetsInitializer, KafkaOffsetResetStrategy
+    KafkaOffsetsInitializer, KafkaOffsetResetStrategy, KafkaRecordSerializationSchema, KafkaSink
 from pyflink.java_gateway import get_gateway
-from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
-from pyflink.util.java_utils import to_jarray, is_instance_of
+from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase, PyFlinkTestCase
+from pyflink.util.java_utils import to_jarray, is_instance_of, get_field_value
 
 
-class KafkaSourceTest(PyFlinkStreamingTestCase):
+class KafkaSourceTests(PyFlinkStreamingTestCase):
 
     def test_compiling(self):
         source = KafkaSource.builder() \
@@ -42,7 +48,7 @@ class KafkaSourceTest(PyFlinkStreamingTestCase):
                                   watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
                                   source_name='kafka source')
         ds.print()
-        plan = eval(self.env.get_execution_plan())
+        plan = json.loads(self.env.get_execution_plan())
         self.assertEqual('Source: kafka source', plan['nodes'][0]['type'])
 
     def test_set_properties(self):
@@ -66,12 +72,12 @@ class KafkaSourceTest(PyFlinkStreamingTestCase):
             .set_topics('test_topic1', 'test_topic2') \
             .set_value_only_deserializer(SimpleStringSchema()) \
             .build()
-        kafka_subscriber = self._get_java_field(source.get_java_function(), 'subscriber')
+        kafka_subscriber = get_field_value(source.get_java_function(), 'subscriber')
         self.assertEqual(
             kafka_subscriber.getClass().getCanonicalName(),
             'org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber'
         )
-        topics = self._get_java_field(kafka_subscriber, 'topics')
+        topics = get_field_value(kafka_subscriber, 'topics')
         self.assertTrue(is_instance_of(topics, get_gateway().jvm.java.util.List))
         self.assertEqual(topics.size(), 2)
         self.assertEqual(topics[0], 'test_topic1')
@@ -83,12 +89,12 @@ class KafkaSourceTest(PyFlinkStreamingTestCase):
             .set_topic_pattern('test_topic*') \
             .set_value_only_deserializer(SimpleStringSchema()) \
             .build()
-        kafka_subscriber = self._get_java_field(source.get_java_function(), 'subscriber')
+        kafka_subscriber = get_field_value(source.get_java_function(), 'subscriber')
         self.assertEqual(
             kafka_subscriber.getClass().getCanonicalName(),
             'org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicPatternSubscriber'
         )
-        topic_pattern = self._get_java_field(kafka_subscriber, 'topicPattern')
+        topic_pattern = get_field_value(kafka_subscriber, 'topicPattern')
         self.assertTrue(is_instance_of(topic_pattern, get_gateway().jvm.java.util.regex.Pattern))
         self.assertEqual(topic_pattern.toString(), 'test_topic*')
 
@@ -100,12 +106,12 @@ class KafkaSourceTest(PyFlinkStreamingTestCase):
             .set_partitions({topic_partition_1, topic_partition_2}) \
             .set_value_only_deserializer(SimpleStringSchema()) \
             .build()
-        kafka_subscriber = self._get_java_field(source.get_java_function(), 'subscriber')
+        kafka_subscriber = get_field_value(source.get_java_function(), 'subscriber')
         self.assertEqual(
             kafka_subscriber.getClass().getCanonicalName(),
             'org.apache.flink.connector.kafka.source.enumerator.subscriber.PartitionSetSubscriber'
         )
-        partitions = self._get_java_field(kafka_subscriber, 'subscribedPartitions')
+        partitions = get_field_value(kafka_subscriber, 'subscribedPartitions')
         self.assertTrue(is_instance_of(partitions, get_gateway().jvm.java.util.Set))
         self.assertTrue(topic_partition_1._to_j_topic_partition() in partitions)
         self.assertTrue(topic_partition_2._to_j_topic_partition() in partitions)
@@ -168,8 +174,7 @@ class KafkaSourceTest(PyFlinkStreamingTestCase):
 
         def _check_bounded(source: KafkaSource):
             self.assertEqual(
-                self._get_java_field(source.get_java_function(), 'boundedness').toString(),
-                'BOUNDED'
+                get_field_value(source.get_java_function(), 'boundedness').toString(), 'BOUNDED'
             )
 
         self._test_set_bounded_or_unbounded(_build_source, _check_bounded)
@@ -186,7 +191,7 @@ class KafkaSourceTest(PyFlinkStreamingTestCase):
 
         def _check_bounded(source: KafkaSource):
             self.assertEqual(
-                self._get_java_field(source.get_java_function(), 'boundedness').toString(),
+                get_field_value(source.get_java_function(), 'boundedness').toString(),
                 'CONTINUOUS_UNBOUNDED'
             )
 
@@ -236,22 +241,22 @@ class KafkaSourceTest(PyFlinkStreamingTestCase):
             source, specified_offsets, KafkaOffsetResetStrategy.LATEST, False
         )
 
-    def test_set_value_only_deserialization_schema(self):
+    def test_set_value_only_deserializer(self):
         def _check(schema: DeserializationSchema, class_name: str):
             source = KafkaSource.builder() \
                 .set_bootstrap_servers('localhost:9092') \
                 .set_topics('test_topic') \
                 .set_value_only_deserializer(schema) \
                 .build()
-            deserialization_schema_wrapper = self._get_java_field(source.get_java_function(),
-                                                                  'deserializationSchema')
+            deserialization_schema_wrapper = get_field_value(source.get_java_function(),
+                                                             'deserializationSchema')
             self.assertEqual(
                 deserialization_schema_wrapper.getClass().getCanonicalName(),
                 'org.apache.flink.connector.kafka.source.reader.deserializer'
                 '.KafkaValueOnlyDeserializationSchemaWrapper'
             )
-            deserialization_schema = self._get_java_field(deserialization_schema_wrapper,
-                                                          'deserializationSchema')
+            deserialization_schema = get_field_value(deserialization_schema_wrapper,
+                                                     'deserializationSchema')
             self.assertEqual(deserialization_schema.getClass().getCanonicalName(),
                              class_name)
 
@@ -285,17 +290,17 @@ class KafkaSourceTest(PyFlinkStreamingTestCase):
             field_name = 'startingOffsetsInitializer'
         else:
             field_name = 'stoppingOffsetsInitializer'
-        offsets_initializer = self._get_java_field(source.get_java_function(), field_name)
+        offsets_initializer = get_field_value(source.get_java_function(), field_name)
         self.assertEqual(
             offsets_initializer.getClass().getCanonicalName(),
             'org.apache.flink.connector.kafka.source.enumerator.initializer'
             '.ReaderHandledOffsetsInitializer'
         )
 
-        starting_offset = self._get_java_field(offsets_initializer, 'startingOffset')
+        starting_offset = get_field_value(offsets_initializer, 'startingOffset')
         self.assertEqual(starting_offset, offset)
 
-        offset_reset_strategy = self._get_java_field(offsets_initializer, 'offsetResetStrategy')
+        offset_reset_strategy = get_field_value(offsets_initializer, 'offsetResetStrategy')
         self.assertTrue(
             offset_reset_strategy.equals(reset_strategy._to_j_offset_reset_strategy())
         )
@@ -308,14 +313,14 @@ class KafkaSourceTest(PyFlinkStreamingTestCase):
             field_name = 'startingOffsetsInitializer'
         else:
             field_name = 'stoppingOffsetsInitializer'
-        offsets_initializer = self._get_java_field(source.get_java_function(), field_name)
+        offsets_initializer = get_field_value(source.get_java_function(), field_name)
         self.assertEqual(
             offsets_initializer.getClass().getCanonicalName(),
             'org.apache.flink.connector.kafka.source.enumerator.initializer'
             '.TimestampOffsetsInitializer'
         )
 
-        starting_timestamp = self._get_java_field(offsets_initializer, 'startingTimestamp')
+        starting_timestamp = get_field_value(offsets_initializer, 'startingTimestamp')
         self.assertEqual(starting_timestamp, timestamp)
 
     def _check_specified_offsets_initializer(self,
@@ -327,14 +332,14 @@ class KafkaSourceTest(PyFlinkStreamingTestCase):
             field_name = 'startingOffsetsInitializer'
         else:
             field_name = 'stoppingOffsetsInitializer'
-        offsets_initializer = self._get_java_field(source.get_java_function(), field_name)
+        offsets_initializer = get_field_value(source.get_java_function(), field_name)
         self.assertEqual(
             offsets_initializer.getClass().getCanonicalName(),
             'org.apache.flink.connector.kafka.source.enumerator.initializer'
             '.SpecifiedOffsetsInitializer'
         )
 
-        initial_offsets = self._get_java_field(offsets_initializer, 'initialOffsets')
+        initial_offsets = get_field_value(offsets_initializer, 'initialOffsets')
         self.assertTrue(is_instance_of(initial_offsets, get_gateway().jvm.java.util.Map))
         self.assertEqual(initial_offsets.size(), len(offsets))
         for j_topic_partition in initial_offsets:
@@ -343,7 +348,7 @@ class KafkaSourceTest(PyFlinkStreamingTestCase):
             self.assertIsNotNone(offsets.get(topic_partition))
             self.assertEqual(initial_offsets[j_topic_partition], offsets[topic_partition])
 
-        offset_reset_strategy = self._get_java_field(offsets_initializer, 'offsetResetStrategy')
+        offset_reset_strategy = get_field_value(offsets_initializer, 'offsetResetStrategy')
         self.assertTrue(
             offset_reset_strategy.equals(reset_strategy._to_j_offset_reset_strategy())
         )
@@ -359,8 +364,232 @@ class KafkaSourceTest(PyFlinkStreamingTestCase):
         j_configuration = j_to_configuration.invoke(j_source, to_jarray(jvm.java.lang.Object, []))
         return Configuration(j_configuration=j_configuration)
 
+
+class KafkaSinkTests(PyFlinkStreamingTestCase):
+
+    def test_compile(self):
+        sink = KafkaSink.builder() \
+            .set_bootstrap_servers('localhost:9092') \
+            .set_record_serializer(self._build_serialization_schema()) \
+            .build()
+
+        ds = self.env.from_collection([], type_info=Types.STRING())
+        ds.sink_to(sink)
+
+        plan = json.loads(self.env.get_execution_plan())
+        self.assertEqual(plan['nodes'][1]['type'], 'Sink: Writer')
+        self.assertEqual(plan['nodes'][2]['type'], 'Sink: Committer')
+
+    def test_set_bootstrap_severs(self):
+        sink = KafkaSink.builder() \
+            .set_bootstrap_servers('localhost:9092,localhost:9093') \
+            .set_record_serializer(self._build_serialization_schema()) \
+            .build()
+        config = get_field_value(sink.get_java_function(), 'kafkaProducerConfig')
+        self.assertEqual(config.get('bootstrap.servers'), 'localhost:9092,localhost:9093')
+
+    def test_set_delivery_guarantee(self):
+        sink = KafkaSink.builder() \
+            .set_bootstrap_servers('localhost:9092') \
+            .set_record_serializer(self._build_serialization_schema()) \
+            .build()
+        guarantee = get_field_value(sink.get_java_function(), 'deliveryGuarantee')
+        self.assertEqual(guarantee.toString(), 'none')
+
+        sink = KafkaSink.builder() \
+            .set_bootstrap_servers('localhost:9092') \
+            .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
+            .set_record_serializer(self._build_serialization_schema()) \
+            .build()
+        guarantee = get_field_value(sink.get_java_function(), 'deliveryGuarantee')
+        self.assertEqual(guarantee.toString(), 'at-least-once')
+
+        sink = KafkaSink.builder() \
+            .set_bootstrap_servers('localhost:9092') \
+            .set_delivery_guarantee(DeliveryGuarantee.EXACTLY_ONCE) \
+            .set_record_serializer(self._build_serialization_schema()) \
+            .build()
+        guarantee = get_field_value(sink.get_java_function(), 'deliveryGuarantee')
+        self.assertEqual(guarantee.toString(), 'exactly-once')
+
+    def test_set_transactional_id_prefix(self):
+        sink = KafkaSink.builder() \
+            .set_bootstrap_servers('localhost:9092') \
+            .set_transactional_id_prefix('test-prefix') \
+            .set_record_serializer(self._build_serialization_schema()) \
+            .build()
+        prefix = get_field_value(sink.get_java_function(), 'transactionalIdPrefix')
+        self.assertEqual(prefix, 'test-prefix')
+
+    def test_set_property(self):
+        sink = KafkaSink.builder() \
+            .set_bootstrap_servers('localhost:9092') \
+            .set_record_serializer(self._build_serialization_schema()) \
+            .set_property('test-key', 'test-value') \
+            .build()
+        config = get_field_value(sink.get_java_function(), 'kafkaProducerConfig')
+        self.assertEqual(config.get('test-key'), 'test-value')
+
+    def test_set_record_serializer(self):
+        sink = KafkaSink.builder() \
+            .set_bootstrap_servers('localhost:9092') \
+            .set_record_serializer(self._build_serialization_schema()) \
+            .build()
+        serializer = get_field_value(sink.get_java_function(), 'recordSerializer')
+        self.assertEqual(serializer.getClass().getCanonicalName(),
+                         'org.apache.flink.connector.kafka.sink.'
+                         'KafkaRecordSerializationSchemaBuilder.'
+                         'KafkaRecordSerializationSchemaWrapper')
+        topic_selector = get_field_value(serializer, 'topicSelector')
+        self.assertEqual(topic_selector.apply(None), 'test-topic')
+        value_serializer = get_field_value(serializer, 'valueSerializationSchema')
+        self.assertEqual(value_serializer.getClass().getCanonicalName(),
+                         'org.apache.flink.api.common.serialization.SimpleStringSchema')
+
+    @staticmethod
+    def _build_serialization_schema() -> KafkaRecordSerializationSchema:
+        return KafkaRecordSerializationSchema.builder() \
+            .set_topic('test-topic') \
+            .set_value_serialization_schema(SimpleStringSchema()) \
+            .build()
+
+
+class KafkaRecordSerializationSchemaTests(PyFlinkTestCase):
+
+    def test_set_topic(self):
+        input_type = Types.ROW([Types.STRING()])
+
+        serialization_schema = KafkaRecordSerializationSchema.builder() \
+            .set_topic('test-topic') \
+            .set_value_serialization_schema(
+                JsonRowSerializationSchema.builder().with_type_info(input_type).build()) \
+            .build()
+
+        j_record = serialization_schema._j_serialization_schema.serialize(
+            to_java_data_structure(Row('test')), None, None
+        )
+        self.assertEqual(j_record.topic(), 'test-topic')
+        self.assertIsNone(j_record.key())
+        self.assertEqual(j_record.value(), b'{"f0":"test"}')
+
+    def test_set_topic_selector(self):
+        def _select(data):
+            data = data[0]
+            if data == 'a':
+                return 'topic-a'
+            elif data == 'b':
+                return 'topic-b'
+            else:
+                return 'topic-dead-letter'
+
+        def _check_record(data, topic, serialized_data):
+            input_type = Types.ROW([Types.STRING()])
+
+            serialization_schema = KafkaRecordSerializationSchema.builder() \
+                .set_topic_selector(_select) \
+                .set_value_serialization_schema(
+                    JsonRowSerializationSchema.builder().with_type_info(input_type).build()) \
+                .build()
+            sink = KafkaSink.builder() \
+                .set_bootstrap_servers('localhost:9092') \
+                .set_record_serializer(serialization_schema) \
+                .build()
+
+            ds = MockDataStream(Types.ROW([Types.STRING()]))
+            ds.sink_to(sink)
+            row = Row(data)
+            topic_row = ds.feed(row)  # type: Row
+            j_record = serialization_schema._j_serialization_schema.serialize(
+                to_java_data_structure(topic_row), None, None
+            )
+            self.assertEqual(j_record.topic(), topic)
+            self.assertIsNone(j_record.key())
+            self.assertEqual(j_record.value(), serialized_data)
+
+        _check_record('a', 'topic-a', b'{"f0":"a"}')
+        _check_record('b', 'topic-b', b'{"f0":"b"}')
+        _check_record('c', 'topic-dead-letter', b'{"f0":"c"}')
+        _check_record('d', 'topic-dead-letter', b'{"f0":"d"}')
+
+    def test_set_key_serialization_schema(self):
+        def _check_key_serialization_schema(key_serialization_schema, expected_class):
+            serialization_schema = KafkaRecordSerializationSchema.builder() \
+                .set_topic('test-topic') \
+                .set_key_serialization_schema(key_serialization_schema) \
+                .set_value_serialization_schema(SimpleStringSchema()) \
+                .build()
+            schema_field = get_field_value(serialization_schema._j_serialization_schema,
+                                           'keySerializationSchema')
+            self.assertIsNotNone(schema_field)
+            self.assertEqual(schema_field.getClass().getCanonicalName(), expected_class)
+
+        self._check_serialization_schema_implementations(_check_key_serialization_schema)
+
+    def test_set_value_serialization_schema(self):
+        def _check_value_serialization_schema(value_serialization_schema, expected_class):
+            serialization_schema = KafkaRecordSerializationSchema.builder() \
+                .set_topic('test-topic') \
+                .set_value_serialization_schema(value_serialization_schema) \
+                .build()
+            schema_field = get_field_value(serialization_schema._j_serialization_schema,
+                                           'valueSerializationSchema')
+            self.assertIsNotNone(schema_field)
+            self.assertEqual(schema_field.getClass().getCanonicalName(), expected_class)
+
+        self._check_serialization_schema_implementations(_check_value_serialization_schema)
+
     @staticmethod
-    def _get_java_field(java_object, field_name: str):
-        j_field = java_object.getClass().getDeclaredField(field_name)
-        j_field.setAccessible(True)
-        return j_field.get(java_object)
+    def _check_serialization_schema_implementations(check_function):
+        input_type = Types.ROW([Types.STRING()])
+
+        check_function(
+            JsonRowSerializationSchema.builder().with_type_info(input_type).build(),
+            'org.apache.flink.formats.json.JsonRowSerializationSchema'
+        )
+        check_function(
+            CsvRowSerializationSchema.Builder(input_type).build(),
+            'org.apache.flink.formats.csv.CsvRowSerializationSchema'
+        )
+        avro_schema_string = """
+        {
+            "type": "record",
+            "name": "test_record",
+            "fields": []
+        }
+        """
+        check_function(
+            AvroRowSerializationSchema(avro_schema_string=avro_schema_string),
+            'org.apache.flink.formats.avro.AvroRowSerializationSchema'
+        )
+        check_function(
+            SimpleStringSchema(),
+            'org.apache.flink.api.common.serialization.SimpleStringSchema'
+        )
+
+
+class MockDataStream(data_stream.DataStream):
+
+    def __init__(self, original_type=None):
+        super().__init__(None)
+        self._operators = []
+        self._type = original_type
+
+    def feed(self, data):
+        for op in self._operators:
+            data = op(data)
+        return data
+
+    def get_type(self):
+        return self._type
+
+    def map(self, f, output_type=None):
+        self._operators.append(f)
+        self._type = output_type
+
+    def sink_to(self, sink):
+        ds = self
+        from pyflink.datastream.connectors.base import SupportPreprocessing
+        if isinstance(sink, SupportPreprocessing):
+            if sink.need_preprocessing():
+                ds = sink.get_preprocessing().apply(self)
+        return ds
diff --git a/flink-python/pyflink/datastream/data_stream.py b/flink-python/pyflink/datastream/data_stream.py
index 1d6e89d8588..15418960590 100644
--- a/flink-python/pyflink/datastream/data_stream.py
+++ b/flink-python/pyflink/datastream/data_stream.py
@@ -828,7 +828,15 @@ class DataStream(object):
         :param sink: The user defined sink.
         :return: The closed DataStream.
         """
-        return DataStreamSink(self._j_data_stream.sinkTo(sink.get_java_function()))
+        ds = self
+
+        from pyflink.datastream.connectors.base import SupportPreprocessing
+        if isinstance(sink, SupportPreprocessing):
+            preprocessing_sink = cast(SupportPreprocessing, sink)
+            if preprocessing_sink.need_preprocessing():
+                ds = preprocessing_sink.get_preprocessing().apply(self)
+
+        return DataStreamSink(ds._j_data_stream.sinkTo(sink.get_java_function()))
 
     def execute_and_collect(self, job_execution_name: str = None, limit: int = None) \
             -> Union['CloseableIterator', list]:
diff --git a/flink-python/src/main/java/org/apache/flink/python/util/PythonConnectorUtils.java b/flink-python/src/main/java/org/apache/flink/python/util/PythonConnectorUtils.java
new file mode 100644
index 00000000000..63ac0bac890
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/python/util/PythonConnectorUtils.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.util;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.function.Function;
+
+/** Utility class for using DataStream connectors in Python. */
+public class PythonConnectorUtils {
+
+    /**
+     * Creates a selector that returns the first column of a row, and cast it to {@code clazz}.
+     * {@code T} should be a sub interface of {@link Function}, which accepts a {@link Row}.
+     *
+     * @param clazz The desired selector class to cast to, e.g. TopicSelector.class for Kafka.
+     * @param <T> An interface
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T createFirstColumnTopicSelector(Class<T> clazz) {
+        return (T)
+                Proxy.newProxyInstance(
+                        clazz.getClassLoader(),
+                        new Class[] {clazz},
+                        new FirstColumnTopicSelectorInvocationHandler());
+    }
+
+    /** The serializable {@link InvocationHandler} as the proxy for first column selector. */
+    public static class FirstColumnTopicSelectorInvocationHandler
+            implements InvocationHandler, Serializable {
+
+        @Override
+        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+            Preconditions.checkArgument(method.getName().equals("apply"));
+            Preconditions.checkArgument(args.length == 1);
+            Preconditions.checkArgument(args[0] instanceof Row);
+            Row row = (Row) args[0];
+            Preconditions.checkArgument(row.getArity() >= 1);
+            return row.getField(0);
+        }
+    }
+
+    /**
+     * A {@link SerializationSchema} for {@link Row} that only serialize the second column using a
+     * wrapped {@link SerializationSchema} for {@link T}.
+     *
+     * @param <T> The actual data type wrapped in the Row.
+     */
+    public static class SecondColumnSerializationSchema<T> implements SerializationSchema<Row> {
+
+        private static final long serialVersionUID = 1L;
+
+        private final SerializationSchema<T> wrappedSchema;
+
+        /**
+         * The constructor.
+         *
+         * @param wrappedSchema The {@link SerializationSchema} to serialize {@link T} objects.
+         */
+        public SecondColumnSerializationSchema(SerializationSchema<T> wrappedSchema) {
+            this.wrappedSchema = wrappedSchema;
+        }
+
+        @Override
+        public void open(InitializationContext context) throws Exception {
+            wrappedSchema.open(context);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public byte[] serialize(Row row) {
+            Preconditions.checkArgument(row.getArity() >= 2);
+            return wrappedSchema.serialize((T) row.getField(1));
+        }
+    }
+}