Posted to commits@flink.apache.org by ti...@apache.org on 2023/01/10 03:59:33 UTC

[flink-connector-pulsar] branch main updated: [FLINK-28082][Connector/Pulsar] Add end-to-end encryption support for Pulsar connector. (#13)

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git


The following commit(s) were added to refs/heads/main by this push:
     new 5173e70  [FLINK-28082][Connector/Pulsar] Add end-to-end encryption support for Pulsar connector. (#13)
5173e70 is described below

commit 5173e708a9691754dd32000a77a8059cdbb01b2b
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Tue Jan 10 11:59:28 2023 +0800

    [FLINK-28082][Connector/Pulsar] Add end-to-end encryption support for Pulsar connector. (#13)
---
 .../docs/connectors/datastream/pulsar.md           | 393 +++++++++++-------
 docs/content/docs/connectors/datastream/pulsar.md  | 461 +++++++++++++--------
 .../generated/pulsar_consumer_configuration.html   |   6 +-
 .../generated/pulsar_producer_configuration.html   |   6 +
 .../generated/pulsar_source_configuration.html     |   2 +-
 .../flink/tests/util/pulsar/PulsarSinkE2ECase.java |   4 +-
 .../FlinkContainerWithPulsarEnvironment.java       |   7 +-
 .../pulsar/common/crypto/DefaultPulsarCrypto.java  | 107 +++++
 .../pulsar/common/crypto/PulsarCrypto.java         |  86 ++++
 .../pulsar/common/crypto/PulsarCryptoDisabled.java |  41 ++
 .../flink/connector/pulsar/sink/PulsarSink.java    |  18 +-
 .../connector/pulsar/sink/PulsarSinkBuilder.java   |  40 +-
 .../connector/pulsar/sink/PulsarSinkOptions.java   |  67 +--
 .../pulsar/sink/config/PulsarSinkConfigUtils.java  |  14 +-
 .../connector/pulsar/sink/writer/PulsarWriter.java |   8 +-
 .../sink/writer/topic/TopicProducerRegister.java   |  64 ++-
 .../connector/pulsar/source/PulsarSource.java      |  12 +-
 .../pulsar/source/PulsarSourceBuilder.java         |  31 +-
 .../pulsar/source/PulsarSourceOptions.java         |  75 +---
 .../source/reader/PulsarPartitionSplitReader.java  |  20 +
 .../pulsar/source/reader/PulsarSourceReader.java   |   3 +
 .../connector/pulsar/sink/PulsarSinkITCase.java    |   9 +-
 .../pulsar/sink/writer/PulsarWriterTest.java       |  10 +-
 .../writer/topic/TopicProducerRegisterTest.java    |   7 +-
 .../pulsar/source/PulsarSourceITCase.java          |   5 +
 .../source/enumerator/cursor/StopCursorTest.java   |   2 +
 .../reader/PulsarPartitionSplitReaderTest.java     |   2 +
 .../source/reader/PulsarSourceReaderTest.java      |   4 +-
 .../pulsar/testutils/PulsarTestCommonUtils.java    |   5 +
 .../pulsar/testutils/PulsarTestKeyReader.java      | 124 ++++++
 .../sink/cases/PulsarEncryptSinkContext.java       |  73 ++++
 .../sink/{ => cases}/PulsarSinkTestContext.java    |  37 +-
 .../sink/reader/PulsarEncryptDataReader.java       |  36 ++
 .../{ => reader}/PulsarPartitionDataReader.java    |  42 +-
 .../testutils/source/PulsarSourceTestContext.java  |   1 +
 .../cases/ConsumeEncryptMessagesContext.java       |  76 ++++
 .../source/cases/PartialKeysConsumingContext.java  |   2 +-
 .../KeyedPulsarPartitionDataWriter.java            |   2 +-
 .../source/writer/PulsarEncryptDataWriter.java     |  85 ++++
 .../{ => writer}/PulsarPartitionDataWriter.java    |   2 +-
 pom.xml                                            |  25 +-
 41 files changed, 1450 insertions(+), 564 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/pulsar.md b/docs/content.zh/docs/connectors/datastream/pulsar.md
index 31131ab..d7b7fcf 100644
--- a/docs/content.zh/docs/connectors/datastream/pulsar.md
+++ b/docs/content.zh/docs/connectors/datastream/pulsar.md
@@ -28,9 +28,7 @@ Flink 当前提供 [Apache Pulsar](https://pulsar.apache.org) Source 和 Sink 
 
 ## 添加依赖
 
-Pulsar Source 当前支持 Pulsar 2.8.1 之后的版本,但是 Pulsar Source 使用到了 Pulsar 的[事务机制](https://pulsar.apache.org/docs/zh-CN/txn-what/),建议在 Pulsar 2.9.2 及其之后的版本上使用 Pulsar Source 进行数据读取。
-
-如果想要了解更多关于 Pulsar API 兼容性设计,可以阅读文档 [PIP-72](https://github.com/apache/pulsar/wiki/PIP-72%3A-Introduce-Pulsar-Interface-Taxonomy%3A-Audience-and-Stability-Classification)。
+当前支持 Pulsar 2.10.0 及其之后的版本,建议始终将 Pulsar 升级至最新版。如果想要了解更多关于 Pulsar API 兼容性设计,可以阅读文档 [PIP-72](https://github.com/apache/pulsar/wiki/PIP-72%3A-Introduce-Pulsar-Interface-Taxonomy%3A-Audience-and-Stability-Classification)。
 
 {{< artifact flink-connector-pulsar >}}
 
@@ -190,7 +188,7 @@ Pulsar Source 提供了两种订阅 Topic 或 Topic 分区的方式。
 
 如果用户只关心消息体的二进制字节流,并不需要其他属性来解析数据。可以直接使用预定义的 `PulsarDeserializationSchema`。Pulsar Source里面提供了 3 种预定义的反序列化器。
 
-- 使用 Pulsar 的 [Schema](https://pulsar.apache.org/docs/zh-CN/schema-understand/) 解析消息。如果使用 KeyValue 或者 Struct 类型的 Schema, 那么 Pulsar 的 `Schema` 将不会含有类型类信息, 但 `PulsarSchemaTypeInformation` 需要通过传入类型类信息来构造。因此我们提供的 API 支持用户传入类型信息。
+- 使用 Pulsar 的 [Schema](https://pulsar.apache.org/docs/2.10.x/schema-understand/) 解析消息。如果使用 KeyValue 或者 Struct 类型的 Schema, 那么 Pulsar 的 `Schema` 将不会含有类型类信息, 但 `PulsarSchemaTypeInformation` 需要通过传入类型类信息来构造。因此我们提供的 API 支持用户传入类型信息。
   ```java
   // 基础数据类型
   PulsarDeserializationSchema.pulsarSchema(Schema);
@@ -229,10 +227,24 @@ Pulsar Source 提供了两种订阅 Topic 或 Topic 分区的方式。
   {{< /tab >}}
   {{< /tabs >}}
 
-Pulsar 的 `Message<byte[]>` 包含了很多 [额外的属性](https://pulsar.apache.org/docs/zh-CN/concepts-messaging/#%E6%B6%88%E6%81%AF)。例如,消息的 key、消息发送时间、消息生产时间、用户在消息上自定义的键值对属性等。可以使用 `Message<byte[]>` 接口来获取这些属性。
+Pulsar 的 `Message<byte[]>` 包含了很多 [额外的属性](https://pulsar.apache.org/docs/2.10.x/concepts-messaging/#messages)。例如,消息的 key、消息发送时间、消息生产时间、用户在消息上自定义的键值对属性等。可以使用 `Message<byte[]>` 接口来获取这些属性。
 
 如果用户需要基于这些额外的属性来解析一条消息,可以实现 `PulsarDeserializationSchema` 接口。并一定要确保 `PulsarDeserializationSchema.getProducedType()` 方法返回的 `TypeInformation` 是正确的结果。Flink 使用 `TypeInformation` 将解析出来的结果序列化传递到下游算子。
 
+同时使用 `PulsarDeserializationSchema.pulsarSchema()` 以及在 builder 中指定 `PulsarSourceBuilder.enableSchemaEvolution()` 可以启用 [Schema evolution][schema-evolution] 特性。该特性会使用 Pulsar Broker 端提供的 Schema 版本兼容性检测以及 Schema 版本演进。下列示例展示了如何启用 Schema Evolution。
+
+```java
+Schema<SomePojo> schema = Schema.AVRO(SomePojo.class);
+
+PulsarSource<SomePojo> source = PulsarSource.builder()
+    ...
+    .setDeserializationSchema(schema, SomePojo.class)
+    .enableSchemaEvolution()
+    .build();
+```
+
+如果使用 Pulsar 原生的 Schema 来反序列化消息却不启用 Schema Evolution 特性,我们将会跳过 Schema 兼容性检查,解析一些消息时可能会遇到未知的错误。
+
 ### 定义 RangeGenerator
 
 如果想在 Pulsar Source 里面使用 `Key_Shared` 订阅,需要提供 `RangeGenerator` 实例。`RangeGenerator` 会生成一组消息 key 的 hash 范围,Pulsar Source 会基于给定的范围来消费数据。
@@ -398,7 +410,7 @@ Pulsar Source 默认情况下使用流的方式消费数据。除非任务失败
   {{< /tabs >}}
 
 - 停止于某个给定的消息事件时间戳,比如 `Message<byte[]>.getEventTime()`,消费结果里不包含此时间戳的消息。
-  {{< tabs "pulsar-boundedness-at-event-time" >}} 
+  {{< tabs "pulsar-boundedness-at-event-time" >}}
   {{< tab "Java" >}}
   ```java
   StopCursor.atEventTime(long);
@@ -459,13 +471,13 @@ Pulsar Source 默认情况下使用流的方式消费数据。除非任务失败
 
 #### Pulsar Java 客户端配置项
 
-Pulsar Source 使用 [Java 客户端](https://pulsar.apache.org/docs/zh-CN/client-libraries-java/)来创建消费实例,相关的配置定义于 Pulsar 的 `ClientConfigurationData` 内。在 `PulsarOptions` 选项中,定义大部分的可供用户定义的配置。
+Pulsar Source 使用 [Java 客户端](https://pulsar.apache.org/docs/2.10.x/client-libraries-java/)来创建消费实例,相关的配置定义于 Pulsar 的 `ClientConfigurationData` 内。在 `PulsarOptions` 选项中,定义大部分的可供用户定义的配置。
 
 {{< generated/pulsar_client_configuration >}}
 
 #### Pulsar 管理 API 配置项
 
-[管理 API](https://pulsar.apache.org/docs/zh-CN/admin-api-overview/) 用于查询 Topic 的元数据和用正则订阅的时候的 Topic 查找,它与 Java 客户端共享大部分配置。下面列举的配置只供管理 API 使用,`PulsarOptions` 包含了这些配置 。
+[管理 API](https://pulsar.apache.org/docs/2.10.x/admin-api-overview/) 用于查询 Topic 的元数据和用正则订阅的时候的 Topic 查找,它与 Java 客户端共享大部分配置。下面列举的配置只供管理 API 使用,`PulsarOptions` 包含了这些配置 。
 
 {{< generated/pulsar_admin_configuration >}}
 
@@ -535,32 +547,14 @@ env.from_source(pulsar_source, CustomWatermarkStrategy(), "Pulsar Source With Cu
 
 ### 消息确认
 
-一旦在 Topic 上创建了订阅,消息便会[存储](https://pulsar.apache.org/docs/zh-CN/concepts-architecture-overview/#%E6%8C%81%E4%B9%85%E5%8C%96%E5%AD%98%E5%82%A8)在 Pulsar 里。即使没有消费者,消息也不会被丢弃。只有当 Pulsar Source 同 Pulsar 确认此条消息已经被消费,该消息才以某种机制会被移除。Pulsar Source 支持四种订阅方式,它们的消息确认方式也大不相同。
+一旦在 Topic 上创建了订阅,消息便会[存储](https://pulsar.apache.org/docs/2.10.x/concepts-architecture-overview/#persistent-storage)在 Pulsar 里。即使没有消费者,消息也不会被丢弃。只有当 Flink 同 Pulsar 确认此条消息已经被消费,该消息才会以某种机制被移除。
 
-#### 独占和灾备订阅下的消息确认
-
-`独占` 和 `灾备` 订阅下,Pulsar Source 使用累进式确认方式。确认某条消息已经被处理时,其前面消息会自动被置为已读。Pulsar Source 会在 Flink 完成检查点时将对应时刻消费的消息置为已读,以此来保证 Pulsar 状态与 Flink 状态一致。
+我们使用 `独占` 作为默认的订阅模式。此订阅下,Pulsar Source 使用累进式确认方式。确认某条消息已经被处理时,其前面消息会自动被置为已读。Pulsar Source 会在 Flink 完成检查点时将对应时刻消费的消息置为已读,以此来保证 Pulsar 状态与 Flink 状态一致。
 
 如果用户没有在 Flink 上启用检查点,Pulsar Source 可以使用周期性提交来将消费状态提交给 Pulsar,使用配置 `PulsarSourceOptions.PULSAR_AUTO_COMMIT_CURSOR_INTERVAL` 来进行定义。
 
 需要注意的是,此种场景下,Pulsar Source 并不依赖于提交到 Pulsar 的状态来做容错。消息确认只是为了能在 Pulsar 端看到对应的消费处理情况。
 
-#### 共享和 key 共享订阅下的消息确认
-
-`共享` 和 `key 共享` 需要依次确认每一条消息,所以 Pulsar Source 在 Pulsar 事务里面进行消息确认,然后将事务提交到 Pulsar。
-
-首先需要在 Pulsar 的 `borker.conf` 文件里面启用事务:
-
-```text
-transactionCoordinatorEnabled=true
-```
-
-Pulsar Source 创建的事务的默认超时时间为 3 小时,请确保这个时间大于 Flink 检查点的间隔。用户可以使用 `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` 来设置事务的超时时间。
-
-如果用户无法启用 Pulsar 的事务,或者是因为项目禁用了检查点,需要将 `PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` 选项设置为 `true`,消息从 Pulsar 消费后会被立刻置为已读。Pulsar Source 无法保证此种场景下的消息一致性。
-
-Pulsar Source 在 Pulsar 上使用日志的形式记录某个事务下的消息确认,为了更好的性能,请缩短 Flink 做检查点的间隔。
-
 ## Pulsar Sink
 
 Pulsar Sink 连接器可以将经过 Flink 处理后的数据写入一个或多个 Pulsar Topic 或者 Topic 下的某些分区。
@@ -568,7 +562,7 @@ Pulsar Sink 连接器可以将经过 Flink 处理后的数据写入一个或多
 {{< hint info >}}
 Pulsar Sink 基于 Flink 最新的 [Sink API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction) 实现。
 
-如果想要使用旧版的使用 `SinkFuntion` 接口实现的 Sink 连接器,可以使用 StreamNative 维护的 [pulsar-flink](https://github.com/streamnative/pulsar-flink)。
+如果想要使用旧版的使用 `SinkFunction` 接口实现的 Sink 连接器,可以使用 StreamNative 维护的 [pulsar-flink](https://github.com/streamnative/pulsar-flink)。
 {{< /hint >}}
 
 ### 使用示例
@@ -577,7 +571,7 @@ Pulsar Sink 使用 builder 类来创建 `PulsarSink` 实例。
 
 下面示例展示了如何通过 Pulsar Sink 以“至少一次”的语义将字符串类型的数据发送给 topic1。
 
-{{< tabs "46e225b1-1e34-4ff3-890c-aa06a2b99c0a" >}}
+{{< tabs "pulsar-sink-example" >}}
 {{< tab "Java" >}}
 
 ```java
@@ -618,7 +612,7 @@ stream.sink_to(pulsar_sink)
 
 - Pulsar 数据消费的地址,使用 `setServiceUrl(String)` 方法提供。
 - Pulsar HTTP 管理地址,使用 `setAdminUrl(String)` 方法提供。
-- 需要发送到的 Topic 或者是 Topic 下面的分区,详见[指定写入的topic或者topic分区](#指定写入的topic或者topic分区)。
+- 需要发送到的 Topic 或者是 Topic 下面的分区,详见[指定写入的 Topic 或者 Topic 分区](#指定写入的-topic-或者-topic-分区)。
 - 编码 Pulsar 消息的序列化器,详见[序列化器](#序列化器)。
 
 在创建 `PulsarSink` 时,建议使用 `setProducerName(String)` 来指定 `PulsarSink` 内部使用的 Pulsar 生产者名称。这样方便在数据监控页面找到对应的生产者监控指标。
@@ -627,7 +621,7 @@ stream.sink_to(pulsar_sink)
 
 `PulsarSink` 指定写入 Topic 的方式和 Pulsar Source [指定消费的 Topic 或者 Topic 分区](#指定消费的-topic-或者-topic-分区)的方式类似。`PulsarSink` 支持以 mixin 风格指定写入的 Topic 或分区。因此,可以指定一组 Topic 或者分区或者是两者都有。
 
-{{< tabs "3d452e6b-bffd-42f7-bb91-974b306262ca" >}}
+{{< tabs "set-pulsar-sink-topics" >}}
 {{< tab "Java" >}}
 
 ```java
@@ -672,9 +666,9 @@ PulsarSink.builder().set_topics(["topic-a-partition-0", "topic-a-partition-2", "
 
 序列化器(`PulsarSerializationSchema`)负责将 Flink 中的每条记录序列化成 byte 数组,并通过网络发送至指定的写入 Topic。和 Pulsar Source 类似的是,序列化器同时支持使用基于 Flink 的 `SerializationSchema` 接口实现序列化器和使用 Pulsar 原生的 `Schema` 类型实现的序列化器。不过序列化器并不支持 Pulsar 的 `Schema.AUTO_PRODUCE_BYTES()`。
 
-如果不需要指定 [Message](https://pulsar.apache.org/api/client/2.9.0-SNAPSHOT/org/apache/pulsar/client/api/Message.html) 接口中提供的 key 或者其他的消息属性,可以从上述 2 种预定义的 `PulsarSerializationSchema` 实现中选择适合需求的一种使用。
+如果不需要指定 [Message](https://pulsar.apache.org/api/client/2.10.x/org/apache/pulsar/client/api/Message.html) 接口中提供的 key 或者其他的消息属性,可以从上述 2 种预定义的 `PulsarSerializationSchema` 实现中选择适合需求的一种使用。
 
-- 使用 Pulsar 的 [Schema](https://pulsar.apache.org/docs/zh-CN/schema-understand/) 来序列化 Flink 中的数据。
+- 使用 Pulsar 的 [Schema](https://pulsar.apache.org/docs/2.10.x/schema-understand/) 来序列化 Flink 中的数据。
   ```java
   // 原始数据类型
   PulsarSerializationSchema.pulsarSchema(Schema)
@@ -687,7 +681,7 @@ PulsarSink.builder().set_topics(["topic-a-partition-0", "topic-a-partition-2", "
   ```
 - 使用 Flink 的 `SerializationSchema` 来序列化数据。
 
-  {{< tabs "b65b9978-b1d6-4b0d-ade8-78098e0f23d8" >}}
+  {{< tabs "set-pulsar-serialization-flink-schema" >}}
   {{< tab "Java" >}}
 
   ```java
@@ -704,7 +698,7 @@ PulsarSink.builder().set_topics(["topic-a-partition-0", "topic-a-partition-2", "
   {{< /tab >}}
   {{< /tabs >}}
 
-同时使用 `PulsarSerializationSchema.pulsarSchema()` 以及在 builder 中指定 `PulsarSinkBuilder.enableSchemaEvolution()` 可以启用 [Schema evolution](https://pulsar.apache.org/docs/zh-CN/schema-evolution-compatibility/#schema-evolution) 特性。该特性会使用 Pulsar Broker 端提供的 Schema 版本兼容性检测以及 Schema 版本演进。下列示例展示了如何启用 Schema Evolution。
+同时使用 `PulsarSerializationSchema.pulsarSchema()` 以及在 builder 中指定 `PulsarSinkBuilder.enableSchemaEvolution()` 可以启用 [Schema evolution][schema-evolution] 特性。该特性会使用 Pulsar Broker 端提供的 Schema 版本兼容性检测以及 Schema 版本演进。下列示例展示了如何启用 Schema Evolution。
 
 ```java
 Schema<SomePojo> schema = Schema.AVRO(SomePojo.class);
@@ -736,7 +730,7 @@ PulsarSink<String> sink = PulsarSink.builder()
   可以使用 `MessageKeyHash.JAVA_HASH` 或者 `MessageKeyHash.MURMUR3_32_HASH` 两种不同的哈希算法来计算消息 key 的哈希值。使用 `PulsarSinkOptions.PULSAR_MESSAGE_KEY_HASH` 配置项来指定想要的哈希算法。
 
 - `RoundRobinRouter`:轮换使用用户给定的 Topic 分区。
-  
+
   消息将会轮替地选取 Topic 分区,当往某个 Topic 分区里写入指定数量的消息后,将会轮换至下一个 Topic 分区。使用 `PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES` 指定向一个 Topic 分区中写入的消息数量。
 
 还可以通过实现 `TopicRouter` 接口来自定义消息路由策略,请注意 TopicRouter 的实现需要能被序列化。
@@ -758,7 +752,7 @@ public interface TopicRouter<IN> extends Serializable {
 {{< hint info >}}
 如前文所述,Pulsar 分区的内部被实现为一个无分区的 Topic,一般情况下 Pulsar 客户端会隐藏这个实现,并且提供内置的消息路由策略。Pulsar Sink 并没有使用 Pulsar 客户端提供的路由策略和封装,而是使用了 Pulsar 客户端更底层的 API 自行实现了消息路由逻辑。这样做的主要目的是能够在属于不同 Topic 的分区之间定义更灵活的消息路由策略。
 
-详情请参考 Pulsar 的 [partitioned topics](https://pulsar.apache.org/docs/zh-CN/cookbooks-partitioned/)。
+详情请参考 Pulsar 的 [partitioned topics](https://pulsar.apache.org/docs/2.10.x/cookbooks-partitioned/) 文档。
 {{< /hint >}}
 
 ### 发送一致性
@@ -767,11 +761,11 @@ public interface TopicRouter<IN> extends Serializable {
 
 - `NONE`:Flink 应用运行时可能出现数据丢失的情况。在这种模式下,Pulsar Sink 发送消息后并不会检查消息是否发送成功。此模式具有最高的吞吐量,可用于一致性没有要求的场景。
 - `AT_LEAST_ONCE`:每条消息**至少有**一条对应消息发送至 Pulsar,发送至 Pulsar 的消息可能会因为 Flink 应用重启而出现重复。
-- `EXACTLY_ONCE`:每条消息**有且仅有**一条对应消息发送至 Pulsar。发送至 Pulsar 的消息不会有重复也不会丢失。Pulsar Sink 内部依赖 [Pulsar 事务](https://pulsar.apache.org/docs/zh-CN/transactions/)和两阶段提交协议来保证每条记录都能正确发往 Pulsar。
+- `EXACTLY_ONCE`:每条消息**有且仅有**一条对应消息发送至 Pulsar。发送至 Pulsar 的消息不会有重复也不会丢失。Pulsar Sink 内部依赖 [Pulsar 事务](https://pulsar.apache.org/docs/2.10.x/transactions/)和两阶段提交协议来保证每条记录都能正确发往 Pulsar。
 
 ### 消息延时发送
 
-[消息延时发送](https://pulsar.apache.org/docs/zh-CN/next/concepts-messaging/#%E6%B6%88%E6%81%AF%E5%BB%B6%E8%BF%9F%E4%BC%A0%E9%80%92)特性可以让指定发送的每一条消息需要延时一段时间后才能被下游的消费者所消费。当延时消息发送特性启用时,Pulsar Sink 会**立刻**将消息发送至 Pulsar Broker。但该消息在指定的延迟时间到达前将会保持对下游消费者不可见。
+[消息延时发送](https://pulsar.apache.org/docs/2.10.x/concepts-messaging/#delayed-message-delivery)特性可以让指定发送的每一条消息需要延时一段时间后才能被下游的消费者所消费。当延时消息发送特性启用时,Pulsar Sink 会**立刻**将消息发送至 Pulsar Broker。但该消息在指定的延迟时间到达前将会保持对下游消费者不可见。
 
 消息延时发送仅在 `Shared` 订阅模式下有效,在 `Exclusive` 和 `Failover` 模式下该特性无效。
 
@@ -790,7 +784,7 @@ public interface TopicRouter<IN> extends Serializable {
 Pulsar Sink 和 Pulsar Source 公用的配置选项可参考
 
 - [Pulsar Java 客户端配置项](#pulsar-java-客户端配置项)
-- [Pulsar 管理 API 配置项](#pulsar-管理-API-配置项)
+- [Pulsar 管理 API 配置项](#pulsar-管理-api-配置项)
 
 #### Pulsar 生产者 API 配置项
 
@@ -804,113 +798,25 @@ Pulsar Sink 使用生产者 API 来发送消息。Pulsar 的 `ProducerConfigurat
 
 {{< generated/pulsar_sink_configuration >}}
 
-### Sink 监控指标
+### 设计思想简述
 
-下列表格列出了当前 Sink 支持的监控指标,前 6 个指标是 [FLIP-33: Standardize Connector Metrics]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics](https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics)) 中规定的 Sink 连接器应当支持的标准指标。
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 15%">Scope</th>
-      <th class="text-left" style="width: 18%">Metrics</th>
-      <th class="text-left" style="width: 18%">User Variables</th>
-      <th class="text-left" style="width: 39%">Description</th>
-      <th class="text-left" style="width: 10%">Type</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-        <th rowspan="13">Operator</th>
-        <td>numBytesOut</td>
-        <td>n/a</td>
-        <td>Pulsar Sink 启动后总共发出的字节数</td>
-        <td>Counter</td>
-    </tr>
-    <tr>
-        <td>numBytesOutPerSecond</td>
-        <td>n/a</td>
-        <td>每秒发送的字节数</td>
-        <td>Meter</td>
-    </tr>
-    <tr>
-        <td>numRecordsOut</td>
-        <td>n/a</td>
-        <td>Pulsar Sink 启动后总共发出的消息数</td>
-        <td>Counter</td>
-    </tr>
-    <tr>
-        <td>numRecordsOutPerSecond</td>
-        <td>n/a</td>
-        <td>每秒发送的消息数</td>
-        <td>Meter</td>
-    </tr>
-    <tr>
-        <td>numRecordsOutErrors</td>
-        <td>n/a</td>
-        <td>总共发送消息失败的次数</td>
-        <td>Counter</td>
-    </tr>
-    <tr>
-        <td>currentSendTime</td>
-        <td>n/a</td>
-        <td>最近一条消息从被放入客户端缓冲队列到收到消息确认的时间</td>
-        <td>Gauge</td>
-    </tr>
-    <tr>
-        <td>PulsarSink.numAcksReceived</td>
-        <td>n/a</td>
-        <td>总共收到的确认数</td>
-        <td>Counter</td>
-    </tr>
-    <tr>
-        <td>PulsarSink.sendLatencyMax</td>
-        <td>n/a</td>
-        <td>所有生产者的最大发送延迟</td>
-        <td>Gauge</td>
-    </tr>
-    <tr>
-        <td>PulsarSink.producer."ProducerName".sendLatency50Pct</td>
-        <td>ProducerName</td>
-        <td>某个生产者在过去的一个窗口内的发送延迟的中位数</td>
-        <td>Gauge</td>
-    </tr>
-    <tr>
-        <td>PulsarSink.producer."ProducerName".sendLatency75Pct</td>
-        <td>ProducerName</td>
-        <td>某个生产者在过去的一个窗口内的发送延迟的 75 百分位数</td>
-        <td>Gauge</td>
-    </tr>
-    <tr>
-        <td>PulsarSink.producer."ProducerName".sendLatency95Pct</td>
-        <td>ProducerName</td>
-        <td>某个生产者在过去的一个窗口内的发送延迟的 95 百分位数</td>
-        <td>Gauge</td>
-    </tr>
-    <tr>
-        <td>PulsarSink.producer."ProducerName".sendLatency99Pct</td>
-        <td>ProducerName</td>
-        <td>某个生产者在过去的一个窗口内的发送延迟的 99 百分位数</td>
-        <td>Gauge</td>
-    </tr>
-    <tr>
-        <td>PulsarSink.producer."ProducerName".sendLatency999Pct</td>
-        <td>ProducerName</td>
-        <td>某个生产者在过去的一个窗口内的发送延迟的 99.9 百分位数</td>
-        <td>Gauge</td>
-    </tr>
-  </tbody>
-</table>
+Pulsar Sink 遵循 [FLIP-191](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction) 中定义的 Sink API 设计。
 
-{{< hint info >}}
-指标 `numBytesOut`、`numRecordsOut` 和 `numRecordsOutErrors` 从 Pulsar Producer 实例的监控指标中获得。
+#### 无状态的 SinkWriter
 
-`currentSendTime` 记录了最近一条消息从放入生产者的缓冲队列到消息被消费确认所耗费的时间。这项指标在 `NONE` 发送一致性下不可用。
-{{< /hint >}}
+在 `EXACTLY_ONCE` 一致性下,Pulsar Sink 不会将事务相关的信息存放于检查点快照中。这意味着当 Flink 应用重启时,Pulsar Sink 会创建新的事务实例。上一次运行过程中任何未提交事务中的消息会因为超时中止而无法被下游的消费者所消费。这样的设计保证了 SinkWriter 是无状态的。
+
+#### Pulsar Schema Evolution
+
+[Pulsar Schema Evolution][schema-evolution] 允许用户在一个 Flink 应用程序中使用的数据模型发生特定改变后(比如向基于 Avro 的 POJO 类中增加或删除一个字段),仍能使用同一个 Flink 应用程序的代码。
 
-默认情况下,Pulsar 生产者每隔 60 秒才会刷新一次监控数据,然而 Pulsar Sink 每 500 毫秒就会从 Pulsar 生产者中获得最新的监控数据。因此 `numRecordsOut`、`numBytesOut`、`numAcksReceived` 以及 `numRecordsOutErrors` 4 个指标实际上每 60 秒才会刷新一次。
+可以在 Pulsar 集群内指定哪些类型的数据模型的改变是被允许的,详情请参阅 [Pulsar Schema Evolution][schema-evolution]。
 
-如果想要更高地刷新评率,可以通过如下方式来将 Pulsar 生产者的监控数据刷新频率调整至相应值(最低为1s):
-{{< tabs "b65b9978-b1d6-4b0d-ade8-78098e0f23d1" >}}
+## 监控指标
+
+默认情况下,Pulsar client 每隔 60 秒才会刷新一次监控数据。如果想要提高刷新频率,可以通过如下方式来将 Pulsar client 的监控数据刷新频率调整至相应值(最低为1s):
+
+{{< tabs "pulsar-stats-interval-seconds" >}}
 
 {{< tab "Java" >}}
 ```java
@@ -926,32 +832,208 @@ builder.set_config("pulsar.client.statsIntervalSeconds", "1")
 
 {{< /tabs >}}
 
-`numBytesOutRate` 和 `numRecordsOutRate` 指标是 Flink 内部通过 `numBytesOut` 和 `numRecordsOut` 计数器,在一个 60 秒的窗口内计算得到的。
+### Source 监控指标
 
-### 设计思想简述
+在 [FLIP-33: Standardize Connector Metrics][standard-metrics] 定义的基础 Source 指标之上,我们额外提供了一些来自 Client 的监控指标。你需要启用 `pulsar.source.enableMetrics` 选项来获得这些监控指标,所有的指标列举在下面的表格中。
 
-Pulsar Sink 遵循 [FLIP-191](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction) 中定义的 Sink API 设计。
+{{< tabs "pulsar-enable-source-metrics" >}}
 
-#### 无状态的 SinkWriter
+{{< tab "Java" >}}
+```java
+builder.setConfig(PulsarSourceOptions.PULSAR_ENABLE_SOURCE_METRICS, true);
+```
+{{< /tab >}}
 
-在 `EXACTLY_ONCE` 一致性下,Pulsar Sink 不会将事务相关的信息存放于检查点快照中。这意味着当 Flink 应用重启时,Pulsar Sink 会创建新的事务实例。上一次运行过程中任何未提交事务中的消息会因为超时中止而无法被下游的消费者所消费。这样的设计保证了 SinkWriter 是无状态的。
+{{< tab "Python" >}}
+```python
+builder.set_config("pulsar.source.enableMetrics", "true")
+```
+{{< /tab >}}
 
-#### Pulsar Schema Evolution
+{{< /tabs >}}
 
-[Pulsar Schema Evolution](https://pulsar.apache.org/docs/zh-CN/schema-evolution-compatibility/) 允许用户在一个 Flink 应用程序中使用的数据模型发生特定改变后(比如向基于 ARVO 的 POJO 类中增加或删除一个字段),仍能使用同一个 Flink 应用程序的代码。
+| 指标                                                           | 变量                | 描述                                       | 类型  |
+| -------------------------------------------------------------- | ------------------- | ------------------------------------------ | ----- |
+| PulsarConsumer."Topic"."ConsumerName".numMsgsReceived          | Topic, ConsumerName | 在过去的一个统计窗口内消费的消息数         | Gauge |
+| PulsarConsumer."Topic"."ConsumerName".numBytesReceived         | Topic, ConsumerName | 在过去的一个统计窗口内消费的字节数         | Gauge |
+| PulsarConsumer."Topic"."ConsumerName".rateMsgsReceived         | Topic, ConsumerName | 在过去的一个统计窗口内消费的消息速率       | Gauge |
+| PulsarConsumer."Topic"."ConsumerName".rateBytesReceived        | Topic, ConsumerName | 在过去的一个统计窗口内消费的字节速率       | Gauge |
+| PulsarConsumer."Topic"."ConsumerName".numAcksSent              | Topic, ConsumerName | 在过去的一个统计窗口内确认消费成功的消息数 | Gauge |
+| PulsarConsumer."Topic"."ConsumerName".numAcksFailed            | Topic, ConsumerName | 在过去的一个统计窗口内确认消费失败的消息数 | Gauge |
+| PulsarConsumer."Topic"."ConsumerName".numReceiveFailed         | Topic, ConsumerName | 在过去的一个统计窗口内消费失败的消息数     | Gauge |
+| PulsarConsumer."Topic"."ConsumerName".numBatchReceiveFailed    | Topic, ConsumerName | 在过去的一个统计窗口内批量消费失败的消息数 | Gauge |
+| PulsarConsumer."Topic"."ConsumerName".totalMsgsReceived        | Topic, ConsumerName | Consumer 消费的全部消息数                  | Gauge |
+| PulsarConsumer."Topic"."ConsumerName".totalBytesReceived       | Topic, ConsumerName | Consumer 消费的全部字节数                  | Gauge |
+| PulsarConsumer."Topic"."ConsumerName".totalReceivedFailed      | Topic, ConsumerName | Consumer 消费失败的消息数                  | Gauge |
+| PulsarConsumer."Topic"."ConsumerName".totalBatchReceivedFailed | Topic, ConsumerName | Consumer 批量消费失败的消息数              | Gauge |
+| PulsarConsumer."Topic"."ConsumerName".totalAcksSent            | Topic, ConsumerName | Consumer 确认消费成功的消息数              | Gauge |
+| PulsarConsumer."Topic"."ConsumerName".totalAcksFailed          | Topic, ConsumerName | Consumer 确认消费失败的消息数              | Gauge |
+| PulsarConsumer."Topic"."ConsumerName".msgNumInReceiverQueue    | Topic, ConsumerName | Consumer 当前待消费的消息队列大小          | Gauge |
 
-可以在 Pulsar 集群内指定哪些类型的数据模型的改变是被允许的,详情请参阅 [Pulsar Schema Evolution](https://pulsar.apache.org/docs/zh-CN/schema-evolution-compatibility/)。
+### Sink 监控指标
+
+下列表格列出了当前 Sink 支持的监控指标,前 6 个指标是 [FLIP-33: Standardize Connector Metrics][standard-metrics] 中规定的 Sink 连接器应当支持的标准指标。前 5 个指标会默认暴露给用户,其他指标需要通过启用 `pulsar.sink.enableMetrics` 选项来获得。
+
+{{< tabs "pulsar-enable-sink-metrics" >}}
+
+{{< tab "Java" >}}
+```java
+builder.setConfig(PulsarSinkOptions.PULSAR_ENABLE_SINK_METRICS, true);
+```
+{{< /tab >}}
+
+{{< tab "Python" >}}
+```python
+builder.set_config("pulsar.sink.enableMetrics", "true")
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+| 指标                                                          | 变量                | 描述                                                   | 类型    |
+| ------------------------------------------------------------- | ------------------- | ------------------------------------------------------ | ------- |
+| numBytesOut                                                   | n/a                 | Pulsar Sink 启动后总共发出的字节数                     | Counter |
+| numBytesOutPerSecond                                          | n/a                 | 每秒发送的字节数                                       | Meter   |
+| numRecordsOut                                                 | n/a                 | Pulsar Sink 启动后总共发出的消息数                     | Counter |
+| numRecordsOutPerSecond                                        | n/a                 | 每秒发送的消息数                                       | Meter   |
+| numRecordsOutErrors                                           | n/a                 | 总共发送消息失败的次数                                 | Counter |
+| currentSendTime                                               | n/a                 | 最近一条消息从被放入客户端缓冲队列到收到消息确认的时间 | Gauge   |
+| PulsarProducer."Topic"."ProducerName".numMsgsSent             | Topic, ProducerName | 在过去的一个统计窗口内发送的消息数                     | Gauge   |
+| PulsarProducer."Topic"."ProducerName".numBytesSent            | Topic, ProducerName | 在过去的一个统计窗口内发送的字节数                     | Gauge   |
+| PulsarProducer."Topic"."ProducerName".numSendFailed           | Topic, ProducerName | 在过去的一个统计窗口内发送失败的消息数                 | Gauge   |
+| PulsarProducer."Topic"."ProducerName".numAcksReceived         | Topic, ProducerName | 在过去的一个统计窗口内总共收到的确认数                 | Gauge   |
+| PulsarProducer."Topic"."ProducerName".sendMsgsRate            | Topic, ProducerName | 在过去的一个统计窗口内发送的消息速率                   | Gauge   |
+| PulsarProducer."Topic"."ProducerName".sendBytesRate           | Topic, ProducerName | 在过去的一个统计窗口内发送的字节速率                   | Gauge   |
+| PulsarProducer."Topic"."ProducerName".sendLatencyMillis50pct  | Topic, ProducerName | 在过去的一个统计窗口内的发送延迟的中位数               | Gauge   |
+| PulsarProducer."Topic"."ProducerName".sendLatencyMillis75pct  | Topic, ProducerName | 在过去的一个统计窗口内的发送延迟的 75 百分位数         | Gauge   |
+| PulsarProducer."Topic"."ProducerName".sendLatencyMillis95pct  | Topic, ProducerName | 在过去的一个统计窗口内的发送延迟的 95 百分位数         | Gauge   |
+| PulsarProducer."Topic"."ProducerName".sendLatencyMillis99pct  | Topic, ProducerName | 在过去的一个统计窗口内的发送延迟的 99 百分位数         | Gauge   |
+| PulsarProducer."Topic"."ProducerName".sendLatencyMillis999pct | Topic, ProducerName | 在过去的一个统计窗口内的发送延迟的 99.9 百分位数       | Gauge   |
+| PulsarProducer."Topic"."ProducerName".sendLatencyMillisMax    | Topic, ProducerName | 在过去的一个统计窗口内的最大发送延迟                   | Gauge   |
+| PulsarProducer."Topic"."ProducerName".totalMsgsSent           | Topic, ProducerName | Producer 发送的全部消息数                              | Gauge   |
+| PulsarProducer."Topic"."ProducerName".totalBytesSent          | Topic, ProducerName | Producer 发送的全部字节数                              | Gauge   |
+| PulsarProducer."Topic"."ProducerName".totalSendFailed         | Topic, ProducerName | Producer 发送失败的消息数                              | Gauge   |
+| PulsarProducer."Topic"."ProducerName".totalAcksReceived       | Topic, ProducerName | Producer 确认发送成功的消息数                          | Gauge   |
+| PulsarProducer."Topic"."ProducerName".pendingQueueSize        | Topic, ProducerName | Producer 当前待发送的消息队列大小                      | Gauge   |
+
+{{< hint info >}}
+- 指标 `numBytesOut`、`numRecordsOut` 和 `numRecordsOutErrors` 从 Pulsar client 实例的监控指标中获得。
+
+- `numBytesOutRate` 和 `numRecordsOutRate` 指标是 Flink 内部通过 `numBytesOut` 和 `numRecordsOut` 计数器,在一个 60 秒的窗口内计算得到的。
+
+- `currentSendTime` 记录了最近一条消息从放入生产者的缓冲队列到消息被消费确认所耗费的时间。这项指标在 `NONE` 发送一致性下不可用。
+{{< /hint >}}
+
+## 端到端加密
+
+Flink 可以使用 Pulsar 提供的加解密功能在 Source 和 Sink 端加解密消息。用户需要提供一个合法的密钥对(即一个公钥和一个私钥,也就是非对称加密方式)来实现端到端的加密。
+
+### 如何启用端到端加密
+
+1. 创建密钥对
+
+   Pulsar 同时支持 ECDSA 或者 RSA 密钥对,你可以同时创建多组不同类型的密钥对,加密消息时会选择其中任意一组密钥来确保消息更加安全。
+   ```shell
+   # ECDSA(仅用于 Java 端)
+   openssl ecparam -name secp521r1 -genkey -param_enc explicit -out test_ecdsa_privkey.pem
+   openssl ec -in test_ecdsa_privkey.pem -pubout -outform pem -out test_ecdsa_pubkey.pem
+
+   # RSA
+   openssl genrsa -out test_rsa_privkey.pem 2048
+   openssl rsa -in test_rsa_privkey.pem -pubout -outform pkcs8 -out test_rsa_pubkey.pem
+   ```
+
+2. 实现 `CryptoKeyReader` 接口
+
+   每个密钥对都需要有一个唯一的密钥名称,用户需要自行实现 `CryptoKeyReader` 接口并确保 `CryptoKeyReader.getPublicKey()` 和 `CryptoKeyReader.getPrivateKey()` 方法能基于给定的密钥名称返回正确的密钥。
+
+   Pulsar 提供了一个默认的 `CryptoKeyReader` 实现 `DefaultCryptoKeyReader`。用户需要使用对应的 builder 方法 `DefaultCryptoKeyReader.builder()` 来创建实例。需要注意的是,对应的密钥对文件需要放在 Flink 程序的运行环境上。
+
+   ```java
+   // defaultPublicKey 和 defaultPrivateKey 也需要提供。
+   // 文件 file:///path/to/default-public.key 需要在 Flink 的运行环境上存在。
+   CryptoKeyReader keyReader = DefaultCryptoKeyReader.builder()
+       .defaultPublicKey("file:///path/to/default-public.key")
+       .defaultPrivateKey("file:///path/to/default-private.key")
+       .publicKey("key1", "file:///path/to/public1.key").privateKey("key1", "file:///path/to/private1.key")
+       .publicKey("key2", "file:///path/to/public2.key").privateKey("key2", "file:///path/to/private2.key")
+       .build();
+   ```
+
+3. (可选)实现 `MessageCrypto<MessageMetadata, MessageMetadata>` 接口
+
+
+   Pulsar 原生支持 **ECDSA**、**RSA** 等常见非对称加解密方法。通常情况下,你不需要实现此接口,除非你想使用一个私有的加解密方法。你可以参考 Pulsar 的默认实现 `MessageCryptoBc` 来实现 `MessageCrypto<MessageMetadata, MessageMetadata>` 接口。
+
+4. 创建 `PulsarCrypto` 实例
+
+   `PulsarCrypto` 用于提供所有必要的加解密信息,你可以使用对应的 builder 方法来创建实例。
+
+   ```java
+   CryptoKeyReader keyReader = DefaultCryptoKeyReader.builder()
+       .defaultPublicKey("file:///path/to/public1.key")
+       .defaultPrivateKey("file:///path/to/private1.key")
+       .publicKey("key1", "file:///path/to/public1.key").privateKey("key1", "file:///path/to/private1.key")
+       .publicKey("key2", "file:///path/to/public2.key").privateKey("key2", "file:///path/to/private2.key")
+       .build();
+
+   // 此处只用于演示如何使用,实际上你不需要这么做。
+   SerializableSupplier<MessageCrypto<MessageMetadata, MessageMetadata>> cryptoSupplier = () -> new MessageCryptoBc();
+
+   PulsarCrypto pulsarCrypto = PulsarCrypto.builder()
+       .cryptoKeyReader(keyReader)
+       // 所有的密钥名称需要在此处给出。
+       .addEncryptKeys("key1", "key2")
+       // 一般情况下你不需要提供 MessageCrypto 实例。
+       .messageCrypto(cryptoSupplier)
+       .build();
+   ```
+
+### 在 Pulsar source 上解密消息
+
+基于前面的指导创建对应的 `PulsarCrypto` 实例,然后在 `PulsarSource.builder()` 的构造方法里面予以给定。你需要同时定义解密失败的行为,Pulsar 在 `ConsumerCryptoFailureAction` 给定了 3 种实现。
+
+- `ConsumerCryptoFailureAction.FAIL`: Flink 程序将抛出异常并退出。
+- `ConsumerCryptoFailureAction.DISCARD`: 解密失败的消息将被丢弃。
+- `ConsumerCryptoFailureAction.CONSUME`
+
+  解密失败的消息将以未解密的状态传递给后续的算子,你也可以在 `PulsarDeserializationSchema` 里手动对解密失败的消息进行再次解密。所有关于解密的上下文都定义在 `Message.getEncryptionCtx()` 内。
+
+```java
+PulsarCrypto pulsarCrypto = ...
+
+PulsarSource<String> source = PulsarSource.builder()
+    ...
+    .setPulsarCrypto(pulsarCrypto, ConsumerCryptoFailureAction.FAIL)
+    .build();
+```
+
+### 在 Pulsar sink 上加密消息
+
+基于前面的指导创建对应的 `PulsarCrypto` 实例,然后在 `PulsarSink.builder()` 的构造方法里面予以给定。你需要同时定义加密失败的行为,Pulsar 在 `ProducerCryptoFailureAction` 给定了 2 种实现。
+
+- `ProducerCryptoFailureAction.FAIL`: Flink 程序将抛出异常并退出。
+- `ProducerCryptoFailureAction.SEND`: 消息将以未加密的形态发送。
+
+```java
+PulsarCrypto pulsarCrypto = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+    ...
+    .setPulsarCrypto(pulsarCrypto, ProducerCryptoFailureAction.FAIL)
+    .build();
+```
 
 ## 升级至最新的连接器
 
 常见的升级步骤,请参阅[升级应用程序和 Flink 版本]({{< ref "docs/ops/upgrading" >}})。Pulsar 连接器没有在 Flink 端存储消费的状态,所有的消费信息都推送到了 Pulsar。所以需要注意下面的事项:
 
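-* Do not upgrade the Pulsar connector and the Pulsar broker version at the same time.
-* Use the latest version of the Pulsar client to consume messages.
+- Do not upgrade the Pulsar connector and the Pulsar broker version at the same time.
+- Use the latest version of the Pulsar client to consume messages.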
 
 ## Troubleshooting
 
-If you have a problem when using Flink with Pulsar, keep in mind that Flink's implementation is built only on Pulsar's [Java client](https://pulsar.apache.org/docs/zh-CN/client-libraries-java/) and [admin API](https://pulsar.apache.org/docs/zh-CN/admin-api-overview/).
+If you have a problem when using Flink with Pulsar, keep in mind that Flink's implementation is built only on Pulsar's [Java client](https://pulsar.apache.org/api/client/2.10.x/) and [admin API](https://pulsar.apache.org/api/admin/2.10.x/).
 
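 The problem you encounter may be unrelated to Flink. First try to resolve it by upgrading Pulsar or the Pulsar client, or by changing the Pulsar configuration or the Pulsar connector configuration.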
 
@@ -972,3 +1054,6 @@ Pulsar transactions are still under active development, and the current version is not stable. Pulsar 2.
 You can use the latest `pulsar-client-all` branch to solve this issue.
 
 {{< top >}}
+
+[schema-evolution]: https://pulsar.apache.org/docs/2.10.x/schema-evolution-compatibility/#schema-evolution
+[standard-metrics]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
diff --git a/docs/content/docs/connectors/datastream/pulsar.md b/docs/content/docs/connectors/datastream/pulsar.md
index 6933227..e71b762 100644
--- a/docs/content/docs/connectors/datastream/pulsar.md
+++ b/docs/content/docs/connectors/datastream/pulsar.md
@@ -28,9 +28,8 @@ Flink provides an [Apache Pulsar](https://pulsar.apache.org) connector for readi
 
 ## Dependency
 
-You can use the connector with the Pulsar 2.8.1 or higher. Because the Pulsar connector supports
-Pulsar [transactions](https://pulsar.apache.org/docs/en/txn-what/), it is recommended to use the Pulsar 2.9.2 or higher.
-Details on Pulsar compatibility can be found in [PIP-72](https://github.com/apache/pulsar/wiki/PIP-72%3A-Introduce-Pulsar-Interface-Taxonomy%3A-Audience-and-Stability-Classification).
+You can use the connector with Pulsar 2.10.0 or higher. We recommend always using the latest Pulsar version.
+The details on Pulsar compatibility can be found in [PIP-72](https://github.com/apache/pulsar/wiki/PIP-72%3A-Introduce-Pulsar-Interface-Taxonomy%3A-Audience-and-Stability-Classification).
 
 {{< artifact flink-connector-pulsar >}}
 
@@ -212,7 +211,7 @@ The `PulsarDeserializationSchema` defines how to deserialize a Pulsar `Message<b
 If only the raw payload of a message (message data in bytes) is needed,
 you can use the predefined `PulsarDeserializationSchema`. Pulsar connector provides three implementation methods.
 
-- Decode the message by using Pulsar's [Schema](https://pulsar.apache.org/docs/en/schema-understand/).
+- Decode the message by using Pulsar's [Schema](https://pulsar.apache.org/docs/2.10.x/schema-understand/).
   If using KeyValue type or Struct types, the pulsar `Schema` does not contain type class info. But it is
   still needed to construct `PulsarSchemaTypeInformation`. So we provide two more APIs to pass the type info.
   ```java
@@ -253,7 +252,7 @@ you can use the predefined `PulsarDeserializationSchema`. Pulsar connector provi
   {{< /tab >}}
   {{< /tabs >}}
 
-Pulsar `Message<byte[]>` contains some [extra properties](https://pulsar.apache.org/docs/en/concepts-messaging/#messages),
+Pulsar `Message<byte[]>` contains some [extra properties](https://pulsar.apache.org/docs/2.10.x/concepts-messaging/#messages),
 such as message key, message publish time, message time, and application-defined key/value pairs etc.
 These properties could be defined in the `Message<byte[]>` interface.
 
@@ -261,6 +260,22 @@ If you want to deserialize the Pulsar message by these properties, you need to i
 Ensure that the `TypeInformation` from the `PulsarDeserializationSchema.getProducedType()` is correct.
 Flink uses this `TypeInformation` to pass the messages to downstream operators.
 
+You can enable [schema evolution][schema-evolution] by using `PulsarDeserializationSchema.pulsarSchema()` and
+`PulsarSourceBuilder.enableSchemaEvolution()`. With it enabled, any schema validation configured on the broker is enforced.
+
+```java
+Schema<SomePojo> schema = Schema.AVRO(SomePojo.class);
+
+PulsarSource<SomePojo> source = PulsarSource.builder()
+    ...
+    .setDeserializationSchema(schema, SomePojo.class)
+    .enableSchemaEvolution()
+    .build();
+```
+
+If you use a Pulsar schema without enabling schema evolution, the connector bypasses the schema check. Deserializing
+messages with a wrong schema may then cause errors.
+
 ### Define a RangeGenerator
 
 Ensure that you have provided a `RangeGenerator` implementation if you want to consume a subset of keys on the Pulsar connector.
@@ -391,10 +406,10 @@ you can create a `MessageId` by using `DefaultImplementation.newMessageId(long l
 The Pulsar source supports streaming and batch execution mode.
 By default, the `PulsarSource` is configured for unbounded data.
 
-For unbounded data the Pulsar source never stops until a Flink job is stopped or failed. 
+For unbounded data the Pulsar source never stops until a Flink job is stopped or failed.
 You can use the `setUnboundedStopCursor(StopCursor)` to set the Pulsar source to stop at a specific stop position.
 
-You can use `setBoundedStopCursor(StopCursor)` to specify a stop position for bounded data. 
+You can use `setBoundedStopCursor(StopCursor)` to specify a stop position for bounded data.
 
 Built-in stop cursors include:
 
@@ -456,7 +471,7 @@ Built-in stop cursors include:
 
 - Stop at the specified event time by `Message<byte[]>.getEventTime()`. The message with the
 given event time won't be included in the consuming result.
-  {{< tabs "pulsar-boundedness-at-event-time" >}} 
+  {{< tabs "pulsar-boundedness-at-event-time" >}}
   {{< tab "Java" >}}
   ```java
   StopCursor.atEventTime(long);
@@ -522,7 +537,7 @@ In addition to configuration options described above, you can set arbitrary opti
 
 #### PulsarClient Options
 
-The Pulsar connector uses the [client API](https://pulsar.apache.org/docs/en/client-libraries-java/)
+The Pulsar connector uses the [client API](https://pulsar.apache.org/docs/2.10.x/client-libraries-java/)
 to create the `Consumer` instance. The Pulsar connector extracts most parts of Pulsar's `ClientConfigurationData`,
 which is required for creating a `PulsarClient`, as Flink configuration options in `PulsarOptions`.
 
@@ -530,7 +545,7 @@ which is required for creating a `PulsarClient`, as Flink configuration options
 
 #### PulsarAdmin Options
 
-The [admin API](https://pulsar.apache.org/docs/en/admin-api-overview/) is used for querying topic metadata
+The [admin API](https://pulsar.apache.org/docs/2.10.x/admin-api-overview/) is used for querying topic metadata
 and for discovering the desired topics when the Pulsar connector uses topic-pattern subscription.
 It shares most part of the configuration options with the client API.
 The configuration options listed here are only used in the admin API.
@@ -614,14 +629,11 @@ details about how to define a `WatermarkStrategy`.
 
 ### Message Acknowledgement
 
-When a subscription is created, Pulsar [retains](https://pulsar.apache.org/docs/en/concepts-architecture-overview/#persistent-storage) all messages, even if the consumer is disconnected.
-The retained messages are discarded only when the connector acknowledges that all these messages are processed successfully.
-The Pulsar connector supports four subscription types, which makes the acknowledgement behaviors vary among different subscriptions.
+When a subscription is created, Pulsar [retains](https://pulsar.apache.org/docs/2.10.x/concepts-architecture-overview/#persistent-storage) all messages,
+even if the consumer is disconnected. The retained messages are discarded only when the connector acknowledges that all these messages are processed successfully.
 
-#### Acknowledgement on Exclusive and Failover Subscription Types
-
-`Exclusive` and `Failover` subscription types support cumulative acknowledgment. In these subscription types, Flink only needs to acknowledge
-the latest successfully consumed message. All the message before the given message are marked
+We use the `Exclusive` subscription as the default subscription type. It supports cumulative acknowledgment. With this subscription type,
+Flink only needs to acknowledge the latest successfully consumed message. All the messages before the given message are marked
 with a consumed status.
 
 The Pulsar source acknowledges the current consuming message when checkpoints are **completed**,
@@ -633,31 +645,6 @@ You can use the `PulsarSourceOptions.PULSAR_AUTO_COMMIT_CURSOR_INTERVAL` option
 Pulsar source does **NOT** rely on committed positions for fault tolerance.
 Acknowledging messages is only for exposing the progress of consumers and monitoring on this subscription type.
 
-#### Acknowledgement on Shared and Key_Shared Subscription Types
-
-In `Shared` and `Key_Shared` subscription types, messages are acknowledged one by one. You can acknowledge
-a message in a transaction and commit it to Pulsar.
-
-You should enable transaction in the Pulsar `borker.conf` file when using these two subscription types in connector:
-
-```text
-transactionCoordinatorEnabled=true
-```
-
-The default timeout for Pulsar transactions is 3 hours.
-Make sure that that timeout is greater than checkpoint interval + maximum recovery time.
-A shorter checkpoint interval indicates a better consuming performance.
-You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option to change the transaction timeout.
-
-If checkpointing is disabled or you can not enable the transaction on Pulsar broker, you should set
-`PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message is immediately acknowledged after consuming.
-No consistency guarantees can be made in this scenario.
-
-{{< hint info >}}
-All acknowledgements in a transaction are recorded in the Pulsar broker side.
-{{< /hint >}}
-
 ## Pulsar Sink
 
 The Pulsar Sink supports writing records into one or more Pulsar topics or a specified list of Pulsar partitions.
@@ -675,7 +662,7 @@ If you still want to use the legacy `SinkFunction` or on Flink 1.14 or previous
 The Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
 This example writes a String record to a Pulsar topic with at-least-once delivery guarantee.
 
-{{< tabs "46e225b1-1e34-4ff3-890c-aa06a2b99c0a" >}}
+{{< tabs "pulsar-sink-example" >}}
 {{< tab "Java" >}}
 
 ```java
@@ -716,7 +703,7 @@ The following properties are **required** for building PulsarSink:
 
 - Pulsar service url, configured by `setServiceUrl(String)`
 - Pulsar service http url (aka. admin url), configured by `setAdminUrl(String)`
-- Topics / partitions to write, see [writing targets](#writing-targets) for more details.
+- Topics / partitions to write, see [Producing to topics](#producing-to-topics) for more details.
 - Serializer to generate Pulsar messages, see [serializer](#serializer) for more details.
 
 It is recommended to set the producer name in Pulsar Sink by `setProducerName(String)`.
@@ -729,7 +716,7 @@ Defining the topics for producing is similar to the [topic-partition subscriptio
 in the Pulsar source. We support a mix-in style of topic setting. You can provide a list of topics,
 partitions, or both of them.
 
-{{< tabs "3d452e6b-bffd-42f7-bb91-974b306262ca" >}}
+{{< tabs "set-pulsar-sink-topics" >}}
 {{< tab "Java" >}}
 
 ```java
@@ -780,10 +767,10 @@ Similar to `PulsarSource`, Pulsar sink supports both Flink's `SerializationSchem
 Pulsar's `Schema`. Pulsar's `Schema.AUTO_PRODUCE_BYTES()` is not supported in the Pulsar sink.
 
 If you do not need the message key and other message properties in Pulsar's
-[Message](https://pulsar.apache.org/api/client/2.9.0-SNAPSHOT/org/apache/pulsar/client/api/Message.html) interface,
+[Message](https://pulsar.apache.org/api/client/2.10.x/org/apache/pulsar/client/api/Message.html) interface,
 you can use the predefined `PulsarSerializationSchema`. The Pulsar sink provides two implementation methods.
 
-- Encode the message by using Pulsar's [Schema](https://pulsar.apache.org/docs/en/schema-understand/).
+- Encode the message by using Pulsar's [Schema](https://pulsar.apache.org/docs/2.10.x/schema-understand/).
   ```java
   // Primitive types
   PulsarSerializationSchema.pulsarSchema(Schema)
@@ -796,7 +783,7 @@ you can use the predefined `PulsarSerializationSchema`. The Pulsar sink provides
   ```
 - Encode the message by using Flink's `SerializationSchema`
 
-  {{< tabs "b65b9978-b1d6-4b0d-ade8-78098e0f23d8" >}}
+  {{< tabs "set-pulsar-serialization-flink-schema" >}}
   {{< tab "Java" >}}
 
   ```java
@@ -813,17 +800,15 @@ you can use the predefined `PulsarSerializationSchema`. The Pulsar sink provides
   {{< /tab >}}
   {{< /tabs >}}
 
-[Schema evolution](https://pulsar.apache.org/docs/en/schema-evolution-compatibility/#schema-evolution)
-can be enabled by users using `PulsarSerializationSchema.pulsarSchema()` and
+You can enable [schema evolution][schema-evolution] by using `PulsarSerializationSchema.pulsarSchema()` and
 `PulsarSinkBuilder.enableSchemaEvolution()`. With it enabled, any schema validation configured on the broker is enforced.
 
 ```java
 Schema<SomePojo> schema = Schema.AVRO(SomePojo.class);
-PulsarSerializationSchema<SomePojo> pulsarSchema = PulsarSerializationSchema.pulsarSchema(schema, SomePojo.class);
 
-PulsarSink<String> sink = PulsarSink.builder()
+PulsarSink<SomePojo> sink = PulsarSink.builder()
     ...
-    .setSerializationSchema(pulsarSchema)
+    .setSerializationSchema(schema, SomePojo.class)
     .enableSchemaEvolution()
     .build();
 ```
@@ -832,7 +817,7 @@ PulsarSink<String> sink = PulsarSink.builder()
 If you use Pulsar schema without enabling schema evolution, the target topic will have a `Schema.BYTES` schema.
 Consumers will need to handle the deserialization (if needed) themselves.
 
-For example, if you set  `PulsarSerializationSchema.pulsarSchema(Schema.STRING)` without enabling schema evolution,
+For example, if you set `PulsarSerializationSchema.pulsarSchema(Schema.STRING)` without enabling schema evolution,
 the schema stored in Pulsar topics is `Schema.BYTES`.
 {{< /hint >}}
 
@@ -883,7 +868,7 @@ Internally, a Pulsar partition is implemented as a topic. The Pulsar client prov
 implementation detail and handles routing under the hood automatically. Pulsar Sink uses a lower client
 API to implement its own routing layer to support multiple topics routing.
 
-For details, see  [partitioned topics](https://pulsar.apache.org/docs/en/cookbooks-partitioned/).
+For details, see [partitioned topics](https://pulsar.apache.org/docs/2.10.x/cookbooks-partitioned/).
 {{< /hint >}}
 
 ### Delivery Guarantee
@@ -895,12 +880,12 @@ For details, see  [partitioned topics](https://pulsar.apache.org/docs/en/cookboo
   It means that this mode has the highest throughput.
 - `AT_LEAST_ONCE`: No data loss happens, but data duplication can happen after a restart from checkpoint.
 - `EXACTLY_ONCE`: No data loss happens. Each record is sent to the Pulsar broker only once.
-  Pulsar Sink uses [Pulsar transaction](https://pulsar.apache.org/docs/en/transactions/)
+  Pulsar Sink uses [Pulsar transaction](https://pulsar.apache.org/docs/2.10.x/transactions/)
   and two-phase commit (2PC) to ensure records are sent only once even after pipeline restarts.
 
 ### Delayed message delivery
 
-[Delayed message delivery](https://pulsar.apache.org/docs/en/next/concepts-messaging/#delayed-message-delivery)
+[Delayed message delivery](https://pulsar.apache.org/docs/2.10.x/concepts-messaging/#delayed-message-delivery)
 enables you to delay the possibility to consume a message. With delayed message enabled, the Pulsar sink sends a message to the Pulsar topic
 **immediately**, but the message is delivered to a consumer once the specified delay is over.
 
@@ -939,118 +924,32 @@ sending behavior. You can just leave them alone if you do not have any performan
 
 {{< generated/pulsar_sink_configuration >}}
 
-### Sink Metrics
+### Brief Design Rationale
 
-This table lists supported metrics.
-The first 6 metrics are standard Pulsar Sink metrics as described in
-[FLIP-33: Standardize Connector Metrics]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics](https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics))
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 15%">Scope</th>
-      <th class="text-left" style="width: 18%">Metrics</th>
-      <th class="text-left" style="width: 18%">User Variables</th>
-      <th class="text-left" style="width: 39%">Description</th>
-      <th class="text-left" style="width: 10%">Type</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-        <th rowspan="13">Operator</th>
-        <td>numBytesOut</td>
-        <td>n/a</td>
-        <td>The total number of output bytes since the sink starts. Count towards the numBytesOut in TaskIOMetricsGroup.</td>
-        <td>Counter</td>
-    </tr>
-    <tr>
-        <td>numBytesOutPerSecond</td>
-        <td>n/a</td>
-        <td>The output bytes per second</td>
-        <td>Meter</td>
-    </tr>
-    <tr>
-        <td>numRecordsOut</td>
-        <td>n/a</td>
-        <td>The total number of output records since the sink starts.</td>
-        <td>Counter</td>
-    </tr>
-    <tr>
-        <td>numRecordsOutPerSecond</td>
-        <td>n/a</td>
-        <td>The output records per second</td>
-        <td>Meter</td>
-    </tr>
-    <tr>
-        <td>numRecordsOutErrors</td>
-        <td>n/a</td>
-        <td>The total number of records failed to send</td>
-        <td>Counter</td>
-    </tr>
-    <tr>
-        <td>currentSendTime</td>
-        <td>n/a</td>
-        <td>The time it takes to send the last record, from enqueue the message in client buffer to its ack.</td>
-        <td>Gauge</td>
-    </tr>
-    <tr>
-        <td>PulsarSink.numAcksReceived</td>
-        <td>n/a</td>
-        <td>The number of acks received for sent messages.</td>
-        <td>Counter</td>
-    </tr>
-    <tr>
-        <td>PulsarSink.sendLatencyMax</td>
-        <td>n/a</td>
-        <td>The maximum send latency in the last refresh interval across all producers.</td>
-        <td>Gauge</td>
-    </tr>
-    <tr>
-        <td>PulsarSink.producer."ProducerName".sendLatency50Pct</td>
-        <td>ProducerName</td>
-        <td>The 50th percentile of send latency in the last refresh interval for a specific producer.</td>
-        <td>Gauge</td>
-    </tr>
-    <tr>
-        <td>PulsarSink.producer."ProducerName".sendLatency75Pct</td>
-        <td>ProducerName</td>
-        <td>The 75th percentile of send latency in the last refresh interval for a specific producer.</td>
-        <td>Gauge</td>
-    </tr>
-    <tr>
-        <td>PulsarSink.producer."ProducerName".sendLatency95Pct</td>
-        <td>ProducerName</td>
-        <td>The 95th percentile of send latency in the last refresh interval for a specific producer.</td>
-        <td>Gauge</td>
-    </tr>
-    <tr>
-        <td>PulsarSink.producer."ProducerName".sendLatency99Pct</td>
-        <td>ProducerName</td>
-        <td>The 99th percentile of send latency in the last refresh interval for a specific producer.</td>
-        <td>Gauge</td>
-    </tr>
-    <tr>
-        <td>PulsarSink.producer."ProducerName".sendLatency999Pct</td>
-        <td>ProducerName</td>
-        <td>The 99.9th percentile of send latency in the last refresh interval for a specific producer.</td>
-        <td>Gauge</td>
-    </tr>
-  </tbody>
-</table>
+The Pulsar sink follows the Sink API defined in
+[FLIP-191](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction).
 
-{{< hint info >}}
-- `numBytesOut`, `numRecordsOut`, `numRecordsOutErrors` are retrieved from Pulsar client metrics.
+#### Stateless SinkWriter
 
-- `currentSendTime` tracks the time from when the producer calls `sendAync()` to
-  the time when the message is acknowledged by the broker. This metric is not available in `NONE` delivery guarantee.
-{{< /hint >}}
+In `EXACTLY_ONCE` mode, the Pulsar sink does not store transaction information in a checkpoint.
+That means that new transactions will be created after a restart.
+Therefore, any message in previous pending transactions is either aborted or timed out
+(they are never visible to the downstream Pulsar consumer).
+The Pulsar team is working on optimizing the resources needed by unfinished pending transactions.
+
+#### Pulsar Schema Evolution
+
+[Pulsar Schema Evolution][schema-evolution] allows
+you to reuse the same Flink job after certain "allowed" data model changes, like adding or deleting
+a field in an Avro-based POJO class. Please note that you can specify Pulsar schema validation rules
+and define an auto schema update. For details, refer to [Pulsar Schema Evolution][schema-evolution].
 
-The Pulsar producer refreshes its stats every 60 seconds by default. The PulsarSink retrieves the Pulsar producer
-stats every 500ms. That means that `numRecordsOut`, `numBytesOut`, `numAcksReceived`, and `numRecordsOutErrors` 
-are updated every 60 seconds. To increase the metrics refresh frequency, you can change
-the Pulsar producer stats refresh interval to a smaller value (minimum 1 second), as shown below.
+## Monitor the Metrics
 
-{{< tabs "b65b9978-b1d6-4b0d-ade8-78098e0f23d1" >}}
+The Pulsar client refreshes its stats every 60 seconds by default. To increase the metrics refresh frequency,
+you can change the Pulsar client stats refresh interval to a smaller value (minimum 1 second), as shown below.
+
+{{< tabs "pulsar-stats-interval-seconds" >}}
 
 {{< tab "Java" >}}
 ```java
@@ -1066,28 +965,219 @@ builder.set_config("pulsar.client.statsIntervalSeconds", "1")
 
 {{< /tabs >}}
 
-`numBytesOutRate` and `numRecordsOutRate` are calculated based on the `numBytesOut` and `numRecordsOUt`
-counter respectively. Flink internally uses a fixed 60 seconds window to calculate the rates.
+### Source Metrics
 
-### Brief Design Rationale
+Flink defines common source metrics in [FLIP-33: Standardize Connector Metrics][standard-metrics]. The Pulsar connector
+exposes some client metrics if you enable the `pulsar.source.enableMetrics` option. All custom source metrics are
+listed in the table below.
 
-Pulsar sink follow the Sink API defined in 
-[FLIP-191](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction).
+{{< tabs "pulsar-enable-source-metrics" >}}
 
-#### Stateless SinkWriter
+{{< tab "Java" >}}
+```java
+builder.setConfig(PulsarSourceOptions.PULSAR_ENABLE_SOURCE_METRICS, true);
+```
+{{< /tab >}}
 
-In `EXACTLY_ONCE` mode, the Pulsar sink does not store transaction information in a checkpoint.
-That means that new transactions will be created after a restart.
-Therefore, any message in previous pending transactions is either aborted or timed out
-(They are never visible to the downstream Pulsar consumer).
-The Pulsar team is working to optimize the needed resources by unfinished pending transactions.
+{{< tab "Python" >}}
+```python
+builder.set_config("pulsar.source.enableMetrics", "true")
+```
+{{< /tab >}}
 
-#### Pulsar Schema Evolution
+{{< /tabs >}}
 
-[Pulsar Schema Evolution](https://pulsar.apache.org/docs/en/schema-evolution-compatibility/) allows
-you to reuse the same Flink job after certain "allowed" data model changes, like adding or deleting
-a field in a AVRO-based Pojo class. Please note that you can specify Pulsar schema validation rules
-and define an auto schema update. For details, refer to [Pulsar Schema Evolution](https://pulsar.apache.org/docs/en/schema-evolution-compatibility/).
+| Metrics                                                        | User Variables      | Description                                                        | Type  |
+|----------------------------------------------------------------|---------------------|--------------------------------------------------------------------|-------|
+| PulsarConsumer."Topic"."ConsumerName".numMsgsReceived          | Topic, ConsumerName | Number of messages received in the last interval.                  | Gauge |
+| PulsarConsumer."Topic"."ConsumerName".numBytesReceived         | Topic, ConsumerName | Number of bytes received in the last interval.                     | Gauge |
+| PulsarConsumer."Topic"."ConsumerName".rateMsgsReceived         | Topic, ConsumerName | Rate of messages per second received in the last interval.         | Gauge |
+| PulsarConsumer."Topic"."ConsumerName".rateBytesReceived        | Topic, ConsumerName | Rate of bytes per second received in the last interval.            | Gauge |
+| PulsarConsumer."Topic"."ConsumerName".numAcksSent              | Topic, ConsumerName | Number of message acknowledgments sent in the last interval.       | Gauge |
+| PulsarConsumer."Topic"."ConsumerName".numAcksFailed            | Topic, ConsumerName | Number of message acknowledgments failed in the last interval.     | Gauge |
+| PulsarConsumer."Topic"."ConsumerName".numReceiveFailed         | Topic, ConsumerName | Number of failed message receives in the last interval.            | Gauge |
+| PulsarConsumer."Topic"."ConsumerName".numBatchReceiveFailed    | Topic, ConsumerName | Number of failed message batch receives in the last interval.      | Gauge |
+| PulsarConsumer."Topic"."ConsumerName".totalMsgsReceived        | Topic, ConsumerName | Total number of messages received by this consumer.                | Gauge |
+| PulsarConsumer."Topic"."ConsumerName".totalBytesReceived       | Topic, ConsumerName | Total number of bytes received by this consumer.                   | Gauge |
+| PulsarConsumer."Topic"."ConsumerName".totalReceivedFailed      | Topic, ConsumerName | Total number of messages receive failures by this consumer.        | Gauge |
+| PulsarConsumer."Topic"."ConsumerName".totalBatchReceivedFailed | Topic, ConsumerName | Total number of messages batch receive failures by this consumer.  | Gauge |
+| PulsarConsumer."Topic"."ConsumerName".totalAcksSent            | Topic, ConsumerName | Total number of message acknowledgments sent by this consumer.     | Gauge |
+| PulsarConsumer."Topic"."ConsumerName".totalAcksFailed          | Topic, ConsumerName | Total number of message acknowledgments failures on this consumer. | Gauge |
+| PulsarConsumer."Topic"."ConsumerName".msgNumInReceiverQueue    | Topic, ConsumerName | The size of receiver queue on this consumer.                       | Gauge |
+
+### Sink Metrics
+
+The table below lists the supported sink metrics. The first 6 metrics are standard Pulsar Sink metrics as described in
+[FLIP-33: Standardize Connector Metrics][standard-metrics].
+
+The first 5 metrics are exposed to the Flink metric system by default.
+To expose the remaining metrics, enable the `pulsar.sink.enableMetrics` option.
+
+{{< tabs "pulsar-enable-sink-metrics" >}}
+
+{{< tab "Java" >}}
+```java
+builder.setConfig(PulsarSinkOptions.PULSAR_ENABLE_SINK_METRICS, true);
+```
+{{< /tab >}}
+
+{{< tab "Python" >}}
+```python
+builder.set_config("pulsar.sink.enableMetrics", "true")
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+| Metrics                                                       | User Variables      | Description                                                                                                  | Type    |
+|---------------------------------------------------------------|---------------------|--------------------------------------------------------------------------------------------------------------|---------|
+| numBytesOut                                                   | n/a                 | The total number of output bytes since the sink starts. Count towards the numBytesOut in TaskIOMetricsGroup. | Counter |
+| numBytesOutPerSecond                                          | n/a                 | The output bytes per second.                                                                                 | Meter   |
+| numRecordsOut                                                 | n/a                 | The total number of output records since the sink starts.                                                    | Counter |
+| numRecordsOutPerSecond                                        | n/a                 | The output records per second.                                                                               | Meter   |
+| numRecordsOutErrors                                           | n/a                 | The total number of records failed to send.                                                                  | Counter |
+| currentSendTime                                               | n/a                 | The time it takes to send the last record, from enqueue the message in client buffer to its ack.             | Gauge   |
+| PulsarProducer."Topic"."ProducerName".numMsgsSent             | Topic, ProducerName | The number of messages published in the last interval.                                                       | Gauge   |
+| PulsarProducer."Topic"."ProducerName".numBytesSent            | Topic, ProducerName | The number of bytes sent in the last interval.                                                               | Gauge   |
+| PulsarProducer."Topic"."ProducerName".numSendFailed           | Topic, ProducerName | The number of failed send operations in the last interval.                                                   | Gauge   |
+| PulsarProducer."Topic"."ProducerName".numAcksReceived         | Topic, ProducerName | The number of send acknowledgments received from the broker in the last interval.                            | Gauge   |
+| PulsarProducer."Topic"."ProducerName".sendMsgsRate            | Topic, ProducerName | The messages send rate in the last interval.                                                                 | Gauge   |
+| PulsarProducer."Topic"."ProducerName".sendBytesRate           | Topic, ProducerName | The bytes send rate in the last interval.                                                                    | Gauge   |
+| PulsarProducer."Topic"."ProducerName".sendLatencyMillis50pct  | Topic, ProducerName | The 50th percentile of send latency in milliseconds for the last interval.                                   | Gauge   |
+| PulsarProducer."Topic"."ProducerName".sendLatencyMillis75pct  | Topic, ProducerName | The 75th percentile of send latency in milliseconds for the last interval.                                   | Gauge   |
+| PulsarProducer."Topic"."ProducerName".sendLatencyMillis95pct  | Topic, ProducerName | The 95th percentile of send latency in milliseconds for the last interval.                                   | Gauge   |
+| PulsarProducer."Topic"."ProducerName".sendLatencyMillis99pct  | Topic, ProducerName | The 99th percentile of send latency in milliseconds for the last interval.                                   | Gauge   |
+| PulsarProducer."Topic"."ProducerName".sendLatencyMillis999pct | Topic, ProducerName | The 99.9th percentile of send latency in milliseconds for the last interval.                                 | Gauge   |
+| PulsarProducer."Topic"."ProducerName".sendLatencyMillisMax    | Topic, ProducerName | The maximum send latency in milliseconds for the last interval.                                              | Gauge   |
+| PulsarProducer."Topic"."ProducerName".totalMsgsSent           | Topic, ProducerName | The total number of messages published by this producer.                                                     | Gauge   |
+| PulsarProducer."Topic"."ProducerName".totalBytesSent          | Topic, ProducerName | The total number of bytes sent by this producer.                                                             | Gauge   |
+| PulsarProducer."Topic"."ProducerName".totalSendFailed         | Topic, ProducerName | The total number of failed send operations.                                                                  | Gauge   |
+| PulsarProducer."Topic"."ProducerName".totalAcksReceived       | Topic, ProducerName | The total number of send acknowledgments received from the broker.                                           | Gauge   |
+| PulsarProducer."Topic"."ProducerName".pendingQueueSize        | Topic, ProducerName | The current pending send-message queue size of the producer.                                                 | Gauge   |
+
+{{< hint info >}}
+- `numBytesOut`, `numRecordsOut` and `numRecordsOutErrors` are retrieved from Pulsar client metrics.
+
+- `numBytesOutPerSecond` and `numRecordsOutPerSecond` are calculated based on the `numBytesOut` and `numRecordsOut`
+  counters respectively. Flink internally uses a fixed 60-second window to calculate the rates.
+
+- `currentSendTime` tracks the time from when the producer calls `sendAsync()` to
+  the time when the broker acknowledges the message. This metric is not available with the `NONE` delivery guarantee.
+{{< /hint >}}
+
+## End-to-end encryption
+
+Flink can use Pulsar's encryption to encrypt messages on the sink side and decrypt messages on the source side.
+Users should provide the public and private key pair to perform the encryption.
+Only a consumer with a valid key pair can decrypt the encrypted messages.
+
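Pulsar's documentation describes its end-to-end encryption as an envelope scheme: the payload is encrypted with a symmetric data key, and that data key is encrypted with the public key, so only the private key holder can recover it. The following is a minimal JDK-only sketch of that idea. The class name, the `Envelope` container, and the cipher choices (RSA-OAEP, AES-GCM) are assumptions made for illustration; this is not the code that Pulsar or the connector runs.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

// Illustrative only: hypothetical names and JDK crypto, not Pulsar's implementation.
final class EnvelopeEncryptionSketch {

    private static final String RSA_WRAP = "RSA/ECB/OAEPWithSHA-256AndMGF1Padding";
    private static final String AES_GCM = "AES/GCM/NoPadding";

    /** What travels with the message: the wrapped data key, the IV and the ciphertext. */
    static final class Envelope {
        final byte[] wrappedDataKey;
        final byte[] iv;
        final byte[] cipherText;

        Envelope(byte[] wrappedDataKey, byte[] iv, byte[] cipherText) {
            this.wrappedDataKey = wrappedDataKey;
            this.iv = iv;
            this.cipherText = cipherText;
        }
    }

    static KeyPair newRsaKeyPair() {
        try {
            KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
            generator.initialize(2048);
            return generator.generateKeyPair();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Sink side: encrypt the payload with a fresh AES key, then wrap that key with the public key. */
    static Envelope encrypt(byte[] payload, PublicKey publicKey) {
        try {
            KeyGenerator keyGenerator = KeyGenerator.getInstance("AES");
            keyGenerator.init(128);
            SecretKey dataKey = keyGenerator.generateKey();

            byte[] iv = new byte[12];
            new SecureRandom().nextBytes(iv);
            Cipher aes = Cipher.getInstance(AES_GCM);
            aes.init(Cipher.ENCRYPT_MODE, dataKey, new GCMParameterSpec(128, iv));
            byte[] cipherText = aes.doFinal(payload);

            Cipher rsa = Cipher.getInstance(RSA_WRAP);
            rsa.init(Cipher.ENCRYPT_MODE, publicKey);
            return new Envelope(rsa.doFinal(dataKey.getEncoded()), iv, cipherText);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Encryption failed", e);
        }
    }

    /** Source side: unwrap the AES key with the private key, then decrypt the payload. */
    static byte[] decrypt(Envelope envelope, PrivateKey privateKey) {
        try {
            Cipher rsa = Cipher.getInstance(RSA_WRAP);
            rsa.init(Cipher.DECRYPT_MODE, privateKey);
            SecretKey dataKey = new SecretKeySpec(rsa.doFinal(envelope.wrappedDataKey), "AES");

            Cipher aes = Cipher.getInstance(AES_GCM);
            aes.init(Cipher.DECRYPT_MODE, dataKey, new GCMParameterSpec(128, envelope.iv));
            return aes.doFinal(envelope.cipherText);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Decryption failed", e);
        }
    }

    public static void main(String[] args) {
        KeyPair pair = newRsaKeyPair();
        byte[] original = "hello pulsar".getBytes(StandardCharsets.UTF_8);
        byte[] roundTrip = decrypt(encrypt(original, pair.getPublic()), pair.getPrivate());
        System.out.println(new String(roundTrip, StandardCharsets.UTF_8));
    }
}
```

In this sketch a reader holding a different private key cannot unwrap the data key, which is the property the paragraph above relies on.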
+### How to enable end-to-end encryption
+
+1. Generate a set of key pairs.
+
+   Pulsar supports multiple ECDSA or RSA key pairs at the same time, so you can provide
+   multiple key pairs. One of the key pairs is chosen at random to encrypt the message, which makes the encryption more secure.
+   ```shell
+   # ECDSA (for Java clients only)
+   openssl ecparam -name secp521r1 -genkey -param_enc explicit -out test_ecdsa_privkey.pem
+   openssl ec -in test_ecdsa_privkey.pem -pubout -outform pem -out test_ecdsa_pubkey.pem
+
+   # RSA
+   openssl genrsa -out test_rsa_privkey.pem 2048
+   openssl rsa -in test_rsa_privkey.pem -pubout -outform pkcs8 -out test_rsa_pubkey.pem
+   ```
+
+2. Implement the `CryptoKeyReader` interface.
+
+   Each key pair should have a unique key name. Implement the `CryptoKeyReader` interface and make sure
+   `CryptoKeyReader.getPublicKey()` and `CryptoKeyReader.getPrivateKey()` can return the corresponding key by the
+   key name.
+
+   Pulsar provides a default `CryptoKeyReader` implementation named `DefaultCryptoKeyReader`. You can create it by using
+   `DefaultCryptoKeyReader.builder()`. Make sure the key pair files are available in the Flink runtime environment.
+
+   ```java
+   // defaultPublicKey and defaultPrivateKey should be provided in this implementation.
+   // The file:///path/to/default-public.key should be a valid path on Flink's running environment.
+   CryptoKeyReader keyReader = DefaultCryptoKeyReader.builder()
+       .defaultPublicKey("file:///path/to/default-public.key")
+       .defaultPrivateKey("file:///path/to/default-private.key")
+       .publicKey("key1", "file:///path/to/public1.key").privateKey("key1", "file:///path/to/private1.key")
+       .publicKey("key2", "file:///path/to/public2.key").privateKey("key2", "file:///path/to/private2.key")
+       .build();
+   ```
+
+3. (Optional) Implement the `MessageCrypto<MessageMetadata, MessageMetadata>` interface.
+
+   Pulsar supports **ECDSA** and **RSA** out of the box. You don't need to implement this interface if you use one of
+   these encryption methods. If you want to define a custom key-pair-based crypto method, implement the
+   `MessageCrypto<MessageMetadata, MessageMetadata>` interface. See Pulsar's default implementation,
+   `MessageCryptoBc`, for how to implement this crypto interface.
+
+4. Create a `PulsarCrypto` instance.
+
+   `PulsarCrypto` is used for providing all the required information for encryption and decryption. You can use the builder
+   method to create the instance.
+
+   ```java
+   CryptoKeyReader keyReader = DefaultCryptoKeyReader.builder()
+       .defaultPublicKey("file:///path/to/public1.key")
+       .defaultPrivateKey("file:///path/to/private1.key")
+       .publicKey("key1", "file:///path/to/public1.key").privateKey("key1", "file:///path/to/private1.key")
+       .publicKey("key2", "file:///path/to/public2.key").privateKey("key2", "file:///path/to/private2.key")
+       .build();
+
+   // This line is only used as an example. It returns the default implementation of the MessageCrypto.
+   SerializableSupplier<MessageCrypto<MessageMetadata, MessageMetadata>> cryptoSupplier = () -> new MessageCryptoBc();
+
+   PulsarCrypto pulsarCrypto = PulsarCrypto.builder()
+       .cryptoKeyReader(keyReader)
+       // All the key names should be provided here; you can't encrypt the message with a non-existent key name.
+       .addEncryptKeys("key1", "key2")
+       // Providing the MessageCrypto is optional.
+       .messageCrypto(cryptoSupplier)
+       .build();
+   ```
+
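The steps above depend on every encrypt key name resolving to a registered key pair. The sketch below models only that lookup contract with plain JDK collections. `NamedKeyRegistry` and its methods are hypothetical names introduced for illustration and are not part of Pulsar's `CryptoKeyReader` API; a real reader returns Pulsar's own key types and may apply default keys.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: a hypothetical stand-in for the "look up a key by its name" contract.
final class NamedKeyRegistry {

    private final Map<String, byte[]> publicKeys = new HashMap<>();
    private final Map<String, byte[]> privateKeys = new HashMap<>();

    /** Registers one key pair under a unique key name. */
    NamedKeyRegistry register(String keyName, byte[] publicKey, byte[] privateKey) {
        if (publicKeys.containsKey(keyName)) {
            throw new IllegalArgumentException("Duplicate key name: " + keyName);
        }
        publicKeys.put(keyName, publicKey.clone());
        privateKeys.put(keyName, privateKey.clone());
        return this;
    }

    byte[] publicKey(String keyName) {
        return lookup(publicKeys, keyName);
    }

    byte[] privateKey(String keyName) {
        return lookup(privateKeys, keyName);
    }

    /** Fails fast if the set of encrypt key names is empty or contains an unknown name. */
    void requireAll(Collection<String> encryptKeys) {
        if (encryptKeys.isEmpty()) {
            throw new IllegalArgumentException("At least one encrypt key name is required.");
        }
        for (String keyName : encryptKeys) {
            lookup(publicKeys, keyName);
        }
    }

    private static byte[] lookup(Map<String, byte[]> keys, String keyName) {
        byte[] key = keys.get(keyName);
        if (key == null) {
            throw new IllegalArgumentException("Unknown key name: " + keyName);
        }
        return key.clone();
    }
}
```

Checking the names up front, as `requireAll` does here, surfaces a misspelled key name before any message is written rather than at send time.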
+### Decrypt the message on the Pulsar source
+
+Follow the previous instructions to create a `PulsarCrypto` instance and pass it to `PulsarSource.builder()`.
+You also need to choose the decryption failure action. Pulsar has three failure actions, which are defined in
+`ConsumerCryptoFailureAction`.
+
+- `ConsumerCryptoFailureAction.FAIL`: The Flink pipeline will crash and turn into a failed state.
+- `ConsumerCryptoFailureAction.DISCARD`: The message is silently dropped and not delivered downstream.
+- `ConsumerCryptoFailureAction.CONSUME`: The message is not decrypted and is passed directly downstream.
+  You can decrypt the message in `PulsarDeserializationSchema`; the encryption information can be retrieved
+  from `Message.getEncryptionCtx()`.
+
+```java
+PulsarCrypto pulsarCrypto = ...
+
+PulsarSource<String> source = PulsarSource.builder()
+    ...
+    .setPulsarCrypto(pulsarCrypto, ConsumerCryptoFailureAction.FAIL)
+    .build();
+```
+
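The three consumer-side failure actions differ only in what reaches downstream when decryption fails. The toy model below makes that difference concrete. The `Action` enum mirrors the names of `ConsumerCryptoFailureAction` but is a hypothetical local type, and `onDecryptFailure` is an invented helper rather than connector code.

```java
import java.util.Optional;

// Illustrative only: a toy model of the failure actions, not the connector's implementation.
final class CryptoFailureSketch {

    /** Hypothetical local enum mirroring the names of ConsumerCryptoFailureAction. */
    enum Action { FAIL, DISCARD, CONSUME }

    /**
     * Returns what would be handed downstream after decryption of {@code encrypted} has failed.
     * FAIL throws, DISCARD yields nothing, CONSUME passes the still-encrypted bytes through.
     */
    static Optional<byte[]> onDecryptFailure(Action action, byte[] encrypted) {
        switch (action) {
            case FAIL:
                throw new IllegalStateException("Message could not be decrypted");
            case DISCARD:
                return Optional.empty();
            case CONSUME:
                return Optional.of(encrypted);
            default:
                throw new AssertionError("Unhandled action: " + action);
        }
    }
}
```

With `CONSUME`, downstream code receives bytes that are still encrypted, so the deserialization schema has to handle them explicitly.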
+### Encrypt the message on the Pulsar sink
+
+Follow the previous instructions to create a `PulsarCrypto` instance and pass it to `PulsarSink.builder()`.
+You also need to choose the encryption failure action. Pulsar has two failure actions, which are defined in
+`ProducerCryptoFailureAction`.
+
+- `ProducerCryptoFailureAction.FAIL`: The Flink pipeline will crash and turn into a failed state.
+- `ProducerCryptoFailureAction.SEND`: The message is sent unencrypted.
+
+```java
+PulsarCrypto pulsarCrypto = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+    ...
+    .setPulsarCrypto(pulsarCrypto, ProducerCryptoFailureAction.FAIL)
+    .build();
+```
 
 ## Upgrading to the Latest Connector Version
 
@@ -1095,14 +1185,14 @@ The generic upgrade steps are outlined in [upgrading jobs and Flink versions gui
 The Pulsar connector does not store any state on the Flink side. The Pulsar connector pushes and stores all the states on the Pulsar side.
 For Pulsar, you additionally need to know these limitations:
 
-* Do not upgrade the Pulsar connector and Pulsar broker version at the same time.
-* Always use a newer Pulsar client with Pulsar connector to consume messages from Pulsar.
+- Do not upgrade the Pulsar connector and Pulsar broker version at the same time.
+- Always use a newer Pulsar client with Pulsar connector to consume messages from Pulsar.
 
 ## Troubleshooting
 
 If you have a problem with Pulsar when using Flink, keep in mind that Flink only wraps
-[PulsarClient](https://pulsar.apache.org/docs/en/client-libraries-java/) or
-[PulsarAdmin](https://pulsar.apache.org/docs/en/admin-api-overview/)
+[PulsarClient](https://pulsar.apache.org/api/client/2.10.x/) or
+[PulsarAdmin](https://pulsar.apache.org/api/admin/2.10.x/)
 and your problem might be independent of Flink and sometimes can be solved by upgrading Pulsar brokers,
 reconfiguring Pulsar brokers or reconfiguring Pulsar connector in Flink.
 
@@ -1124,3 +1214,6 @@ If you use Pulsar 2.9.2 or higher with an older Pulsar client, you might get a `
 You can use the latest `pulsar-client-all` release to resolve this issue.
 
 {{< top >}}
+
+[schema-evolution]: https://pulsar.apache.org/docs/2.10.x/schema-evolution-compatibility/#schema-evolution
+[standard-metrics]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
diff --git a/docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html b/docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html
index 7eb3cae..b779293 100644
--- a/docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html
+++ b/docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html
@@ -18,7 +18,7 @@
             <td><h5>pulsar.consumer.ackTimeoutMillis</h5></td>
             <td style="word-wrap: break-word;">0</td>
             <td>Long</td>
-            <td>The timeout (in ms) for unacknowledged messages, truncated to the nearest millisecond. The timeout needs to be greater than 1 second.<br />By default, the acknowledge timeout is disabled and that means that messages delivered to a consumer will not be re-delivered unless the consumer crashes.<br />When acknowledgement timeout being enabled, if a message is not acknowledged within the specified timeout it will be re-delivered to the consumer (possibly to a different consum [...]
+            <td>The timeout (in ms) for unacknowledged messages, truncated to the nearest millisecond. The timeout needs to be greater than 1 second.<br />By default, the acknowledge timeout is disabled and that means that messages delivered to a consumer will not be re-delivered unless the consumer crashes.<br />When acknowledgement timeout being enabled, if a message is not acknowledged within the specified timeout it will be re-delivered to the consumer.</td>
         </tr>
         <tr>
             <td><h5>pulsar.consumer.acknowledgementsGroupTimeMicros</h5></td>
@@ -96,7 +96,7 @@
             <td><h5>pulsar.consumer.priorityLevel</h5></td>
             <td style="word-wrap: break-word;">0</td>
             <td>Integer</td>
-            <td>Priority level for a consumer to which a broker gives more priorities while dispatching messages in the shared subscription type.<br />The broker follows descending priorities. For example, 0=max-priority, 1, 2,...<br />In shared subscription mode, the broker first dispatches messages to the consumers on the highest priority level if they have permits. Otherwise, the broker considers consumers on the next priority level.<br /><br />Example 1<br />If a subscription has con [...]
+            <td>Priority level for a consumer to which a broker gives more priorities while dispatching messages in the subscription.<br />The broker follows descending priorities. For example, 0=max-priority, 1, 2,...<br /><br />Example 1<br />If a subscription has consumer A with <code class="highlighter-rouge">priorityLevel</code> 0 and consumer B with <code class="highlighter-rouge">priorityLevel</code> 1, then the broker only dispatches messages to consumer A until it runs out permi [...]
 C1, 0, 2
 C2, 0, 1
 C3, 0, 1
@@ -114,7 +114,7 @@ C5, 1, 1
             <td><h5>pulsar.consumer.readCompacted</h5></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Boolean</td>
-            <td>If enabling <code class="highlighter-rouge">readCompacted</code>, a consumer reads messages from a compacted topic rather than reading a full message backlog of a topic.<br />A consumer only sees the latest value for each key in the compacted topic, up until reaching the point in the topic message when compacting backlog. Beyond that point, send messages as normal.<br />Only enabling <code class="highlighter-rouge">readCompacted</code> on subscriptions to persistent topic [...]
+            <td>If enabling <code class="highlighter-rouge">readCompacted</code>, a consumer reads messages from a compacted topic rather than reading a full message backlog of a topic.<br />A consumer only sees the latest value for each key in the compacted topic, up until reaching the point in the topic message when compacting backlog. Beyond that point, send messages as normal.<br />Only enabling <code class="highlighter-rouge">readCompacted</code> on subscriptions to persistent topic [...]
         </tr>
         <tr>
             <td><h5>pulsar.consumer.receiverQueueSize</h5></td>
diff --git a/docs/layouts/shortcodes/generated/pulsar_producer_configuration.html b/docs/layouts/shortcodes/generated/pulsar_producer_configuration.html
index 7984903..5a86eb5 100644
--- a/docs/layouts/shortcodes/generated/pulsar_producer_configuration.html
+++ b/docs/layouts/shortcodes/generated/pulsar_producer_configuration.html
@@ -56,6 +56,12 @@
             <td>Long</td>
             <td>The sequence id for avoiding the duplication, it's used when Pulsar doesn't have transaction.</td>
         </tr>
+        <tr>
+            <td><h5>pulsar.producer.producerCryptoFailureAction</h5></td>
+            <td style="word-wrap: break-word;">FAIL</td>
+            <td><p>Enum</p></td>
+            <td>The action the producer will take in case of encryption failures.<br /><br />Possible values:<ul><li>"FAIL"</li><li>"SEND"</li></ul></td>
+        </tr>
         <tr>
             <td><h5>pulsar.producer.producerName</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/docs/layouts/shortcodes/generated/pulsar_source_configuration.html b/docs/layouts/shortcodes/generated/pulsar_source_configuration.html
index 925c1be..7682394 100644
--- a/docs/layouts/shortcodes/generated/pulsar_source_configuration.html
+++ b/docs/layouts/shortcodes/generated/pulsar_source_configuration.html
@@ -24,7 +24,7 @@
             <td><h5>pulsar.source.enableAutoAcknowledgeMessage</h5></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Boolean</td>
-            <td>Flink commits the consuming position with pulsar transactions on checkpoint. However, if you have disabled the Flink checkpoint or disabled transaction for your Pulsar cluster, ensure that you have set this option to <code class="highlighter-rouge">true</code>.<br />The source would use pulsar client's internal mechanism and commit cursor in two ways.<ul><li>For <code class="highlighter-rouge">Key_Shared</code> and <code class="highlighter-rouge">Shared</code> subscriptio [...]
+            <td>Flink commits the consuming position with pulsar transactions on checkpoint. However, if you have disabled the Flink checkpoint or disabled transaction for your Pulsar cluster, ensure that you have set this option to <code class="highlighter-rouge">true</code>.<br />The source would use pulsar client's internal mechanism and commit cursor in a given interval.</td>
         </tr>
         <tr>
             <td><h5>pulsar.source.enableSchemaEvolution</h5></td>
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java
index b945258..cdf8a23 100644
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java
+++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java
@@ -19,7 +19,7 @@
 package org.apache.flink.tests.util.pulsar;
 
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
-import org.apache.flink.connector.pulsar.testutils.sink.PulsarSinkTestContext;
+import org.apache.flink.connector.pulsar.testutils.sink.cases.PulsarSinkTestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
 import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
@@ -36,6 +36,7 @@ import org.junit.jupiter.api.Tag;
 @Tag("org.apache.flink.testutils.junit.FailsOnJava11")
 public class PulsarSinkE2ECase extends SinkTestSuiteBase<String> {
 
+    // Defines the Semantic.
     @TestSemantics
     CheckpointingMode[] semantics =
             new CheckpointingMode[] {
@@ -50,6 +51,7 @@ public class PulsarSinkE2ECase extends SinkTestSuiteBase<String> {
     @TestExternalSystem
     PulsarContainerTestEnvironment pulsar = new PulsarContainerTestEnvironment(flink);
 
+    // Defines a set of external context Factories for different test cases.
     @TestContext
     PulsarTestContextFactory<String, PulsarSinkTestContext> sinkContext =
             new PulsarTestContextFactory<>(pulsar, PulsarSinkTestContext::new);
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
index d116d7e..ab018f8 100644
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
+++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
@@ -23,7 +23,8 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment;
-import org.apache.flink.test.resources.ResourceTestUtils;
+
+import static org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.resourcePath;
 
 /** A Flink Container which would bundles pulsar connector in its classpath. */
 public class FlinkContainerWithPulsarEnvironment extends FlinkContainerTestEnvironment {
@@ -37,10 +38,6 @@ public class FlinkContainerWithPulsarEnvironment extends FlinkContainerTestEnvir
                 resourcePath("flink-connector-testing.jar"));
     }
 
-    private static String resourcePath(String jarName) {
-        return ResourceTestUtils.getResource(jarName).toAbsolutePath().toString();
-    }
-
     private static Configuration flinkConfiguration() {
         Configuration configuration = new Configuration();
 
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/crypto/DefaultPulsarCrypto.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/crypto/DefaultPulsarCrypto.java
new file mode 100644
index 0000000..97c84d2
--- /dev/null
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/crypto/DefaultPulsarCrypto.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common.crypto;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.MessageCrypto;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The default implementation for {@link PulsarCrypto}. */
+@PublicEvolving
+public class DefaultPulsarCrypto implements PulsarCrypto {
+
+    private final CryptoKeyReader cryptoKeyReader;
+    private final Set<String> encryptKeys;
+    private final SerializableSupplier<MessageCrypto<MessageMetadata, MessageMetadata>>
+            messageCryptoSupplier;
+
+    DefaultPulsarCrypto(
+            CryptoKeyReader cryptoKeyReader,
+            Set<String> encryptKeys,
+            SerializableSupplier<MessageCrypto<MessageMetadata, MessageMetadata>>
+                    messageCryptoSupplier) {
+        this.cryptoKeyReader = cryptoKeyReader;
+        this.encryptKeys = encryptKeys;
+        this.messageCryptoSupplier = messageCryptoSupplier;
+    }
+
+    @Override
+    public CryptoKeyReader cryptoKeyReader() {
+        return cryptoKeyReader;
+    }
+
+    @Override
+    public Set<String> encryptKeys() {
+        return encryptKeys;
+    }
+
+    @Nullable
+    @Override
+    public MessageCrypto<MessageMetadata, MessageMetadata> messageCrypto() {
+        return messageCryptoSupplier.get();
+    }
+
+    /** The builder for building the {@link DefaultPulsarCrypto}. */
+    @PublicEvolving
+    public static class DefaultPulsarCryptoBuilder {
+
+        private CryptoKeyReader cryptoKeyReader;
+        private final Set<String> encryptKeys = new HashSet<>();
+        private SerializableSupplier<MessageCrypto<MessageMetadata, MessageMetadata>>
+                messageCryptoSupplier = () -> null;
+
+        DefaultPulsarCryptoBuilder() {}
+
+        public DefaultPulsarCryptoBuilder cryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
+            this.cryptoKeyReader = cryptoKeyReader;
+            return this;
+        }
+
+        public DefaultPulsarCryptoBuilder addEncryptKeys(String... keys) {
+            encryptKeys.addAll(Arrays.asList(keys));
+            return this;
+        }
+
+        public DefaultPulsarCryptoBuilder messageCrypto(
+                SerializableSupplier<MessageCrypto<MessageMetadata, MessageMetadata>>
+                        messageCryptoSupplier) {
+            this.messageCryptoSupplier = messageCryptoSupplier;
+            return this;
+        }
+
+        public DefaultPulsarCrypto build() {
+            checkNotNull(cryptoKeyReader, "The CryptoKeyReader is required.");
+            checkArgument(!encryptKeys.isEmpty(), "The encrypt keys are required.");
+
+            return new DefaultPulsarCrypto(cryptoKeyReader, encryptKeys, messageCryptoSupplier);
+        }
+    }
+}
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/crypto/PulsarCrypto.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/crypto/PulsarCrypto.java
new file mode 100644
index 0000000..df9b115
--- /dev/null
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/crypto/PulsarCrypto.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common.crypto;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.pulsar.common.crypto.DefaultPulsarCrypto.DefaultPulsarCryptoBuilder;
+
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.MessageCrypto;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Use it for end-to-end encryption support in Flink. You should provide at least a {@link
+ * CryptoKeyReader} and the encryption keys.
+ */
+@PublicEvolving
+public interface PulsarCrypto extends Serializable {
+
+    /**
+     * Crypto is a key store for encrypting. The key pair is stored with a key name. Pulsar will
+     * randomly choose a key name from the {@link #encryptKeys()} and get the related public key or
+     * private key for encrypting or decrypting the message.
+     *
+     * @return A non-null instance will enable the end-to-end encryption.
+     */
+    @Nullable
+    CryptoKeyReader cryptoKeyReader();
+
+    /**
+     * Return a set of key names. These key names can be used to acquire the key pairs in {@link
+     * CryptoKeyReader}.
+     *
+     * <p>At the time of producer creation, the Pulsar client checks if there are keys added to
+     * encryptionKeys. If keys are found, a callback {@link CryptoKeyReader#getPrivateKey(String,
+     * Map)} and {@link CryptoKeyReader#getPublicKey(String, Map)} is invoked against each key to
+     * load the values of the key. Application should implement this callback to return the key in
+     * pkcs8 format. If compression is enabled, the message is encrypted after compression. If batch
+     * messaging is enabled, the batched message is encrypted.
+     *
+     * @return It shouldn't be a null or empty instance if you have returned a key reader.
+     */
+    Set<String> encryptKeys();
+
+    /**
+     * {@link MessageCrypto} is used to define how to decrypt/encrypt the message. It's not required
+     * by default, because Pulsar will provide a default implementation by using the bouncy castle.
+     */
+    @Nullable
+    default MessageCrypto<MessageMetadata, MessageMetadata> messageCrypto() {
+        return null;
+    }
+
+    /** Disable the end-to-end encryption. */
+    @Internal
+    static PulsarCrypto disabled() {
+        return new PulsarCryptoDisabled();
+    }
+
+    /** Return the builder for building a {@link PulsarCrypto} instance. */
+    static DefaultPulsarCryptoBuilder builder() {
+        return new DefaultPulsarCryptoBuilder();
+    }
+}
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/crypto/PulsarCryptoDisabled.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/crypto/PulsarCryptoDisabled.java
new file mode 100644
index 0000000..7f6902f
--- /dev/null
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/crypto/PulsarCryptoDisabled.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common.crypto;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.pulsar.client.api.CryptoKeyReader;
+
+import java.util.Collections;
+import java.util.Set;
+
+/** A default implementation, we will use it if the user didn't enable end-to-end encryption. */
+@Internal
+public class PulsarCryptoDisabled implements PulsarCrypto {
+
+    @Override
+    public CryptoKeyReader cryptoKeyReader() {
+        return null;
+    }
+
+    @Override
+    public Set<String> encryptKeys() {
+        return Collections.emptySet();
+    }
+}
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java
index 4c6c4a9..b4f7d6e 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
 import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
 import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittableSerializer;
 import org.apache.flink.connector.pulsar.sink.committer.PulsarCommitter;
@@ -37,6 +38,8 @@ import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializat
 import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
+import javax.annotation.Nullable;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -83,23 +86,24 @@ public class PulsarSink<IN> implements TwoPhaseCommittingSink<IN, PulsarCommitta
     private final SinkConfiguration sinkConfiguration;
     private final PulsarSerializationSchema<IN> serializationSchema;
     private final TopicMetadataListener metadataListener;
-    private final MessageDelayer<IN> messageDelayer;
     private final TopicRouter<IN> topicRouter;
+    private final MessageDelayer<IN> messageDelayer;
+    private final PulsarCrypto pulsarCrypto;
 
     PulsarSink(
             SinkConfiguration sinkConfiguration,
             PulsarSerializationSchema<IN> serializationSchema,
             TopicMetadataListener metadataListener,
             TopicRoutingMode topicRoutingMode,
-            TopicRouter<IN> topicRouter,
-            MessageDelayer<IN> messageDelayer) {
+            @Nullable TopicRouter<IN> topicRouter,
+            MessageDelayer<IN> messageDelayer,
+            PulsarCrypto pulsarCrypto) {
         this.sinkConfiguration = checkNotNull(sinkConfiguration);
         this.serializationSchema = checkNotNull(serializationSchema);
         this.metadataListener = checkNotNull(metadataListener);
-        this.messageDelayer = checkNotNull(messageDelayer);
-        checkNotNull(topicRoutingMode);
 
         // Create topic router supplier.
+        checkNotNull(topicRoutingMode);
         if (topicRoutingMode == TopicRoutingMode.CUSTOM) {
             this.topicRouter = checkNotNull(topicRouter);
         } else if (topicRoutingMode == TopicRoutingMode.ROUND_ROBIN) {
@@ -107,6 +111,9 @@ public class PulsarSink<IN> implements TwoPhaseCommittingSink<IN, PulsarCommitta
         } else {
             this.topicRouter = new KeyHashTopicRouter<>(sinkConfiguration);
         }
+
+        this.messageDelayer = checkNotNull(messageDelayer);
+        this.pulsarCrypto = checkNotNull(pulsarCrypto);
     }
 
     /**
@@ -128,6 +135,7 @@ public class PulsarSink<IN> implements TwoPhaseCommittingSink<IN, PulsarCommitta
                 metadataListener,
                 topicRouter,
                 messageDelayer,
+                pulsarCrypto,
                 initContext);
     }
 
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
index 1bdb053..f394a09 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.pulsar.common.config.PulsarConfigBuilder;
 import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
+import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
 import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
 import org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer;
 import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter;
@@ -34,6 +35,7 @@ import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializat
 import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchemaWrapper;
 import org.apache.flink.connector.pulsar.sink.writer.topic.TopicMetadataListener;
 
+import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.slf4j.Logger;
@@ -50,6 +52,7 @@ import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULS
 import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME;
 import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION;
 import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_PRODUCER_CRYPTO_FAILURE_ACTION;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_PRODUCER_NAME;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_SEND_TIMEOUT_MS;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_DELIVERY_GUARANTEE;
@@ -57,6 +60,7 @@ import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WR
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_TRANSACTION_TIMEOUT;
 import static org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils.SINK_CONFIG_VALIDATOR;
 import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.distinctTopics;
+import static org.apache.flink.util.InstantiationUtil.isSerializable;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -110,6 +114,7 @@ public class PulsarSinkBuilder<IN> {
     private TopicRoutingMode topicRoutingMode;
     private TopicRouter<IN> topicRouter;
     private MessageDelayer<IN> messageDelayer;
+    private PulsarCrypto pulsarCrypto;
 
     // private builder constructor.
     PulsarSinkBuilder() {
@@ -290,7 +295,8 @@ public class PulsarSinkBuilder<IN> {
     /**
      * Set a message delayer for enable Pulsar message delay delivery.
      *
-     * @param messageDelayer The delayer which would defined when to send the message to consumer.
+     * @param messageDelayer The delayer which defines when to send the message to the
+     *     consumer.
      * @return this PulsarSinkBuilder.
      */
     public PulsarSinkBuilder<IN> delaySendingMessage(MessageDelayer<IN> messageDelayer) {
@@ -298,6 +304,20 @@ public class PulsarSinkBuilder<IN> {
         return this;
     }
 
+    /**
+     * Sets a {@link PulsarCrypto}. Configure the key reader and keys to be used to encrypt the
+     * message payloads.
+     *
+     * @param pulsarCrypto PulsarCrypto object.
+     * @return this PulsarSinkBuilder.
+     */
+    public PulsarSinkBuilder<IN> setPulsarCrypto(
+            PulsarCrypto pulsarCrypto, ProducerCryptoFailureAction action) {
+        this.pulsarCrypto = checkNotNull(pulsarCrypto);
+        configBuilder.set(PULSAR_PRODUCER_CRYPTO_FAILURE_ACTION, action);
+        return this;
+    }
+
     /**
      * Configure the authentication provider to use in the Pulsar client instance.
      *
@@ -431,7 +451,7 @@ public class PulsarSinkBuilder<IN> {
             }
         }
 
-        // Topic routing mode validate.
+        // Topic routing mode validation.
         if (topicRoutingMode == null) {
             LOG.info("No topic routing mode has been chosen. We use round-robin mode as default.");
             this.topicRoutingMode = TopicRoutingMode.ROUND_ROBIN;
@@ -441,6 +461,17 @@ public class PulsarSinkBuilder<IN> {
             this.messageDelayer = MessageDelayer.never();
         }
 
+        if (pulsarCrypto == null) {
+            this.pulsarCrypto = PulsarCrypto.disabled();
+        }
+
+        // Make sure they are serializable.
+        checkState(
+                isSerializable(serializationSchema),
+                "PulsarSerializationSchema isn't serializable");
+        checkState(isSerializable(messageDelayer), "MessageDelayer isn't serializable");
+        checkState(isSerializable(pulsarCrypto), "PulsarCrypto isn't serializable");
+
         // This is an unmodifiable configuration for Pulsar.
         // We don't use Pulsar's built-in configure classes for compatible requirement.
         SinkConfiguration sinkConfiguration =
@@ -452,12 +483,13 @@ public class PulsarSinkBuilder<IN> {
                 metadataListener,
                 topicRoutingMode,
                 topicRouter,
-                messageDelayer);
+                messageDelayer,
+                pulsarCrypto);
     }
 
     // ------------- private helpers  --------------
 
-    /** Helper method for java compiler recognize the generic type. */
+    /** Helper method that lets the Java compiler recognize the generic type. */
     @SuppressWarnings("unchecked")
     private <T extends IN> PulsarSinkBuilder<T> specialized() {
         return (PulsarSinkBuilder<T>) this;
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
index e1218d4..dbc1026 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
@@ -29,6 +29,7 @@ import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
 import org.apache.flink.connector.pulsar.sink.writer.router.MessageKeyHash;
 
 import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
 
 import java.time.Duration;
 import java.util.Map;
@@ -37,7 +38,6 @@ import static java.util.Collections.emptyMap;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.flink.configuration.description.LinkElement.link;
 import static org.apache.flink.configuration.description.TextElement.code;
-import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MEMORY_LIMIT_BYTES;
 import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PRODUCER_CONFIG_PREFIX;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.SINK_CONFIG_PREFIX;
@@ -130,18 +130,6 @@ public final class PulsarSinkOptions {
                             "The allowed transaction recommit times if we meet some retryable exception."
                                     + " This is used in Pulsar Transaction.");
 
-    /** @deprecated This config option was removed for better performance. */
-    @Deprecated
-    public static final ConfigOption<Integer> PULSAR_MAX_PENDING_MESSAGES_ON_PARALLELISM =
-            ConfigOptions.key(SINK_CONFIG_PREFIX + "maxPendingMessages")
-                    .intType()
-                    .defaultValue(1000)
-                    .withDescription(
-                            Description.builder()
-                                    .text(
-                                            "The maximum number of pending messages in one sink parallelism.")
-                                    .build());
-
     public static final ConfigOption<Boolean> PULSAR_ENABLE_SINK_METRICS =
             ConfigOptions.key(SINK_CONFIG_PREFIX + "enableMetrics")
                     .booleanType()
@@ -182,51 +170,6 @@ public final class PulsarSinkOptions {
                                             code("sendTimeout"))
                                     .build());
 
-    /** @deprecated Use {@link PulsarOptions#PULSAR_MEMORY_LIMIT_BYTES} since Pulsar 2.10.0 */
-    @Deprecated
-    public static final ConfigOption<Integer> PULSAR_MAX_PENDING_MESSAGES =
-            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "maxPendingMessages")
-                    .intType()
-                    .noDefaultValue()
-                    .withDescription(
-                            Description.builder()
-                                    .text("The maximum size of a queue holding pending messages.")
-                                    .linebreak()
-                                    .text(
-                                            "For example, a message waiting to receive an acknowledgment from a %s.",
-                                            link(
-                                                    "broker",
-                                                    "https://pulsar.apache.org/docs/en/reference-terminology#broker"))
-                                    .linebreak()
-                                    .text(
-                                            "By default, when the queue is full, all calls to the %s and %s methods fail unless you set %s to true.",
-                                            code("Send"),
-                                            code("SendAsync"),
-                                            code("BlockIfQueueFull"))
-                                    .text(
-                                            "Since Pulsar 2.10.0, you shouldn't set this option, use %s instead.",
-                                            code(PULSAR_MEMORY_LIMIT_BYTES.key()))
-                                    .build());
-
-    /** @deprecated Use {@link PulsarOptions#PULSAR_MEMORY_LIMIT_BYTES} since Pulsar 2.10.0 */
-    @Deprecated
-    public static final ConfigOption<Integer> PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS =
-            ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "maxPendingMessagesAcrossPartitions")
-                    .intType()
-                    .noDefaultValue()
-                    .withDescription(
-                            Description.builder()
-                                    .text(
-                                            "The maximum number of pending messages across partitions.")
-                                    .linebreak()
-                                    .text(
-                                            "Use the setting to lower the max pending messages for each partition (%s) if the total number exceeds the configured value.",
-                                            code("setMaxPendingMessages"))
-                                    .text(
-                                            "Since Pulsar 2.10.0, you shouldn't set this option, use %s instead.",
-                                            code(PULSAR_MEMORY_LIMIT_BYTES.key()))
-                                    .build());
-
     public static final ConfigOption<Long> PULSAR_BATCHING_MAX_PUBLISH_DELAY_MICROS =
             ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "batchingMaxPublishDelayMicros")
                     .longType()
@@ -303,4 +246,12 @@ public final class PulsarSinkOptions {
                                     .text(
                                             " When getting a topic stats, associate this metadata with the consumer stats for easier identification.")
                                     .build());
+
+    public static final ConfigOption<ProducerCryptoFailureAction>
+            PULSAR_PRODUCER_CRYPTO_FAILURE_ACTION =
+                    ConfigOptions.key(PRODUCER_CONFIG_PREFIX + "producerCryptoFailureAction")
+                            .enumType(ProducerCryptoFailureAction.class)
+                            .defaultValue(ProducerCryptoFailureAction.FAIL)
+                            .withDescription(
+                                    "The action the producer will take in case of encryption failures.");
 }
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
index 61cfd5a..7823e27 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
@@ -34,7 +34,6 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
 import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
 import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
-import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MEMORY_LIMIT_BYTES;
 import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_ENABLED;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_BYTES;
@@ -44,20 +43,19 @@ import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BA
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_CHUNKING_ENABLED;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_COMPRESSION_TYPE;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_INITIAL_SEQUENCE_ID;
-import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_PENDING_MESSAGES;
-import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_PRODUCER_CRYPTO_FAILURE_ACTION;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_PRODUCER_NAME;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_PRODUCER_PROPERTIES;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_SEND_TIMEOUT_MS;
 import static org.apache.pulsar.client.api.MessageRoutingMode.SinglePartition;
 import static org.apache.pulsar.client.api.ProducerAccessMode.Shared;
 
-/** Create the {@link Producer} to send message and a validator for building sink config. */
+/** Create the {@link Producer} to send messages and a validator for building sink config. */
 @Internal
 public final class PulsarSinkConfigUtils {
 
     private PulsarSinkConfigUtils() {
-        // No need to create instance.
+        // No need to create the instance.
     }
 
     public static final PulsarConfigValidator SINK_CONFIG_VALIDATOR =
@@ -65,10 +63,6 @@ public final class PulsarSinkConfigUtils {
                     .requiredOption(PULSAR_SERVICE_URL)
                     .requiredOption(PULSAR_ADMIN_URL)
                     .conflictOptions(PULSAR_AUTH_PARAMS, PULSAR_AUTH_PARAM_MAP)
-                    .conflictOptions(PULSAR_MEMORY_LIMIT_BYTES, PULSAR_MAX_PENDING_MESSAGES)
-                    .conflictOptions(
-                            PULSAR_MEMORY_LIMIT_BYTES,
-                            PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS)
                     .build();
 
     /** Create a pulsar producer builder by using the given Configuration. */
@@ -96,6 +90,8 @@ public final class PulsarSinkConfigUtils {
         configuration.useOption(PULSAR_CHUNKING_ENABLED, builder::enableChunking);
         configuration.useOption(PULSAR_COMPRESSION_TYPE, builder::compressionType);
         configuration.useOption(PULSAR_INITIAL_SEQUENCE_ID, builder::initialSequenceId);
+        configuration.useOption(
+                PULSAR_PRODUCER_CRYPTO_FAILURE_ACTION, builder::cryptoFailureAction);
 
         // Set producer properties
         Map<String, String> properties = configuration.getProperties(PULSAR_PRODUCER_PROPERTIES);
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
index c5c4622..0d3009c 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema.Initializat
 import org.apache.flink.api.connector.sink2.Sink.InitContext;
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
 import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
 import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
 import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext;
@@ -84,7 +85,8 @@ public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommi
      * @param sinkConfiguration The configuration to configure the Pulsar producer.
      * @param serializationSchema Transform the incoming records into different message properties.
      * @param metadataListener The listener for querying topic metadata.
-     * @param topicRouter Topic router to choose topic by incoming records.
+     * @param topicRouter Topic router to choose the topic by incoming records.
+     * @param pulsarCrypto Used for end-to-end encryption.
      * @param initContext Context to provide information about the runtime environment.
      */
     public PulsarWriter(
@@ -93,6 +95,7 @@ public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommi
             TopicMetadataListener metadataListener,
             TopicRouter<IN> topicRouter,
             MessageDelayer<IN> messageDelayer,
+            PulsarCrypto pulsarCrypto,
             InitContext initContext) {
         checkNotNull(sinkConfiguration);
         this.serializationSchema = checkNotNull(serializationSchema);
@@ -123,7 +126,8 @@ public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommi
 
         // Create this producer register after opening serialization schema!
         this.producerRegister =
-                new TopicProducerRegister(sinkConfiguration, initContext.metricGroup());
+                new TopicProducerRegister(
+                        sinkConfiguration, pulsarCrypto, initContext.metricGroup());
         this.mailboxExecutor = initContext.getMailboxExecutor();
         this.pendingMessages = new AtomicLong(0);
     }
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java
index cf8ff00..ae99527 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.pulsar.sink.writer.topic;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
 import org.apache.flink.connector.pulsar.common.metrics.ProducerMetricsInterceptor;
 import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
 import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
@@ -29,6 +30,8 @@ import org.apache.flink.util.FlinkRuntimeException;
 
 import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
 
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.MessageCrypto;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.ProducerStats;
@@ -38,7 +41,10 @@ import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.shade.com.google.common.base.Strings;
 
@@ -49,6 +55,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
 import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient;
@@ -73,6 +80,7 @@ import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.TOTAL
 import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
 import static org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils.createTransaction;
 import static org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils.createProducerBuilder;
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -84,14 +92,18 @@ public class TopicProducerRegister implements Closeable {
 
     private final PulsarClient pulsarClient;
     private final SinkConfiguration sinkConfiguration;
+    private final PulsarCrypto pulsarCrypto;
     private final SinkWriterMetricGroup metricGroup;
     private final Map<String, Map<SchemaInfo, Producer<?>>> producerRegister;
     private final Map<String, Transaction> transactionRegister;
 
     public TopicProducerRegister(
-            SinkConfiguration sinkConfiguration, SinkWriterMetricGroup metricGroup) {
+            SinkConfiguration sinkConfiguration,
+            PulsarCrypto pulsarCrypto,
+            SinkWriterMetricGroup metricGroup) {
         this.pulsarClient = createClient(sinkConfiguration);
         this.sinkConfiguration = sinkConfiguration;
+        this.pulsarCrypto = pulsarCrypto;
         this.metricGroup = metricGroup;
         this.producerRegister = new HashMap<>();
         this.transactionRegister = new HashMap<>();
@@ -176,22 +188,46 @@ public class TopicProducerRegister implements Closeable {
 
         if (producers.containsKey(schemaInfo)) {
             return (Producer<T>) producers.get(schemaInfo);
-        } else {
-            ProducerBuilder<T> builder =
-                    createProducerBuilder(pulsarClient, schema, sinkConfiguration);
-            // Set the required topic name.
-            builder.topic(topic);
-            // Set the sending counter for metrics.
-            builder.intercept(new ProducerMetricsInterceptor(metricGroup));
+        }
 
-            Producer<T> producer = sneakyClient(builder::create);
+        ProducerBuilder<T> builder = createProducerBuilder(pulsarClient, schema, sinkConfiguration);
+
+        // Set the message crypto key reader.
+        CryptoKeyReader cryptoKeyReader = pulsarCrypto.cryptoKeyReader();
+        if (cryptoKeyReader != null) {
+            builder.cryptoKeyReader(cryptoKeyReader);
+
+            // Set the encrypt keys.
+            Set<String> encryptKeys = pulsarCrypto.encryptKeys();
+            checkArgument(
+                    encryptKeys != null && !encryptKeys.isEmpty(),
+                    "You should provide encryptKeys in PulsarCrypto");
+            encryptKeys.forEach(builder::addEncryptionKey);
+
+            // Set the message crypto if provided.
+            // Pulsar doesn't expose this config in the producer builder.
+            // See issue https://github.com/apache/pulsar/issues/19139
+            MessageCrypto<MessageMetadata, MessageMetadata> messageCrypto =
+                    pulsarCrypto.messageCrypto();
+            if (messageCrypto != null) {
+                ProducerConfigurationData producerConfig =
+                        ((ProducerBuilderImpl<?>) builder).getConf();
+                producerConfig.setMessageCrypto(messageCrypto);
+            }
+        }
 
-            // Expose the stats for calculating and monitoring.
-            exposeProducerMetrics(producer);
-            producers.put(schemaInfo, producer);
+        // Set the required topic name.
+        builder.topic(topic);
+        // Set the sending counter for metrics.
+        builder.intercept(new ProducerMetricsInterceptor(metricGroup));
 
-            return producer;
-        }
+        Producer<T> producer = sneakyClient(builder::create);
+
+        // Expose the stats for calculating and monitoring.
+        exposeProducerMetrics(producer);
+        producers.put(schemaInfo, producer);
+
+        return producer;
     }
 
     /**
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
index 66cc202..6b79808 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
 import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumStateSerializer;
@@ -86,9 +87,11 @@ public final class PulsarSource<OUT>
 
     private final Boundedness boundedness;
 
-    /** The pulsar deserialization schema used for deserializing message. */
+    /** The Pulsar deserialization schema used for deserializing messages. */
     private final PulsarDeserializationSchema<OUT> deserializationSchema;
 
+    private final PulsarCrypto pulsarCrypto;
+
     /**
      * The constructor for PulsarSource, it's package protected for forcing using {@link
      * PulsarSourceBuilder}.
@@ -100,7 +103,8 @@ public final class PulsarSource<OUT>
             StartCursor startCursor,
             StopCursor stopCursor,
             Boundedness boundedness,
-            PulsarDeserializationSchema<OUT> deserializationSchema) {
+            PulsarDeserializationSchema<OUT> deserializationSchema,
+            PulsarCrypto pulsarCrypto) {
         this.sourceConfiguration = sourceConfiguration;
         this.subscriber = subscriber;
         this.rangeGenerator = rangeGenerator;
@@ -108,6 +112,7 @@ public final class PulsarSource<OUT>
         this.stopCursor = stopCursor;
         this.boundedness = boundedness;
         this.deserializationSchema = deserializationSchema;
+        this.pulsarCrypto = pulsarCrypto;
     }
 
     /**
@@ -133,7 +138,8 @@ public final class PulsarSource<OUT>
                 new PulsarDeserializationSchemaInitializationContext(readerContext);
         deserializationSchema.open(initializationContext, sourceConfiguration);
 
-        return PulsarSourceReader.create(sourceConfiguration, deserializationSchema, readerContext);
+        return PulsarSourceReader.create(
+                sourceConfiguration, deserializationSchema, pulsarCrypto, readerContext);
     }
 
     @Internal
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
index 5fb9d0b..1290304 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.pulsar.common.config.PulsarConfigBuilder;
 import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
+import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
@@ -40,6 +41,7 @@ import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeseri
 import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarSchemaWrapper;
 import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarTypeInformationWrapper;
 
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
@@ -59,6 +61,7 @@ import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULS
 import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME;
 import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CONSUMER_NAME;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CRYPTO_FAILURE_ACTION;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_READ_SCHEMA_EVOLUTION;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
@@ -130,6 +133,7 @@ public final class PulsarSourceBuilder<OUT> {
     private StopCursor stopCursor;
     private Boundedness boundedness;
     private PulsarDeserializationSchema<OUT> deserializationSchema;
+    private PulsarCrypto pulsarCrypto;
 
     // private builder constructor.
     PulsarSourceBuilder() {
@@ -426,6 +430,20 @@ public final class PulsarSourceBuilder<OUT> {
         return self;
     }
 
+    /**
+     * Sets a {@link PulsarCrypto}. Configure the key reader and keys to be used to decrypt the
+     * message payloads.
+     *
+     * @param pulsarCrypto PulsarCrypto object.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<OUT> setPulsarCrypto(
+            PulsarCrypto pulsarCrypto, ConsumerCryptoFailureAction action) {
+        this.pulsarCrypto = checkNotNull(pulsarCrypto);
+        configBuilder.set(PULSAR_CRYPTO_FAILURE_ACTION, action);
+        return this;
+    }
+
     /**
      * Configure the authentication provider to use in the Pulsar client instance.
      *
@@ -542,6 +560,10 @@ public final class PulsarSourceBuilder<OUT> {
                             + " We would use bypass Schema check by default.");
         }
 
+        if (pulsarCrypto == null) {
+            this.pulsarCrypto = PulsarCrypto.disabled();
+        }
+
         if (!configBuilder.contains(PULSAR_CONSUMER_NAME)) {
             LOG.warn(
                     "We recommend set a readable consumer name through setConsumerName(String) in production mode.");
@@ -552,10 +574,14 @@ public final class PulsarSourceBuilder<OUT> {
             }
         }
 
-        // Since these implementations could be a lambda, make sure they are serializable.
+        // Make sure they are serializable.
+        checkState(
+                isSerializable(deserializationSchema),
+                "PulsarDeserializationSchema isn't serializable");
         checkState(isSerializable(startCursor), "StartCursor isn't serializable");
         checkState(isSerializable(stopCursor), "StopCursor isn't serializable");
         checkState(isSerializable(rangeGenerator), "RangeGenerator isn't serializable");
+        checkState(isSerializable(pulsarCrypto), "PulsarCrypto isn't serializable");
 
         // Check builder configuration.
         SourceConfiguration sourceConfiguration =
@@ -568,7 +594,8 @@ public final class PulsarSourceBuilder<OUT> {
                 startCursor,
                 stopCursor,
                 boundedness,
-                deserializationSchema);
+                deserializationSchema,
+                pulsarCrypto);
     }
 
     // ------------- private helpers  --------------
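
The defaulting and serializability checks added to build() in the file above can be illustrated in isolation. This is a minimal sketch, not the connector's code: `Crypto`, `DisabledCrypto`, `resolve` and this `isSerializable` are hypothetical stand-ins for `PulsarCrypto`, `PulsarCrypto.disabled()`, the build() step and the serializability helper, which is assumed here to simply attempt Java serialization.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CryptoDefaultingSketch {

    /** Hypothetical stand-in for PulsarCrypto; it has to be shipped with the job. */
    interface Crypto extends Serializable {
        boolean enabled();
    }

    /** Hypothetical stand-in for the instance returned by PulsarCrypto.disabled(). */
    static final class DisabledCrypto implements Crypto {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean enabled() {
            return false;
        }
    }

    /** Assumed helper behavior: try Java serialization and report whether it worked. */
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            // NotSerializableException is an IOException, so it ends up here.
            return false;
        }
    }

    /** Mirrors the build() step: fall back to the disabled instance, then validate. */
    static Crypto resolve(Crypto userProvided) {
        Crypto crypto = userProvided == null ? new DisabledCrypto() : userProvided;
        if (!isSerializable(crypto)) {
            throw new IllegalStateException("PulsarCrypto isn't serializable");
        }
        return crypto;
    }

    public static void main(String[] args) {
        // No crypto configured: the disabled fallback is used.
        System.out.println(resolve(null).enabled());
    }
}
```

Falling back to a disabled instance rather than keeping a null field means the downstream reader code never has to null-check the crypto object itself, only the values it returns.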
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
index 4099a52..a74f80f 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
@@ -26,12 +26,9 @@ import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.description.Description;
 import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
 import org.apache.flink.connector.pulsar.source.config.CursorVerification;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
 
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
-import org.apache.pulsar.client.api.SubscriptionType;
 
 import java.time.Duration;
 import java.util.Map;
@@ -102,14 +99,7 @@ public final class PulsarSourceOptions {
                                             code("true"))
                                     .linebreak()
                                     .text(
-                                            "The source would use pulsar client's internal mechanism and commit cursor in two ways.")
-                                    .list(
-                                            text(
-                                                    "For %s and %s subscription, the cursor would be committed once the message is consumed.",
-                                                    code("Key_Shared"), code("Shared")),
-                                            text(
-                                                    "For %s and %s subscription, the cursor would be committed in a given interval.",
-                                                    code("Exclusive"), code("Failover")))
+                                            "The source would use pulsar client's internal mechanism and commit cursor in a given interval.")
                                     .build());
 
     public static final ConfigOption<Long> PULSAR_AUTO_COMMIT_CURSOR_INTERVAL =
@@ -124,25 +114,6 @@ public final class PulsarSourceOptions {
                                             " We would automatically commit the cursor using the given period (in ms).")
                                     .build());
 
-    /** @deprecated We no longer need transactions for consuming messages. */
-    @Deprecated
-    public static final ConfigOption<Long> PULSAR_READ_TRANSACTION_TIMEOUT =
-            ConfigOptions.key(SOURCE_CONFIG_PREFIX + "transactionTimeoutMillis")
-                    .longType()
-                    .defaultValue(Duration.ofHours(3).toMillis())
-                    .withDescription(
-                            Description.builder()
-                                    .text(
-                                            "This option is used in %s or %s subscription.",
-                                            code("Shared"), code("Key_Shared"))
-                                    .text(
-                                            " You should configure this option when you do not enable the %s option.",
-                                            code("pulsar.source.enableAutoAcknowledgeMessage"))
-                                    .linebreak()
-                                    .text(
-                                            "The value (in ms) should be greater than the checkpoint interval.")
-                                    .build());
-
     public static final ConfigOption<Long> PULSAR_MAX_FETCH_TIME =
             ConfigOptions.key(SOURCE_CONFIG_PREFIX + "maxFetchTime")
                     .longType()
@@ -244,25 +215,6 @@ public final class PulsarSourceOptions {
                                             " This argument is required when constructing the consumer.")
                                     .build());
 
-    /** @deprecated This config option is no longer supported. */
-    @Deprecated
-    public static final ConfigOption<SubscriptionType> PULSAR_SUBSCRIPTION_TYPE =
-            ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "subscriptionType")
-                    .enumType(SubscriptionType.class)
-                    .defaultValue(SubscriptionType.Shared)
-                    .withDescription(
-                            Description.builder()
-                                    .text("Subscription type.")
-                                    .linebreak()
-                                    .linebreak()
-                                    .text("Four subscription types are available:")
-                                    .list(
-                                            text("Exclusive"),
-                                            text("Failover"),
-                                            text("Shared"),
-                                            text("Key_Shared"))
-                                    .build());
-
     public static final ConfigOption<SubscriptionMode> PULSAR_SUBSCRIPTION_MODE =
             ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "subscriptionMode")
                     .enumType(SubscriptionMode.class)
@@ -364,7 +316,7 @@ public final class PulsarSourceOptions {
                                             "By default, the acknowledge timeout is disabled and that means that messages delivered to a consumer will not be re-delivered unless the consumer crashes.")
                                     .linebreak()
                                     .text(
-                                            "When acknowledgement timeout being enabled, if a message is not acknowledged within the specified timeout it will be re-delivered to the consumer (possibly to a different consumer in case of a shared subscription).")
+                                            "When acknowledgement timeout being enabled, if a message is not acknowledged within the specified timeout it will be re-delivered to the consumer.")
                                     .build());
 
     public static final ConfigOption<Long> PULSAR_TICK_DURATION_MILLIS =
@@ -387,16 +339,11 @@ public final class PulsarSourceOptions {
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Priority level for a consumer to which a broker gives more priorities while dispatching messages in the shared subscription type.")
+                                            "Priority level for a consumer to which a broker gives more priorities while dispatching messages in the subscription.")
                                     .linebreak()
                                     .text(
                                             "The broker follows descending priorities. For example, 0=max-priority, 1, 2,...")
                                     .linebreak()
-                                    .text(
-                                            "In shared subscription mode, the broker first dispatches messages to the consumers on the highest priority level if they have permits.")
-                                    .text(
-                                            " Otherwise, the broker considers consumers on the next priority level.")
-                                    .linebreak()
                                     .linebreak()
                                     .text("Example 1")
                                     .linebreak()
@@ -539,24 +486,10 @@ public final class PulsarSourceOptions {
                                             code("readCompacted"))
                                     .linebreak()
                                     .text(
-                                            "Attempting to enable it on subscriptions to non-persistent topics or on shared subscriptions leads to a subscription call throwing a %s.",
+                                            "Attempting to enable it on subscriptions to non-persistent topics leads to a subscription call throwing a %s.",
                                             code("PulsarClientException"))
                                     .build());
 
-    /**
-     * @deprecated This option would be reset by {@link StartCursor}, no need to use it anymore.
-     *     Pulsar didn't support this config option before 1.10.1, so we have to remove this config
-     *     option.
-     */
-    @Deprecated
-    public static final ConfigOption<SubscriptionInitialPosition>
-            PULSAR_SUBSCRIPTION_INITIAL_POSITION =
-                    ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "subscriptionInitialPosition")
-                            .enumType(SubscriptionInitialPosition.class)
-                            .defaultValue(SubscriptionInitialPosition.Latest)
-                            .withDescription(
-                                    "Initial position at which to set cursor when subscribing to a topic at first time.");
-
     // The config set for DeadLetterPolicy
 
     /**
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
index 664aa7a..638e4c7 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
@@ -26,6 +26,7 @@ import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.CursorPosition;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
@@ -43,12 +44,15 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.ConsumerStats;
+import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.KeySharedPolicy;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageCrypto;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -95,6 +99,7 @@ public class PulsarPartitionSplitReader
     @VisibleForTesting final PulsarAdmin pulsarAdmin;
     @VisibleForTesting final SourceConfiguration sourceConfiguration;
     private final Schema<byte[]> schema;
+    private final PulsarCrypto pulsarCrypto;
     private final SourceReaderMetricGroup metricGroup;
 
     private Consumer<byte[]> pulsarConsumer;
@@ -105,11 +110,13 @@ public class PulsarPartitionSplitReader
             PulsarAdmin pulsarAdmin,
             SourceConfiguration sourceConfiguration,
             Schema<byte[]> schema,
+            PulsarCrypto pulsarCrypto,
             SourceReaderMetricGroup metricGroup) {
         this.pulsarClient = pulsarClient;
         this.pulsarAdmin = pulsarAdmin;
         this.sourceConfiguration = sourceConfiguration;
         this.schema = schema;
+        this.pulsarCrypto = pulsarCrypto;
         this.metricGroup = metricGroup;
     }
 
@@ -280,6 +287,19 @@ public class PulsarPartitionSplitReader
 
         consumerBuilder.topic(partition.getFullTopicName());
 
+        // Add CryptoKeyReader if it exists for supporting end-to-end encryption.
+        CryptoKeyReader cryptoKeyReader = pulsarCrypto.cryptoKeyReader();
+        if (cryptoKeyReader != null) {
+            consumerBuilder.cryptoKeyReader(cryptoKeyReader);
+
+            // Add MessageCrypto if provided.
+            MessageCrypto<MessageMetadata, MessageMetadata> messageCrypto =
+                    pulsarCrypto.messageCrypto();
+            if (messageCrypto != null) {
+                consumerBuilder.messageCrypto(messageCrypto);
+            }
+        }
+
         // Add KeySharedPolicy for partial keys subscription.
         if (!isFullTopicRanges(partition.getRanges())) {
             KeySharedPolicy policy = stickyHashRange().ranges(partition.getPulsarRanges());
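
The nested null checks in the hunk above imply that a MessageCrypto is only applied when a CryptoKeyReader is present. A minimal sketch of that wiring follows, under stated assumptions: `Crypto` and `RecordingBuilder` are hypothetical stand-ins for `PulsarCrypto` and Pulsar's `ConsumerBuilder`, and plain strings stand in for the real key reader and message crypto objects.

```java
import java.util.ArrayList;
import java.util.List;

public class CryptoWiringSketch {

    /** Hypothetical stand-in for PulsarCrypto: either value may be null. */
    static final class Crypto {
        final String keyReader; // stands in for CryptoKeyReader
        final String messageCrypto; // stands in for MessageCrypto

        Crypto(String keyReader, String messageCrypto) {
            this.keyReader = keyReader;
            this.messageCrypto = messageCrypto;
        }
    }

    /** Hypothetical stand-in for ConsumerBuilder that records what was applied. */
    static final class RecordingBuilder {
        final List<String> applied = new ArrayList<>();

        void cryptoKeyReader(String reader) {
            applied.add("keyReader=" + reader);
        }

        void messageCrypto(String crypto) {
            applied.add("messageCrypto=" + crypto);
        }
    }

    /** Same nesting as the hunk: the message crypto is considered only with a key reader. */
    static List<String> configure(Crypto crypto) {
        RecordingBuilder builder = new RecordingBuilder();
        if (crypto.keyReader != null) {
            builder.cryptoKeyReader(crypto.keyReader);
            if (crypto.messageCrypto != null) {
                builder.messageCrypto(crypto.messageCrypto);
            }
        }
        return builder.applied;
    }

    public static void main(String[] args) {
        System.out.println(configure(new Crypto("reader", "custom")));
        System.out.println(configure(new Crypto("reader", null)));
        System.out.println(configure(new Crypto(null, "custom")));
    }
}
```

In this sketch the third call applies nothing: a message crypto supplied without a key reader is silently ignored, which matches the nesting of the checks in the hunk.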
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java
index 6f8ff2c..5c47d5d 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java
@@ -26,6 +26,7 @@ import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.SourceReaderBase;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
 import org.apache.flink.connector.pulsar.common.schema.BytesSchema;
 import org.apache.flink.connector.pulsar.common.schema.PulsarSchema;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
@@ -252,6 +253,7 @@ public class PulsarSourceReader<OUT>
     public static <OUT> PulsarSourceReader<OUT> create(
             SourceConfiguration sourceConfiguration,
             PulsarDeserializationSchema<OUT> deserializationSchema,
+            PulsarCrypto pulsarCrypto,
             SourceReaderContext readerContext) {
 
         // Create a message queue with the predefined source option.
@@ -281,6 +283,7 @@ public class PulsarSourceReader<OUT>
                                 pulsarAdmin,
                                 sourceConfiguration,
                                 schema,
+                                pulsarCrypto,
                                 readerContext.metricGroup());
 
         PulsarSourceFetcherManager fetcherManager =
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
index fff75ae..6942034 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
@@ -25,7 +25,8 @@ import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;
 import org.apache.flink.connector.pulsar.testutils.function.ControlSource;
 import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
-import org.apache.flink.connector.pulsar.testutils.sink.PulsarSinkTestContext;
+import org.apache.flink.connector.pulsar.testutils.sink.cases.PulsarEncryptSinkContext;
+import org.apache.flink.connector.pulsar.testutils.sink.cases.PulsarSinkTestContext;
 import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
@@ -57,7 +58,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 @Tag("org.apache.flink.testutils.junit.FailsOnJava11")
 class PulsarSinkITCase {
 
-    /** Integration test based on connector testing framework. */
+    /** Integration test based on the connector testing framework. */
     @Nested
     class IntegrationTest extends SinkTestSuiteBase<String> {
 
@@ -75,6 +76,10 @@ class PulsarSinkITCase {
         @TestContext
         PulsarTestContextFactory<String, PulsarSinkTestContext> sinkContext =
                 new PulsarTestContextFactory<>(pulsar, PulsarSinkTestContext::new);
+
+        @TestContext
+        PulsarTestContextFactory<String, PulsarEncryptSinkContext> encryptMessages =
+                new PulsarTestContextFactory<>(pulsar, PulsarEncryptSinkContext::new);
     }
 
     /** Tests for using PulsarSink writing to a Pulsar cluster. */
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java
index 41d3928..9b949eb 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.connector.sink2.Sink.InitContext;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
 import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
 import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
 import org.apache.flink.connector.pulsar.sink.writer.delayer.FixedMessageDelayer;
@@ -78,7 +79,14 @@ class PulsarWriterTest extends PulsarTestSuiteBase {
         MockInitContext initContext = new MockInitContext();
 
         PulsarWriter<String> writer =
-                new PulsarWriter<>(configuration, schema, listener, router, delayer, initContext);
+                new PulsarWriter<>(
+                        configuration,
+                        schema,
+                        listener,
+                        router,
+                        delayer,
+                        PulsarCrypto.disabled(),
+                        initContext);
 
         writer.flush(false);
         writer.prepareCommit();
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegisterTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegisterTest.java
index f0eb67e..be4b964 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegisterTest.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegisterTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connector.pulsar.sink.writer.topic;
 
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
 import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
 import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;
@@ -51,7 +52,8 @@ class TopicProducerRegisterTest extends PulsarTestSuiteBase {
 
         SinkConfiguration configuration = sinkConfiguration(deliveryGuarantee);
         TopicProducerRegister register =
-                new TopicProducerRegister(configuration, createSinkWriterMetricGroup());
+                new TopicProducerRegister(
+                        configuration, PulsarCrypto.disabled(), createSinkWriterMetricGroup());
 
         String message = randomAlphabetic(10);
         register.createMessageBuilder(topic, Schema.STRING).value(message).send();
@@ -79,7 +81,8 @@ class TopicProducerRegisterTest extends PulsarTestSuiteBase {
 
         SinkConfiguration configuration = sinkConfiguration(deliveryGuarantee);
         TopicProducerRegister register =
-                new TopicProducerRegister(configuration, createSinkWriterMetricGroup());
+                new TopicProducerRegister(
+                        configuration, PulsarCrypto.disabled(), createSinkWriterMetricGroup());
 
         String message = randomAlphabetic(10);
         register.createMessageBuilder(topic, Schema.STRING).value(message).sendAsync();
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
index 8a90557..0d8041a 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.connector.pulsar.source;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
 import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
+import org.apache.flink.connector.pulsar.testutils.source.cases.ConsumeEncryptMessagesContext;
 import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext;
 import org.apache.flink.connector.pulsar.testutils.source.cases.PartialKeysConsumingContext;
 import org.apache.flink.connector.pulsar.testutils.source.cases.SingleTopicConsumingContext;
@@ -66,4 +67,8 @@ class PulsarSourceITCase extends SourceTestSuiteBase<String> {
     @TestContext
     PulsarTestContextFactory<String, PartialKeysConsumingContext> partialKeys =
             new PulsarTestContextFactory<>(pulsar, PartialKeysConsumingContext::new);
+
+    @TestContext
+    PulsarTestContextFactory<String, ConsumeEncryptMessagesContext> encryptMessages =
+            new PulsarTestContextFactory<>(pulsar, ConsumeEncryptMessagesContext::new);
 }
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
index 4ad80d0..d82aad3 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.connector.pulsar.source.enumerator.cursor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.reader.PulsarPartitionSplitReader;
@@ -59,6 +60,7 @@ class StopCursorTest extends PulsarTestSuiteBase {
                         operator().admin(),
                         sourceConfig(),
                         Schema.BYTES,
+                        PulsarCrypto.disabled(),
                         createSourceReaderMetricGroup());
         // send the first message and set the stopCursor to filter any late stopCursor
         operator()
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
index 52698ea..6902a7e 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.connector.pulsar.source.reader;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
 import org.apache.flink.connector.pulsar.common.schema.BytesSchema;
 import org.apache.flink.connector.pulsar.common.schema.PulsarSchema;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
@@ -272,6 +273,7 @@ class PulsarPartitionSplitReaderTest extends PulsarTestSuiteBase {
                 operator().admin(),
                 sourceConfig(),
                 new BytesSchema(new PulsarSchema<>(STRING)),
+                PulsarCrypto.disabled(),
                 createSourceReaderMetricGroup());
     }
 
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java
index cbad32b..e805d14 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
 import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
@@ -244,7 +245,8 @@ class PulsarSourceReaderTest extends PulsarTestSuiteBase {
 
         SourceConfiguration sourceConfiguration = new SourceConfiguration(configuration);
 
-        return PulsarSourceReader.create(sourceConfiguration, deserializationSchema, context);
+        return PulsarSourceReader.create(
+                sourceConfiguration, deserializationSchema, PulsarCrypto.disabled(), context);
     }
 
     private void setupSourceReader(
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestCommonUtils.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestCommonUtils.java
index 9946013..466fc3b 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestCommonUtils.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestCommonUtils.java
@@ -24,6 +24,7 @@ import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.test.resources.ResourceTestUtils;
 
 import org.apache.pulsar.client.api.MessageId;
 import org.junit.jupiter.api.extension.ParameterContext;
@@ -46,6 +47,10 @@ public class PulsarTestCommonUtils {
         }
     }
 
+    public static String resourcePath(String jarName) {
+        return ResourceTestUtils.getResource(jarName).toAbsolutePath().toString();
+    }
+
     /** creates a fullRange() partitionSplit. */
     public static PulsarPartitionSplit createPartitionSplit(String topic, int partitionId) {
         return createPartitionSplit(topic, partitionId, Boundedness.CONTINUOUS_UNBOUNDED);
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestKeyReader.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestKeyReader.java
new file mode 100644
index 0000000..d433de2
--- /dev/null
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestKeyReader.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils;
+
+import org.apache.flink.util.function.SerializableSupplier;
+
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.EncryptionKeyInfo;
+import org.apache.pulsar.client.api.MessageCrypto;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+
+import java.util.Map;
+
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+
+/**
+ * A key reader for testing the end-to-end encryption of the Pulsar client. It serves a
+ * pre-generated RSA key pair which is embedded in this class.
+ */
+public class PulsarTestKeyReader implements CryptoKeyReader {
+    private static final long serialVersionUID = -7488297938196049791L;
+
+    public static final String ENCRYPT_KEY = "flink";
+
+    private final byte[] publicKey =
+            ("-----BEGIN PUBLIC KEY-----\n"
+                            + "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAwZAyz0PSoggZGIoxcrYJ\n"
+                            + "L/s9+6aAeuETFmnfT7ityhRgSBLK9MB6lyJKahmGJpEMTgYtZCb2QBlg7w8fOKb3\n"
+                            + "Z8UPSbR3DR38cMaftfP37VADFXTW8pxRj6p84NdgeoLDsyiVijIqxpCaPuW6ne4T\n"
+                            + "mN1PEV20zbGeWb6l52bWkp1gincc7ghhzzEB8F1Q7YBfRcA7pBRVhmW2yxXK8K/0\n"
+                            + "dpkVWK9XQf7/b2I9tLb0OXyaOoMsOCoAqoOy1EmgQ/iF0wO52WN4pcBTshifgrLS\n"
+                            + "dAJxGctuHvov0inYzrwxwEw+DWVbTAG8mBRiNkPnIwnewdDTZpMlxNrL+6p2aRDF\n"
+                            + "pQIDAQAB\n"
+                            + "-----END PUBLIC KEY-----")
+                    .getBytes();
+    private final byte[] privateKey =
+            ("-----BEGIN RSA PRIVATE KEY-----\n"
+                            + "MIIEpAIBAAKCAQEAwZAyz0PSoggZGIoxcrYJL/s9+6aAeuETFmnfT7ityhRgSBLK\n"
+                            + "9MB6lyJKahmGJpEMTgYtZCb2QBlg7w8fOKb3Z8UPSbR3DR38cMaftfP37VADFXTW\n"
+                            + "8pxRj6p84NdgeoLDsyiVijIqxpCaPuW6ne4TmN1PEV20zbGeWb6l52bWkp1gincc\n"
+                            + "7ghhzzEB8F1Q7YBfRcA7pBRVhmW2yxXK8K/0dpkVWK9XQf7/b2I9tLb0OXyaOoMs\n"
+                            + "OCoAqoOy1EmgQ/iF0wO52WN4pcBTshifgrLSdAJxGctuHvov0inYzrwxwEw+DWVb\n"
+                            + "TAG8mBRiNkPnIwnewdDTZpMlxNrL+6p2aRDFpQIDAQABAoIBAQCoOCWwM4VPBDKr\n"
+                            + "PQ6UXtfFN1g66A0ovYrVI9XLdviyctrqSErSQqVHy6lYZC5OPiivdnbkX2gLdQLl\n"
+                            + "QAMmPRUuvff3WjtMzw6hBD2w6AJD4BGOCCj2WBwZ+1TkIsnaLuLdRRwRKmA6iVlD\n"
+                            + "6Gsy7cFiTJN2yDVlvkOcQy/z11ALwe2KbiMg6f72K75fgRoJ1wB54d4+CnBNHOyW\n"
+                            + "B5Oi6ahTLp5SOZJuJuXBrvqARdODdYCHKCIEM96YtUhhQG4Ll+erWTwKlz7G848U\n"
+                            + "Ac/Yq62S3muuNttBxCdlOMXjh8zcKVMTzt/Cnvc2azYy3Z93R9jppE6BIKb98ohh\n"
+                            + "mQfE7E3RAoGBAPSs6wlka78taH/J93wNospuMXiOGqbQCYbwxpQ3228rLzx3ffyZ\n"
+                            + "0jw2r1hpVUsmCcaavq8vXJBmq6lSPa74FgAviTYiGPpkio5PWB5aZM4KGld2TyZJ\n"
+                            + "480rrEgBYZDQfWFAA68Kx/HXOVfUIccR624/HEXnu8llrLWGmMNARWBvAoGBAMqF\n"
+                            + "qxUtiZQM6BzXx2U/uUowqwGUnWegUAaPaLfaOpuRCDcxToff9QLBg5xSaOOOIZk5\n"
+                            + "3Fvl4ROnVsz64OLzAcXBGoKxJTwnojpbkw4HMKE7MqD2TWc9D7pc2T5VA650/DDC\n"
+                            + "FQNkKg1k/bgH+HCgMnwndRrbM7w9sljjaqca0x0rAoGAQEvof9FZ1yVRnrMuS/Ux\n"
+                            + "YEzQx0NgkZF9z24aYPzEt1P718H245hwfM5KCcu0VEksrHohvduOUYwJdDdeakpb\n"
+                            + "TbUwM3+GXNZq6rbDC0bp0pMpFO7MId2s9U+SuGFUiD+hkxrFXQxSOqU6NnBSaAO3\n"
+                            + "gIMpJN2epXAIkLNMFZMgKBUCgYARIpgkFZNDZIgrEJK9XVPnFBET9CgRQX4j3/Rj\n"
+                            + "QeKdkPrZ+KEFXAyV7BufmVVok3kCRuP/HocZq5nrg/qNGTR4L+t3TVeyLERMnbzm\n"
+                            + "ffM+YQzak5xe9Mqk4QA8huLl2t4Pngw7Gjl4oqfY70u088jxukDtQcixz6KMZMl8\n"
+                            + "VAeyuwKBgQCcbFC00vqMvB56tc96Wdz2vn0tcjyKE+QL8f6KTG/JtVAHSGhfFkmh\n"
+                            + "+V+g3/5sr9urOm5I//+riAqbUuuP3IIeVY4UYh5YjgwJcrGRIx0PufExsDaYCl98\n"
+                            + "GWeubd7Bk7phXjj36+Sbs3a0dD+HmDdbHAF3JXVZlT7sVUJo4UEzlg==\n"
+                            + "-----END RSA PRIVATE KEY-----")
+                    .getBytes();
+
+    @Override
+    public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> metadata) {
+        if (ENCRYPT_KEY.equals(keyName)) {
+            return new EncryptionKeyInfo(copyKey(publicKey), metadata);
+        }
+
+        throw new IllegalArgumentException("We only support encrypt key: \"" + ENCRYPT_KEY + "\"");
+    }
+
+    @Override
+    public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) {
+        if (ENCRYPT_KEY.equals(keyName)) {
+            return new EncryptionKeyInfo(copyKey(privateKey), metadata);
+        }
+
+        throw new IllegalArgumentException("We only support encrypt key: \"" + ENCRYPT_KEY + "\"");
+    }
+
+    private byte[] copyKey(byte[] key) {
+        // The byte array is not immutable. Duplicate it for safety.
+        byte[] k = new byte[key.length];
+        System.arraycopy(key, 0, k, 0, key.length);
+
+        return k;
+    }
+
+    /** A serializable supplier for creating the default {@link MessageCryptoBc}. */
+    public static class MessageCryptoBcSupplier
+            implements SerializableSupplier<MessageCrypto<MessageMetadata, MessageMetadata>> {
+
+        private final boolean producer;
+
+        public MessageCryptoBcSupplier(boolean producer) {
+            this.producer = producer;
+        }
+
+        @Override
+        public MessageCrypto<MessageMetadata, MessageMetadata> get() {
+            return new MessageCryptoBc(randomAlphabetic(10), producer);
+        }
+    }
+}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/cases/PulsarEncryptSinkContext.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/cases/PulsarEncryptSinkContext.java
new file mode 100644
index 0000000..42857a1
--- /dev/null
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/cases/PulsarEncryptSinkContext.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.sink.cases;
+
+import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
+import org.apache.flink.connector.pulsar.sink.PulsarSinkBuilder;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestKeyReader;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestKeyReader.MessageCryptoBcSupplier;
+import org.apache.flink.connector.pulsar.testutils.sink.reader.PulsarEncryptDataReader;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+
+import static org.apache.flink.connector.pulsar.testutils.PulsarTestKeyReader.ENCRYPT_KEY;
+import static org.apache.pulsar.client.api.ProducerCryptoFailureAction.FAIL;
+import static org.apache.pulsar.client.api.Schema.STRING;
+
+/** The sink context for producing encrypted messages. */
+public class PulsarEncryptSinkContext extends PulsarSinkTestContext {
+
+    public PulsarEncryptSinkContext(PulsarTestEnvironment environment) {
+        super(environment);
+    }
+
+    @Override
+    protected void setSinkBuilder(PulsarSinkBuilder<String> builder) {
+        super.setSinkBuilder(builder);
+
+        PulsarCrypto pulsarCrypto =
+                PulsarCrypto.builder()
+                        .cryptoKeyReader(new PulsarTestKeyReader())
+                        .addEncryptKeys(ENCRYPT_KEY)
+                        .messageCrypto(new MessageCryptoBcSupplier(true))
+                        .build();
+        builder.setPulsarCrypto(pulsarCrypto, FAIL);
+    }
+
+    @Override
+    public ExternalSystemDataReader<String> createSinkDataReader(TestingSinkSettings sinkSettings) {
+        PulsarCrypto pulsarCrypto =
+                PulsarCrypto.builder()
+                        .cryptoKeyReader(new PulsarTestKeyReader())
+                        .addEncryptKeys(ENCRYPT_KEY)
+                        .messageCrypto(new MessageCryptoBcSupplier(false))
+                        .build();
+        PulsarEncryptDataReader<String> reader =
+                new PulsarEncryptDataReader<>(operator, topicName, STRING, pulsarCrypto);
+        closer.register(reader);
+
+        return reader;
+    }
+
+    @Override
+    protected String displayName() {
+        return "write messages into one topic with end-to-end encryption";
+    }
+}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/cases/PulsarSinkTestContext.java
similarity index 79%
rename from flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java
rename to flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/cases/PulsarSinkTestContext.java
index a697f30..e563b56 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/cases/PulsarSinkTestContext.java
@@ -16,15 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.pulsar.testutils.sink;
+package org.apache.flink.connector.pulsar.testutils.sink.cases;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.pulsar.sink.PulsarSink;
+import org.apache.flink.connector.pulsar.sink.PulsarSinkBuilder;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connector.pulsar.testutils.sink.reader.PulsarPartitionDataReader;
 import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
 import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
 import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
@@ -38,7 +40,6 @@ import java.util.List;
 import java.util.Random;
 
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
-import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES;
 import static org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.toDeliveryGuarantee;
 
@@ -51,8 +52,8 @@ public class PulsarSinkTestContext extends PulsarTestContext<String>
     private static final int RECORD_SIZE_LOWER_BOUND = 100;
     private static final int RECORD_STRING_SIZE = 20;
 
-    private String topicName = topicName();
-    private final Closer closer = Closer.create();
+    protected String topicName = topicName();
+    protected final Closer closer = Closer.create();
 
     public PulsarSinkTestContext(PulsarTestEnvironment environment) {
         super(environment, Schema.STRING);
@@ -68,22 +69,24 @@ public class PulsarSinkTestContext extends PulsarTestContext<String>
         operator.createTopic(topicName, 4);
         DeliveryGuarantee guarantee = toDeliveryGuarantee(sinkSettings.getCheckpointingMode());
 
-        return PulsarSink.builder()
-                .setServiceUrl(operator.serviceUrl())
-                .setAdminUrl(operator.adminUrl())
-                .setTopics(topicName)
-                .setDeliveryGuarantee(guarantee)
-                .setSerializationSchema(schema)
-                .enableSchemaEvolution()
-                .setConfig(PULSAR_BATCHING_MAX_MESSAGES, 4)
-                .build();
+        PulsarSinkBuilder<String> builder =
+                PulsarSink.builder()
+                        .setServiceUrl(operator.serviceUrl())
+                        .setAdminUrl(operator.adminUrl())
+                        .setTopics(topicName)
+                        .setDeliveryGuarantee(guarantee)
+                        .setSerializationSchema(schema)
+                        .enableSchemaEvolution()
+                        .setConfig(PULSAR_BATCHING_MAX_MESSAGES, 4);
+        setSinkBuilder(builder);
+
+        return builder.build();
     }
 
     @Override
     public ExternalSystemDataReader<String> createSinkDataReader(TestingSinkSettings sinkSettings) {
         PulsarPartitionDataReader<String> reader =
-                sneakyClient(
-                        () -> new PulsarPartitionDataReader<>(operator, topicName, Schema.STRING));
+                new PulsarPartitionDataReader<>(operator, topicName, Schema.STRING);
         closer.register(reader);
 
         return reader;
@@ -117,6 +120,10 @@ public class PulsarSinkTestContext extends PulsarTestContext<String>
         closer.close();
     }
 
+    protected void setSinkBuilder(PulsarSinkBuilder<String> builder) {
+        // Nothing to do by default.
+    }
+
     private String topicName() {
         return TOPIC_NAME_PREFIX + randomAlphanumeric(8);
     }
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/reader/PulsarEncryptDataReader.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/reader/PulsarEncryptDataReader.java
new file mode 100644
index 0000000..14ece03
--- /dev/null
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/reader/PulsarEncryptDataReader.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.sink.reader;
+
+import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
+import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
+
+import org.apache.pulsar.client.api.Schema;
+
+/** The data reader for reading encrypted messages from Pulsar. */
+public class PulsarEncryptDataReader<T> extends PulsarPartitionDataReader<T> {
+
+    public PulsarEncryptDataReader(
+            PulsarRuntimeOperator operator,
+            String fullTopicName,
+            Schema<T> schema,
+            PulsarCrypto pulsarCrypto) {
+        super(operator, fullTopicName, schema, pulsarCrypto);
+    }
+}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarPartitionDataReader.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/reader/PulsarPartitionDataReader.java
similarity index 68%
rename from flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarPartitionDataReader.java
rename to flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/reader/PulsarPartitionDataReader.java
index 6ef9510..17a6523 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarPartitionDataReader.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/reader/PulsarPartitionDataReader.java
@@ -16,18 +16,24 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.pulsar.testutils.sink;
+package org.apache.flink.connector.pulsar.testutils.sink.reader;
 
+import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
 import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
 import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
 
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageCrypto;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,6 +44,7 @@ import java.util.List;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
+import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
 
 /** The data reader for a specified topic partition from Pulsar. */
 public class PulsarPartitionDataReader<T> implements ExternalSystemDataReader<T>, Closeable {
@@ -47,19 +54,40 @@ public class PulsarPartitionDataReader<T> implements ExternalSystemDataReader<T>
     private final Consumer<T> consumer;
 
     public PulsarPartitionDataReader(
-            PulsarRuntimeOperator operator, String fullTopicName, Schema<T> schema)
-            throws PulsarClientException {
-        // Create client for supporting the use in E2E test.
+            PulsarRuntimeOperator operator, String fullTopicName, Schema<T> schema) {
+        this(operator, fullTopicName, schema, PulsarCrypto.disabled());
+    }
+
+    protected PulsarPartitionDataReader(
+            PulsarRuntimeOperator operator,
+            String fullTopicName,
+            Schema<T> schema,
+            PulsarCrypto pulsarCrypto) {
+        // Create the consumer directly so that this reader can also be used in the E2E tests.
         String subscriptionName = randomAlphanumeric(12);
-        this.consumer =
+        ConsumerBuilder<T> builder =
                 operator.client()
                         .newConsumer(schema)
                         .topic(fullTopicName)
                         .subscriptionName(subscriptionName)
                         .subscriptionType(SubscriptionType.Exclusive)
                         .subscriptionMode(SubscriptionMode.Durable)
-                        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
-                        .subscribe();
+                        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
+
+        CryptoKeyReader cryptoKeyReader = pulsarCrypto.cryptoKeyReader();
+        if (cryptoKeyReader != null) {
+            // Add the crypto.
+            builder.cryptoKeyReader(cryptoKeyReader);
+            builder.cryptoFailureAction(ConsumerCryptoFailureAction.FAIL);
+
+            MessageCrypto<MessageMetadata, MessageMetadata> messageCrypto =
+                    pulsarCrypto.messageCrypto();
+            if (messageCrypto != null) {
+                builder.messageCrypto(messageCrypto);
+            }
+        }
+
+        this.consumer = sneakyClient(builder::subscribe);
     }
 
     @Override
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java
index e33839c..a3aa9e9 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java
@@ -27,6 +27,7 @@ import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connector.pulsar.testutils.source.writer.PulsarPartitionDataWriter;
 import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
 import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
 import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/ConsumeEncryptMessagesContext.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/ConsumeEncryptMessagesContext.java
new file mode 100644
index 0000000..367102a
--- /dev/null
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/ConsumeEncryptMessagesContext.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.source.cases;
+
+import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
+import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestKeyReader;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestKeyReader.MessageCryptoBcSupplier;
+import org.apache.flink.connector.pulsar.testutils.source.writer.PulsarEncryptDataWriter;
+import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
+import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
+
+import static org.apache.flink.connector.pulsar.testutils.PulsarTestKeyReader.ENCRYPT_KEY;
+import static org.apache.pulsar.client.api.ConsumerCryptoFailureAction.FAIL;
+
+/** The source context for consuming messages which are encrypted end-to-end. */
+public class ConsumeEncryptMessagesContext extends MultipleTopicConsumingContext {
+
+    public ConsumeEncryptMessagesContext(PulsarTestEnvironment environment) {
+        super(environment);
+    }
+
+    @Override
+    protected void setSourceBuilder(PulsarSourceBuilder<String> builder) {
+        super.setSourceBuilder(builder);
+
+        // Set PulsarCrypto for the Pulsar source.
+        PulsarCrypto pulsarCrypto =
+                PulsarCrypto.builder()
+                        .cryptoKeyReader(new PulsarTestKeyReader())
+                        .addEncryptKeys(ENCRYPT_KEY)
+                        .messageCrypto(new MessageCryptoBcSupplier(false))
+                        .build();
+        builder.setPulsarCrypto(pulsarCrypto, FAIL);
+    }
+
+    @Override
+    public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter(
+            TestingSourceSettings sourceSettings) {
+        String partitionName = generatePartitionName();
+        PulsarCrypto pulsarCrypto =
+                PulsarCrypto.builder()
+                        .cryptoKeyReader(new PulsarTestKeyReader())
+                        .addEncryptKeys(ENCRYPT_KEY)
+                        .messageCrypto(new MessageCryptoBcSupplier(true))
+                        .build();
+        return new PulsarEncryptDataWriter<>(operator, partitionName, schema, pulsarCrypto);
+    }
+
+    @Override
+    protected String displayName() {
+        return "consume messages with end-to-end encryption";
+    }
+
+    @Override
+    protected String subscriptionName() {
+        return "pulsar-encryption-subscription";
+    }
+}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/PartialKeysConsumingContext.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/PartialKeysConsumingContext.java
index c5dc8b6..7ebb499 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/PartialKeysConsumingContext.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/PartialKeysConsumingContext.java
@@ -21,7 +21,7 @@ package org.apache.flink.connector.pulsar.testutils.source.cases;
 import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FixedKeysRangeGenerator;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.pulsar.testutils.source.KeyedPulsarPartitionDataWriter;
+import org.apache.flink.connector.pulsar.testutils.source.writer.KeyedPulsarPartitionDataWriter;
 import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
 import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
 
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/KeyedPulsarPartitionDataWriter.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/writer/KeyedPulsarPartitionDataWriter.java
similarity index 97%
rename from flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/KeyedPulsarPartitionDataWriter.java
rename to flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/writer/KeyedPulsarPartitionDataWriter.java
index 23d65f0..194aa48 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/KeyedPulsarPartitionDataWriter.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/writer/KeyedPulsarPartitionDataWriter.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.pulsar.testutils.source;
+package org.apache.flink.connector.pulsar.testutils.source.writer;
 
 import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
 import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/writer/PulsarEncryptDataWriter.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/writer/PulsarEncryptDataWriter.java
new file mode 100644
index 0000000..9eeb8c9
--- /dev/null
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/writer/PulsarEncryptDataWriter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.source.writer;
+
+import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
+import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
+import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
+
+import org.apache.pulsar.client.api.MessageCrypto;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ProducerBuilderImpl;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
+import static org.apache.pulsar.client.api.ProducerAccessMode.Shared;
+
+/** Encrypts the messages with the given public key and sends them to Pulsar. */
+public class PulsarEncryptDataWriter<T> implements ExternalSystemSplitDataWriter<T> {
+
+    private final Producer<T> producer;
+
+    public PulsarEncryptDataWriter(
+            PulsarRuntimeOperator operator,
+            String fullTopicName,
+            Schema<T> schema,
+            PulsarCrypto pulsarCrypto) {
+        ProducerBuilder<T> builder =
+                operator.client()
+                        .newProducer(schema)
+                        .topic(fullTopicName)
+                        .enableBatching(false)
+                        .enableMultiSchema(true)
+                        .accessMode(Shared)
+                        .cryptoFailureAction(ProducerCryptoFailureAction.FAIL)
+                        .cryptoKeyReader(pulsarCrypto.cryptoKeyReader());
+
+        Set<String> encryptKeys = pulsarCrypto.encryptKeys();
+        encryptKeys.forEach(builder::addEncryptionKey);
+
+        MessageCrypto<MessageMetadata, MessageMetadata> messageCrypto =
+                pulsarCrypto.messageCrypto();
+        if (messageCrypto != null) {
+            ProducerConfigurationData conf = ((ProducerBuilderImpl<?>) builder).getConf();
+            conf.setMessageCrypto(messageCrypto);
+        }
+
+        this.producer = sneakyClient(builder::create);
+    }
+
+    @Override
+    public void writeRecords(List<T> records) {
+        for (T record : records) {
+            sneakyClient(() -> producer.newMessage().value(record).send());
+        }
+        sneakyClient(producer::flush);
+    }
+
+    @Override
+    public void close() throws Exception {
+        producer.close();
+    }
+}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarPartitionDataWriter.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/writer/PulsarPartitionDataWriter.java
similarity index 96%
rename from flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarPartitionDataWriter.java
rename to flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/writer/PulsarPartitionDataWriter.java
index 1ceb292..c5893c9 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarPartitionDataWriter.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/writer/PulsarPartitionDataWriter.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.pulsar.testutils.source;
+package org.apache.flink.connector.pulsar.testutils.source.writer;
 
 import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
 import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
diff --git a/pom.xml b/pom.xml
index a6cbc9f..ac517a8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -57,7 +57,6 @@ under the License.
 
         <jsr305.version>1.3.9</jsr305.version>
         <junit5.version>5.9.1</junit5.version>
-        <junit5-platform.version>1.9.1</junit5-platform.version>
         <assertj.version>3.23.1</assertj.version>
         <mockito.version>4.11.0</mockito.version>
         <archunit.version>1.0.1</archunit.version>
@@ -316,6 +315,14 @@ under the License.
                 <version>${log4j.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>org.junit</groupId>
+                <artifactId>junit-bom</artifactId>
+                <version>${junit5.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+
             <dependency>
                 <groupId>org.testcontainers</groupId>
                 <artifactId>testcontainers-bom</artifactId>
@@ -358,22 +365,6 @@ under the License.
 
             <!-- For dependency convergence -->
 
-            <dependency>
-                <groupId>org.junit.platform</groupId>
-                <artifactId>junit-platform-engine</artifactId>
-                <version>${junit5-platform.version}</version>
-            </dependency>
-
-            <!-- For dependency convergence -->
-
-            <dependency>
-                <groupId>org.junit.jupiter</groupId>
-                <artifactId>junit-jupiter</artifactId>
-                <version>${junit5.version}</version>
-            </dependency>
-
-            <!-- For dependency convergence -->
-
             <dependency>
                 <groupId>org.apache.commons</groupId>
                 <artifactId>commons-compress</artifactId>