Posted to commits@flink.apache.org by ti...@apache.org on 2022/08/14 14:49:39 UTC

[flink] branch release-1.15 updated: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic to better handle checkpoints. (#19972) (#20564)

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 5f842cb8665 [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic to better handle checkpoints. (#19972) (#20564)
5f842cb8665 is described below

commit 5f842cb8665e831d6acb978a57b896accd1c0928
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Sun Aug 14 22:49:32 2022 +0800

    [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic to better handle checkpoints. (#19972) (#20564)
    
    * Change the initial start cursor and stop cursor to better handle consuming behavior (a short usage sketch follows below).
    * Create the initial subscription instead of seeking every time. This fixes the wrong position setting.
    * Fix the wrong stop cursor, making sure it stops at the correct place.
    * Drop Consumer.seek() (see https://github.com/apache/pulsar/pull/16171).
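
    For readers skimming the change, here is a minimal, hypothetical sketch of the cursor semantics described in the documentation hunks below. Only the StartCursor/StopCursor factory methods are taken from this change; the wrapper class, method name, and timestamp value are illustrative.

    ```java
    import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
    import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;

    final class CursorUsageSketch {

        static void sketch() {
            long timestamp = 1_660_000_000_000L; // placeholder epoch milliseconds

            // Replacement for the now-deprecated StartCursor.fromMessageTime(long).
            StartCursor start = StartCursor.fromPublishTime(timestamp);

            // Stop *at* the publish time: the message carrying this timestamp is excluded.
            StopCursor stopExclusive = StopCursor.atPublishTime(timestamp);

            // Stop *after* the event time: the message carrying this timestamp is included.
            StopCursor stopInclusive = StopCursor.afterEventTime(timestamp);
        }
    }
    ```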
---
 .../docs/connectors/datastream/pulsar.md           |  53 +++--
 docs/content/docs/connectors/datastream/pulsar.md  |  70 +++---
 .../generated/pulsar_consumer_configuration.html   |   6 -
 .../connector/pulsar/source/PulsarSource.java      |  14 +-
 .../pulsar/source/PulsarSourceBuilder.java         |   8 +-
 .../pulsar/source/PulsarSourceOptions.java         |   7 +
 .../source/config/PulsarSourceConfigUtils.java     |   3 -
 .../pulsar/source/config/SourceConfiguration.java  |   4 +
 .../source/enumerator/PulsarSourceEnumState.java   |  16 +-
 .../source/enumerator/PulsarSourceEnumerator.java  | 148 ++++++-------
 .../source/enumerator/SplitsAssignmentState.java   | 239 ---------------------
 .../assigner/NonSharedSplitAssigner.java           | 126 +++++++++++
 .../enumerator/assigner/SharedSplitAssigner.java   | 148 +++++++++++++
 .../source/enumerator/assigner/SplitAssigner.java  |  64 ++++++
 .../enumerator/assigner/SplitAssignerFactory.java  |  65 ++++++
 .../source/enumerator/cursor/CursorPosition.java   |  36 ++--
 .../source/enumerator/cursor/MessageIdUtils.java   |  71 ++++++
 .../source/enumerator/cursor/StartCursor.java      |  32 ++-
 .../source/enumerator/cursor/StopCursor.java       |  96 +++++++--
 .../cursor/start/MessageIdStartCursor.java         |  24 +--
 .../cursor/start/TimestampStartCursor.java         |   4 +-
 .../cursor/stop/EventTimestampStopCursor.java      |   9 +-
 .../cursor/stop/LatestMessageStopCursor.java       |  21 +-
 .../cursor/stop/MessageIdStopCursor.java           |  32 +--
 .../enumerator/cursor/stop/NeverStopCursor.java    |   4 +-
 .../cursor/stop/PublishTimestampStopCursor.java    |   9 +-
 .../enumerator/subscriber/PulsarSubscriber.java    |   2 +-
 .../source/reader/PulsarSourceReaderFactory.java   |   2 +-
 .../split/PulsarOrderedPartitionSplitReader.java   |  42 +++-
 .../split/PulsarPartitionSplitReaderBase.java      |  55 ++---
 .../split/PulsarUnorderedPartitionSplitReader.java |   2 +-
 .../pulsar/source/split/PulsarPartitionSplit.java  |   4 +-
 .../pulsar/source/PulsarSourceITCase.java          |  24 ---
 .../PulsarSourceEnumStateSerializerTest.java       |   5 +-
 .../enumerator/PulsarSourceEnumeratorTest.java     |  10 +-
 .../enumerator/SplitsAssignmentStateTest.java      | 119 ----------
 .../assigner/NonSharedSplitAssignerTest.java       |  95 ++++++++
 .../assigner/SharedSplitAssignerTest.java          |  98 +++++++++
 .../enumerator/assigner/SplitAssignerTestBase.java | 113 ++++++++++
 .../source/enumerator/cursor/StopCursorTest.java   |  11 +-
 .../split/PulsarPartitionSplitReaderTestBase.java  |   8 +-
 .../pulsar/testutils/PulsarTestContext.java        |  10 +-
 .../testutils/runtime/PulsarRuntimeOperator.java   |  80 +++----
 43 files changed, 1260 insertions(+), 729 deletions(-)
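
Before the per-file hunks, here is a minimal sketch of how the enumerator is now wired to a split assigner. The SplitAssignerFactory, SplitAssigner, and PulsarSourceEnumState names and the two create(...) signatures come from the PulsarSource.java hunk below; the wrapper class and method names are illustrative, and the factory presumably chooses between SharedSplitAssigner and NonSharedSplitAssigner based on the configured subscription type.

```java
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssignerFactory;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;

final class SplitAssignerWiringSketch {

    /** Fresh start, mirroring what PulsarSource#createEnumerator does after this change. */
    static SplitAssigner createFresh(StopCursor stopCursor, SourceConfiguration configuration) {
        return SplitAssignerFactory.create(stopCursor, configuration);
    }

    /** Recovery, mirroring PulsarSource#restoreEnumerator: the checkpointed state seeds the assigner. */
    static SplitAssigner createRestored(
            StopCursor stopCursor, SourceConfiguration configuration, PulsarSourceEnumState checkpoint) {
        return SplitAssignerFactory.create(stopCursor, configuration, checkpoint);
    }
}
```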

diff --git a/docs/content.zh/docs/connectors/datastream/pulsar.md b/docs/content.zh/docs/connectors/datastream/pulsar.md
index 8d44be88c71..4bc1766ef12 100644
--- a/docs/content.zh/docs/connectors/datastream/pulsar.md
+++ b/docs/content.zh/docs/connectors/datastream/pulsar.md
@@ -76,7 +76,7 @@ Pulsar Source 提供了两种订阅 Topic 或 Topic 分区的方式。
   ```java
   PulsarSource.builder().setTopics("some-topic1", "some-topic2");
 
-  // 从 topic "topic-a" 的 0 和 1 分区上消费
+  // 从 topic "topic-a" 的 0 和 2 分区上消费
   PulsarSource.builder().setTopics("topic-a-partition-0", "topic-a-partition-2");
   ```
 
@@ -204,10 +204,14 @@ Pulsar Source 使用 `setStartCursor(StartCursor)` 方法给定开始消费的
   ```java
   StartCursor.fromMessageId(MessageId, boolean);
   ```
-- 从给定的消息时间开始消费。
+- 从给定的消息发布时间开始消费。该方法因名称容易引起误解,已不建议使用,请改用 `StartCursor.fromPublishTime(long)`。
   ```java
   StartCursor.fromMessageTime(long);
   ```
+- 从给定的消息发布时间开始消费。
+  ```java
+  StartCursor.fromPublishTime(long);
+  ```
 
 {{< hint info >}}
 每条消息都有一个固定的序列号,这个序列号在 Pulsar 上有序排列,其包含了 ledger、entry、partition 等原始信息,用于在 Pulsar 底层存储上查找到具体的消息。
@@ -239,14 +243,22 @@ Pulsar Source 默认情况下使用流的方式消费数据。除非任务失败
   ```java
   StopCursor.afterMessageId(MessageId);
   ```
-- 停止于某个给定的消息发布时间戳,比如 `Message<byte[]>.getPublishTime()`。
+- 停止于某个给定的消息事件时间戳,比如 `Message<byte[]>.getEventTime()`,消费结果里不包含此时间戳的消息。
+  ```java
+  StopCursor.atEventTime(long);
+  ```
+- 停止于某个给定的消息事件时间戳,比如 `Message<byte[]>.getEventTime()`,消费结果里包含此时间戳的消息。
+  ```java
+  StopCursor.afterEventTime(long);
+  ```
+- 停止于某个给定的消息发布时间戳,比如 `Message<byte[]>.getPublishTime()`,消费结果里不包含此时间戳的消息。
   ```java
   StopCursor.atPublishTime(long);
   ```
-
-{{< hint warning >}}
-StopCursor.atEventTime(long) 目前已经处于弃用状态。
-{{< /hint >}}
+- 停止于某个给定的消息发布时间戳,比如 `Message<byte[]>.getPublishTime()`,消费结果里包含此时间戳的消息。
+  ```java
+  StopCursor.afterPublishTime(long);
+  ```
 
 ### Source 配置项
 
@@ -352,7 +364,7 @@ PulsarSink<String> sink = PulsarSink.builder()
     .setAdminUrl(adminUrl)
     .setTopics("topic1")
     .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
-    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
     .build();
 
 stream.sinkTo(sink);
@@ -375,10 +387,10 @@ stream.sinkTo(sink);
 // Topic "some-topic1" 和 "some-topic2"
 PulsarSink.builder().setTopics("some-topic1", "some-topic2")
 
-// Topic "topic-a" 的分区 0 和 2 
+// Topic "topic-a" 的分区 0 和 2
 PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
 
-// Topic "topic-a" 以及 Topic "some-topic2" 分区 0 和 2 
+// Topic "topic-a" 以及 Topic "some-topic2" 分区 0 和 2
 PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2", "some-topic2")
 ```
 
@@ -619,7 +631,11 @@ Pulsar Sink 使用生产者 API 来发送消息。Pulsar 的 `ProducerConfigurat
 
 默认情况下,Pulsar 生产者每隔 60 秒才会刷新一次监控数据,然而 Pulsar Sink 每 500 毫秒就会从 Pulsar 生产者中获得最新的监控数据。因此 `numRecordsOut`、`numBytesOut`、`numAcksReceived` 以及 `numRecordsOutErrors` 4 个指标实际上每 60 秒才会刷新一次。
 
-如果想要更高地刷新评率,可以通过 `builder.setConfig(PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS. 1L)` 来将 Pulsar 生产者的监控数据刷新频率调整至相应值(最低为1s)。
+如果想要更高的刷新频率,可以通过如下方式将 Pulsar 生产者的监控数据刷新频率调整至相应值(最低为 1 秒):
+
+```java
+builder.setConfig(PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS, 1L);
+```
 
 `numBytesOutRate` 和 `numRecordsOutRate` 指标是 Flink 内部通过 `numBytesOut` 和 `numRecordsOut` 计数器,在一个 60 秒的窗口内计算得到的。
 
@@ -650,7 +666,20 @@ Pulsar Sink 遵循 [FLIP-191](https://cwiki.apache.org/confluence/display/FLINK/
 
 用户遇到的问题可能与 Flink 无关,请先升级 Pulsar 的版本、Pulsar 客户端的版本,或者修改 Pulsar 的配置、Pulsar 连接器的配置来尝试解决问题。
 
+## 已知问题
+
+本节介绍有关 Pulsar 连接器的一些已知问题。
+
 ### 在 Java 11 上使用不稳定
 
-Pulsar connector 在 Java 11 中有一些尚未修复的问题。我们当前推荐在 Java 8 环境中运行Pulsar connector.
+Pulsar connector 在 Java 11 中有一些尚未修复的问题。我们当前推荐在 Java 8 环境中运行 Pulsar connector。
+
+### 不自动重连,而是抛出 TransactionCoordinatorNotFound 异常
+
+Pulsar 事务机制仍在积极开发中,当前版本并不稳定。Pulsar 2.9.2
+在事务中引入了一个[破坏性变更](https://github.com/apache/pulsar/pull/13135)。
+如果您在使用 Pulsar 2.9.2 或更高版本时搭配较旧的 Pulsar 客户端,可能会收到 `TransactionCoordinatorNotFound` 异常。
+
+您可以使用最新的 `pulsar-client-all` 版本来解决这个问题。
+
 {{< top >}}
diff --git a/docs/content/docs/connectors/datastream/pulsar.md b/docs/content/docs/connectors/datastream/pulsar.md
index 0953753ee33..b88fc7904b6 100644
--- a/docs/content/docs/connectors/datastream/pulsar.md
+++ b/docs/content/docs/connectors/datastream/pulsar.md
@@ -241,10 +241,16 @@ The Pulsar connector consumes from the latest available message if the message I
   ```java
   StartCursor.fromMessageId(MessageId, boolean);
   ```
-- Start from the specified message time by `Message<byte[]>.getPublishTime()`.
+- Start from the specified message publish time by `Message<byte[]>.getPublishTime()`.
+This method is deprecated because its name is misleading and may cause confusion.
+You can use `StartCursor.fromPublishTime(long)` instead.
   ```java
   StartCursor.fromMessageTime(long);
   ```
+- Start from the specified message publish time by `Message<byte[]>.getPublishTime()`.
+  ```java
+  StartCursor.fromPublishTime(long);
+  ```
 
 {{< hint info >}}
 Each Pulsar message belongs to an ordered sequence on its topic.
@@ -281,14 +287,26 @@ Built-in stop cursors include:
   ```java
   StopCursor.afterMessageId(MessageId);
   ```
-- Stop at the specified message time by `Message<byte[]>.getPublishTime()`.
+- Stop at the specified event time by `Message<byte[]>.getEventTime()`. The message with the
+given event time won't be included in the consuming result.
+  ```java
+  StopCursor.atEventTime(long);
+  ```
+- Stop after the specified event time by `Message<byte[]>.getEventTime()`. The message with the
+given event time will be included in the consuming result.
+  ```java
+  StopCursor.afterEventTime(long);
+  ```
+- Stop at the specified publish time by `Message<byte[]>.getPublishTime()`. The message with the
+given publish time won't be included in the consuming result.
   ```java
   StopCursor.atPublishTime(long);
   ```
-
-{{< hint warning >}}
-StopCursor.atEventTime(long) is now deprecated.
-  {{< /hint >}}
+- Stop after the specified publish time by `Message<byte[]>.getPublishTime()`. The message with the
+  given publish time will be included in the consuming result.
+  ```java
+  StopCursor.afterPublishTime(long);
+  ```
 
 ### Source Configurable Options
 
@@ -431,9 +449,9 @@ PulsarSink<String> sink = PulsarSink.builder()
     .setAdminUrl(adminUrl)
     .setTopics("topic1")
     .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
-    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
     .build();
-        
+
 stream.sinkTo(sink);
 ```
 
@@ -742,7 +760,7 @@ are updated every 60 seconds. To increase the metrics refresh frequency, you can
 the Pulsar producer stats refresh interval to a smaller value (minimum 1 second), as shown below.
 
 ```java
-builder.setConfig(PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS. 1L)
+builder.setConfig(PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS, 1L);
 ```
 
 `numBytesOutRate` and `numRecordsOutRate` are calculated based on the `numBytesOut` and `numRecordsOUt`
@@ -768,23 +786,6 @@ you to reuse the same Flink job after certain "allowed" data model changes, like
 a field in a AVRO-based Pojo class. Please note that you can specify Pulsar schema validation rules
 and define an auto schema update. For details, refer to [Pulsar Schema Evolution](https://pulsar.apache.org/docs/en/schema-evolution-compatibility/).
 
-## Known Issues
-
-This section describes some known issues about the Pulsar connectors.
-
-### Unstable on Java 11
-
-Pulsar connector has some known issues on Java 11. It is recommended to run Pulsar connector
-on Java 8.
-
-### No TransactionCoordinatorNotFound, but automatic reconnect
-
-Pulsar transactions are still in active development and are not stable. Pulsar 2.9.2
-introduces [a break change](https://github.com/apache/pulsar/pull/13135) in transactions.
-If you use Pulsar 2.9.2 or higher with an older Pulsar client, you might get a `TransactionCoordinatorNotFound` exception.
-
-You can use the latest `pulsar-client-all` release to resolve this issue.
-
 ## Upgrading to the Latest Connector Version
 
 The generic upgrade steps are outlined in [upgrading jobs and Flink versions guide]({{< ref "docs/ops/upgrading" >}}).
@@ -802,4 +803,21 @@ If you have a problem with Pulsar when using Flink, keep in mind that Flink only
 and your problem might be independent of Flink and sometimes can be solved by upgrading Pulsar brokers,
 reconfiguring Pulsar brokers or reconfiguring Pulsar connector in Flink.
 
+## Known Issues
+
+This section describes some known issues about the Pulsar connectors.
+
+### Unstable on Java 11
+
+The Pulsar connector has some known issues on Java 11. It is recommended to run the Pulsar
+connector on Java 8.
+
+### No automatic reconnect, but a TransactionCoordinatorNotFound exception
+
+Pulsar transactions are still in active development and are not stable. Pulsar 2.9.2
+introduces [a breaking change](https://github.com/apache/pulsar/pull/13135) in transactions.
+If you use Pulsar 2.9.2 or higher with an older Pulsar client, you might get a `TransactionCoordinatorNotFound` exception.
+
+You can use the latest `pulsar-client-all` release to resolve this issue.
+
 {{< top >}}
diff --git a/docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html b/docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html
index bc8b6df4060..4e05b270de4 100644
--- a/docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html
+++ b/docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html
@@ -140,12 +140,6 @@ C5, 1, 1
             <td>Boolean</td>
             <td>If enabled, the consumer will automatically retry messages.</td>
         </tr>
-        <tr>
-            <td><h5>pulsar.consumer.subscriptionInitialPosition</h5></td>
-            <td style="word-wrap: break-word;">Latest</td>
-            <td><p>Enum</p></td>
-            <td>Initial position at which to set cursor when subscribing to a topic at first time.<br /><br />Possible values:<ul><li>"Latest"</li><li>"Earliest"</li></ul></td>
-        </tr>
         <tr>
             <td><h5>pulsar.consumer.subscriptionMode</h5></td>
             <td style="word-wrap: break-word;">Durable</td>
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
index a6c48d14bc8..6f4775df366 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
@@ -32,7 +32,8 @@ import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
 import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumStateSerializer;
 import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator;
-import org.apache.flink.connector.pulsar.source.enumerator.SplitsAssignmentState;
+import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
+import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssignerFactory;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
@@ -142,15 +143,14 @@ public final class PulsarSource<OUT>
     @Override
     public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> createEnumerator(
             SplitEnumeratorContext<PulsarPartitionSplit> enumContext) {
-        SplitsAssignmentState assignmentState =
-                new SplitsAssignmentState(stopCursor, sourceConfiguration);
+        SplitAssigner splitAssigner = SplitAssignerFactory.create(stopCursor, sourceConfiguration);
         return new PulsarSourceEnumerator(
                 subscriber,
                 startCursor,
                 rangeGenerator,
                 sourceConfiguration,
                 enumContext,
-                assignmentState);
+                splitAssigner);
     }
 
     @Internal
@@ -158,15 +158,15 @@ public final class PulsarSource<OUT>
     public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> restoreEnumerator(
             SplitEnumeratorContext<PulsarPartitionSplit> enumContext,
             PulsarSourceEnumState checkpoint) {
-        SplitsAssignmentState assignmentState =
-                new SplitsAssignmentState(stopCursor, sourceConfiguration, checkpoint);
+        SplitAssigner splitAssigner =
+                SplitAssignerFactory.create(stopCursor, sourceConfiguration, checkpoint);
         return new PulsarSourceEnumerator(
                 subscriber,
                 startCursor,
                 rangeGenerator,
                 sourceConfiguration,
                 enumContext,
-                assignmentState);
+                splitAssigner);
     }
 
     @Internal
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
index ed68215819f..1a5d6ea9758 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
@@ -97,7 +97,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * <p>To stop the connector user has to disable the auto partition discovery. As auto partition
  * discovery always expected new splits to come and not exiting. To disable auto partition
  * discovery, use builder.setConfig({@link
- * PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
+ * PulsarSourceOptions#PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
  *
  * <pre>{@code
  * PulsarSource<String> source = PulsarSource
@@ -266,7 +266,7 @@ public final class PulsarSourceBuilder<OUT> {
     }
 
     /**
-     * The consumer name is informative and it can be used to identify a particular consumer
+     * The consumer name is informative, and it can be used to identify a particular consumer
      * instance from the topic stats.
      */
     public PulsarSourceBuilder<OUT> setConsumerName(String consumerName) {
@@ -321,7 +321,7 @@ public final class PulsarSourceBuilder<OUT> {
      * <p>To stop the connector user has to disable the auto partition discovery. As auto partition
      * discovery always expected new splits to come and not exiting. To disable auto partition
      * discovery, use builder.setConfig({@link
-     * PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
+     * PulsarSourceOptions#PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
      *
      * @param stopCursor The {@link StopCursor} to specify the stopping offset.
      * @return this PulsarSourceBuilder.
@@ -334,7 +334,7 @@ public final class PulsarSourceBuilder<OUT> {
     }
 
     /**
-     * By default the PulsarSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner
+     * By default, the PulsarSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner
      * and thus never stops until the Flink job fails or is canceled. To let the PulsarSource run in
      * {@link Boundedness#BOUNDED} manner and stops at some point, one can set an {@link StopCursor}
      * to specify the stopping offsets for each partition. When all the partitions have reached
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
index 39a73974f5c..c80ddd3b1fa 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.description.Description;
 import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
 import org.apache.flink.connector.pulsar.source.config.CursorVerification;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
 
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
@@ -503,6 +504,12 @@ public final class PulsarSourceOptions {
                                             code("PulsarClientException"))
                                     .build());
 
+    /**
+     * @deprecated This option would be reset by {@link StartCursor}, no need to use it anymore.
+     *     Pulsar didn't support this config option before 1.10.1, so we have to remove this config
+     *     option.
+     */
+    @Deprecated
     public static final ConfigOption<SubscriptionInitialPosition>
             PULSAR_SUBSCRIPTION_INITIAL_POSITION =
                     ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "subscriptionInitialPosition")
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
index 602a1577938..0a4dc31e8d3 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
@@ -59,7 +59,6 @@ import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSA
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_REPLICATE_SUBSCRIPTION_STATE;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_RETRY_ENABLE;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_RETRY_LETTER_TOPIC;
-import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_INITIAL_POSITION;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_MODE;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
@@ -113,8 +112,6 @@ public final class PulsarSourceConfigUtils {
                 builder::consumerName);
         configuration.useOption(PULSAR_READ_COMPACTED, builder::readCompacted);
         configuration.useOption(PULSAR_PRIORITY_LEVEL, builder::priorityLevel);
-        configuration.useOption(
-                PULSAR_SUBSCRIPTION_INITIAL_POSITION, builder::subscriptionInitialPosition);
         createDeadLetterPolicy(configuration).ifPresent(builder::deadLetterPolicy);
         configuration.useOption(
                 PULSAR_AUTO_UPDATE_PARTITIONS_INTERVAL_SECONDS,
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
index 806fe4a418b..ef6c69192a3 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
@@ -83,6 +83,10 @@ public class SourceConfiguration extends PulsarConfiguration {
         return messageQueueCapacity;
     }
 
+    /**
+     * The partition discovery interval is overridden to a negative value when the connector is
+     * configured with a bounded stop cursor.
+     */
     public boolean isEnablePartitionDiscovery() {
         return getPartitionDiscoveryIntervalMs() > 0;
     }
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumState.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumState.java
index dbab9e21781..56bbbd20a32 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumState.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumState.java
@@ -18,15 +18,18 @@
 
 package org.apache.flink.connector.pulsar.source.enumerator;
 
+import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
 /**
  * The state class for pulsar source enumerator, used for storing the split state. This class is
- * managed and controlled by {@link SplitsAssignmentState}.
+ * managed and controlled by {@link SplitAssigner}.
  */
 public class PulsarSourceEnumState {
 
@@ -46,11 +49,12 @@ public class PulsarSourceEnumState {
     private final Map<Integer, Set<PulsarPartitionSplit>> sharedPendingPartitionSplits;
 
     /**
-     * A {@link PulsarPartitionSplit} should be assigned for all flink readers. Using this map for
-     * recording assign status.
+     * Used for the Shared subscription. Every {@link PulsarPartitionSplit} should be assigned to
+     * all Flink readers; this map records the assignment status.
      */
     private final Map<Integer, Set<String>> readerAssignedSplits;
 
+    /** The pipeline has been triggered and topic partitions have been assigned to readers. */
     private final boolean initialized;
 
     public PulsarSourceEnumState(
@@ -85,4 +89,10 @@ public class PulsarSourceEnumState {
     public boolean isInitialized() {
         return initialized;
     }
+
+    /** The initial assignment state for Pulsar. */
+    public static PulsarSourceEnumState initialState() {
+        return new PulsarSourceEnumState(
+                new HashSet<>(), new HashSet<>(), new HashMap<>(), new HashMap<>(), false);
+    }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
index 7890dcf1847..a64e9272c70 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
@@ -22,40 +22,29 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.CursorPosition;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.KeySharedPolicy;
-import org.apache.pulsar.client.api.KeySharedPolicy.KeySharedPolicySticky;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Range;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.MessageId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
 import static java.util.Collections.singletonList;
 import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createAdmin;
-import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient;
-import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
-import static org.apache.flink.connector.pulsar.source.config.CursorVerification.FAIL_ON_MISMATCH;
-import static org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.createConsumerBuilder;
+import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin;
 
 /** The enumerator class for pulsar source. */
 @Internal
@@ -65,13 +54,12 @@ public class PulsarSourceEnumerator
     private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceEnumerator.class);
 
     private final PulsarAdmin pulsarAdmin;
-    private final PulsarClient pulsarClient;
     private final PulsarSubscriber subscriber;
     private final StartCursor startCursor;
     private final RangeGenerator rangeGenerator;
     private final SourceConfiguration sourceConfiguration;
     private final SplitEnumeratorContext<PulsarPartitionSplit> context;
-    private final SplitsAssignmentState assignmentState;
+    private final SplitAssigner splitAssigner;
 
     public PulsarSourceEnumerator(
             PulsarSubscriber subscriber,
@@ -79,15 +67,14 @@ public class PulsarSourceEnumerator
             RangeGenerator rangeGenerator,
             SourceConfiguration sourceConfiguration,
             SplitEnumeratorContext<PulsarPartitionSplit> context,
-            SplitsAssignmentState assignmentState) {
+            SplitAssigner splitAssigner) {
         this.pulsarAdmin = createAdmin(sourceConfiguration);
-        this.pulsarClient = createClient(sourceConfiguration);
         this.subscriber = subscriber;
         this.startCursor = startCursor;
         this.rangeGenerator = rangeGenerator;
         this.sourceConfiguration = sourceConfiguration;
         this.context = context;
-        this.assignmentState = assignmentState;
+        this.splitAssigner = splitAssigner;
     }
 
     @Override
@@ -123,9 +110,9 @@ public class PulsarSourceEnumerator
     @Override
     public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
         // Put the split back to current pending splits.
-        assignmentState.putSplitsBackToPendingList(splits, subtaskId);
+        splitAssigner.addSplitsBack(splits, subtaskId);
 
-        // If the failed subtask has already restarted, we need to assign pending splits to it
+        // If the failed subtask has already restarted, we need to assign pending splits to it.
         if (context.registeredReaders().containsKey(subtaskId)) {
             assignPendingPartitionSplits(singletonList(subtaskId));
         }
@@ -142,7 +129,7 @@ public class PulsarSourceEnumerator
 
     @Override
     public PulsarSourceEnumState snapshotState(long checkpointId) {
-        return assignmentState.snapshotState();
+        return splitAssigner.snapshotState();
     }
 
     @Override
@@ -164,54 +151,7 @@ public class PulsarSourceEnumerator
      */
     private Set<TopicPartition> getSubscribedTopicPartitions() {
         int parallelism = context.currentParallelism();
-        Set<TopicPartition> partitions =
-                subscriber.getSubscribedTopicPartitions(pulsarAdmin, rangeGenerator, parallelism);
-
-        // Seek start position for given partitions.
-        seekStartPosition(partitions);
-
-        return partitions;
-    }
-
-    private void seekStartPosition(Set<TopicPartition> partitions) {
-        ConsumerBuilder<byte[]> consumerBuilder = consumerBuilder();
-        Set<String> seekedTopics = new HashSet<>();
-
-        for (TopicPartition partition : partitions) {
-            String topicName = partition.getFullTopicName();
-            if (!assignmentState.containsTopic(topicName) && seekedTopics.add(topicName)) {
-                try (Consumer<byte[]> consumer =
-                        sneakyClient(() -> consumerBuilder.clone().topic(topicName).subscribe())) {
-                    startCursor.seekPosition(
-                            partition.getTopic(), partition.getPartitionId(), consumer);
-                } catch (PulsarClientException e) {
-                    if (sourceConfiguration.getVerifyInitialOffsets() == FAIL_ON_MISMATCH) {
-                        throw new IllegalArgumentException(e);
-                    } else {
-                        // WARN_ON_MISMATCH would just print this warning message.
-                        // No need to print the stacktrace.
-                        LOG.warn(
-                                "Failed to set initial consuming position for partition {}",
-                                partition,
-                                e);
-                    }
-                }
-            }
-        }
-    }
-
-    private ConsumerBuilder<byte[]> consumerBuilder() {
-        ConsumerBuilder<byte[]> builder =
-                createConsumerBuilder(pulsarClient, Schema.BYTES, sourceConfiguration);
-        if (sourceConfiguration.getSubscriptionType() == SubscriptionType.Key_Shared) {
-            Range range = TopicRange.createFullRange().toPulsarRange();
-            KeySharedPolicySticky keySharedPolicy = KeySharedPolicy.stickyHashRange().ranges(range);
-            // Force this consume use sticky hash range in Key_Shared subscription.
-            // Pulsar won't remove old message dispatcher before 2.8.2 release.
-            builder.keySharedPolicy(keySharedPolicy);
-        }
-
-        return builder;
+        return subscriber.getSubscribedTopicPartitions(pulsarAdmin, rangeGenerator, parallelism);
     }
 
     /**
@@ -230,13 +170,55 @@ public class PulsarSourceEnumerator
         }
 
         // Append the partitions into current assignment state.
-        assignmentState.appendTopicPartitions(fetchedPartitions);
-        List<Integer> registeredReaders = new ArrayList<>(context.registeredReaders().keySet());
+        List<TopicPartition> newPartitions =
+                splitAssigner.registerTopicPartitions(fetchedPartitions);
+        createSubscription(newPartitions);
 
         // Assign the new readers.
+        List<Integer> registeredReaders = new ArrayList<>(context.registeredReaders().keySet());
         assignPendingPartitionSplits(registeredReaders);
     }
 
+    /** Create subscription on topic partition if it doesn't exist. */
+    private void createSubscription(List<TopicPartition> newPartitions) {
+        for (TopicPartition partition : newPartitions) {
+            String topicName = partition.getFullTopicName();
+            String subscriptionName = sourceConfiguration.getSubscriptionName();
+
+            List<String> subscriptions =
+                    sneakyAdmin(() -> pulsarAdmin.topics().getSubscriptions(topicName));
+            if (!subscriptions.contains(subscriptionName)) {
+                CursorPosition position =
+                        startCursor.position(partition.getTopic(), partition.getPartitionId());
+                MessageId initialPosition = queryInitialPosition(topicName, position);
+
+                sneakyAdmin(
+                        () ->
+                                pulsarAdmin
+                                        .topics()
+                                        .createSubscription(
+                                                topicName, subscriptionName, initialPosition));
+            }
+        }
+    }
+
+    /** Query the available message id from Pulsar. */
+    private MessageId queryInitialPosition(String topicName, CursorPosition position) {
+        CursorPosition.Type type = position.getType();
+        if (type == CursorPosition.Type.TIMESTAMP) {
+            return sneakyAdmin(
+                    () ->
+                            pulsarAdmin
+                                    .topics()
+                                    .getMessageIdByTimestamp(topicName, position.getTimestamp()));
+        } else if (type == CursorPosition.Type.MESSAGE_ID) {
+            return position.getMessageId();
+        } else {
+            throw new UnsupportedOperationException("We don't support this seek type " + type);
+        }
+    }
+
+    /** Query the unassigned splits and assign them to the available readers. */
     private void assignPendingPartitionSplits(List<Integer> pendingReaders) {
         // Validate the reader.
         pendingReaders.forEach(
@@ -248,17 +230,19 @@ public class PulsarSourceEnumerator
                 });
 
         // Assign splits to downstream readers.
-        assignmentState.assignSplits(pendingReaders).ifPresent(context::assignSplits);
+        splitAssigner.createAssignment(pendingReaders).ifPresent(context::assignSplits);
 
         // If periodically partition discovery is disabled and the initializing discovery has done,
-        // signal NoMoreSplitsEvent to pending readers
-        if (assignmentState.noMoreNewPartitionSplits()) {
-            LOG.debug(
-                    "No more PulsarPartitionSplits to assign."
-                            + " Sending NoMoreSplitsEvent to reader {} in subscription {}.",
-                    pendingReaders,
-                    sourceConfiguration.getSubscriptionDesc());
-            pendingReaders.forEach(this.context::signalNoMoreSplits);
+        // signal NoMoreSplitsEvent to pending readers.
+        for (Integer reader : pendingReaders) {
+            if (splitAssigner.noMoreSplits(reader)) {
+                LOG.debug(
+                        "No more PulsarPartitionSplits to assign."
+                                + " Sending NoMoreSplitsEvent to reader {} in subscription {}.",
+                        reader,
+                        sourceConfiguration.getSubscriptionDesc());
+                context.signalNoMoreSplits(reader);
+            }
         }
     }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/SplitsAssignmentState.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/SplitsAssignmentState.java
deleted file mode 100644
index cbc4826583a..00000000000
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/SplitsAssignmentState.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.enumerator;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.api.connector.source.SplitsAssignment;
-import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
-import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
-import org.apache.flink.util.InstantiationUtil;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-
-/** The state class for recording the split assignment. */
-@Internal
-public class SplitsAssignmentState {
-
-    private final StopCursor stopCursor;
-    private final SourceConfiguration sourceConfiguration;
-
-    // The dynamic states for checkpoint.
-    private final Set<TopicPartition> appendedPartitions;
-    // This pending splits is used for Key_Shared, Failover, Exclusive subscription.
-    private final Set<PulsarPartitionSplit> pendingPartitionSplits;
-    // These two fields are used for Shared subscription.
-    private final Map<Integer, Set<PulsarPartitionSplit>> sharedPendingPartitionSplits;
-    private final Map<Integer, Set<String>> readerAssignedSplits;
-    private boolean initialized;
-
-    public SplitsAssignmentState(StopCursor stopCursor, SourceConfiguration sourceConfiguration) {
-        this.stopCursor = stopCursor;
-        this.sourceConfiguration = sourceConfiguration;
-        this.appendedPartitions = new HashSet<>();
-        this.pendingPartitionSplits = new HashSet<>();
-        this.sharedPendingPartitionSplits = new HashMap<>();
-        this.readerAssignedSplits = new HashMap<>();
-        this.initialized = false;
-    }
-
-    public SplitsAssignmentState(
-            StopCursor stopCursor,
-            SourceConfiguration sourceConfiguration,
-            PulsarSourceEnumState sourceEnumState) {
-        this.stopCursor = stopCursor;
-        this.sourceConfiguration = sourceConfiguration;
-        this.appendedPartitions = sourceEnumState.getAppendedPartitions();
-        this.pendingPartitionSplits = sourceEnumState.getPendingPartitionSplits();
-        this.sharedPendingPartitionSplits = sourceEnumState.getSharedPendingPartitionSplits();
-        this.readerAssignedSplits = sourceEnumState.getReaderAssignedSplits();
-        this.initialized = sourceEnumState.isInitialized();
-    }
-
-    public PulsarSourceEnumState snapshotState() {
-        return new PulsarSourceEnumState(
-                appendedPartitions,
-                pendingPartitionSplits,
-                sharedPendingPartitionSplits,
-                readerAssignedSplits,
-                initialized);
-    }
-
-    /**
-     * Append the new fetched partitions to current state. We would generate pending source split
-     * for downstream pulsar readers. Since the {@link SplitEnumeratorContext} don't support put the
-     * split back to enumerator, we don't support partition deletion.
-     *
-     * @param fetchedPartitions The partitions from the {@link PulsarSubscriber}.
-     */
-    public void appendTopicPartitions(Set<TopicPartition> fetchedPartitions) {
-        for (TopicPartition partition : fetchedPartitions) {
-            // If this partition is a new partition.
-            if (!appendedPartitions.contains(partition)) {
-                if (!sharePartition()) {
-                    // Create a split and add it to pending list.
-                    pendingPartitionSplits.add(createSplit(partition));
-                }
-
-                // Shared subscription don't create splits, we just register partitions.
-                appendedPartitions.add(partition);
-            }
-        }
-
-        // Update this initialize flag.
-        if (!initialized) {
-            this.initialized = true;
-        }
-    }
-
-    public boolean containsTopic(String topicName) {
-        return appendedPartitions.stream()
-                .anyMatch(partition -> Objects.equals(partition.getFullTopicName(), topicName));
-    }
-
-    /** Put these splits back to pending list. */
-    public void putSplitsBackToPendingList(List<PulsarPartitionSplit> splits, int readerId) {
-        if (!sharePartition()) {
-            // Put these splits back to normal pending list.
-            pendingPartitionSplits.addAll(splits);
-        } else {
-            // Put the splits back to shared pending list.
-            Set<PulsarPartitionSplit> pending =
-                    sharedPendingPartitionSplits.computeIfAbsent(readerId, id -> new HashSet<>());
-            pending.addAll(splits);
-        }
-    }
-
-    public Optional<SplitsAssignment<PulsarPartitionSplit>> assignSplits(
-            List<Integer> pendingReaders) {
-        // Avoid empty readers assign.
-        if (pendingReaders.isEmpty()) {
-            return Optional.empty();
-        }
-
-        Map<Integer, List<PulsarPartitionSplit>> assignMap;
-
-        // We extract the assign logic into two method for better readability.
-        if (!sharePartition()) {
-            assignMap = assignNormalSplits(pendingReaders);
-        } else {
-            assignMap = assignSharedSplits(pendingReaders);
-        }
-
-        if (assignMap.isEmpty()) {
-            return Optional.empty();
-        } else {
-            return Optional.of(new SplitsAssignment<>(assignMap));
-        }
-    }
-
-    /**
-     * @return It would return true only if periodically partition discovery is disabled, the
-     *     initializing partition discovery has finished AND there is no pending splits for
-     *     assignment.
-     */
-    public boolean noMoreNewPartitionSplits() {
-        return !sourceConfiguration.isEnablePartitionDiscovery()
-                && initialized
-                && pendingPartitionSplits.isEmpty();
-    }
-
-    // ----------------- private methods -------------------
-
-    /** The splits don't shared for all the readers. */
-    private Map<Integer, List<PulsarPartitionSplit>> assignNormalSplits(
-            List<Integer> pendingReaders) {
-        Map<Integer, List<PulsarPartitionSplit>> assignMap = new HashMap<>();
-
-        // Drain a list of splits.
-        List<PulsarPartitionSplit> pendingSplits = drainPendingPartitionsSplits();
-        for (int i = 0; i < pendingSplits.size(); i++) {
-            PulsarPartitionSplit split = pendingSplits.get(i);
-            int readerId = pendingReaders.get(i % pendingReaders.size());
-            assignMap.computeIfAbsent(readerId, id -> new ArrayList<>()).add(split);
-        }
-
-        return assignMap;
-    }
-
-    /** Every split would be shared among available readers. */
-    private Map<Integer, List<PulsarPartitionSplit>> assignSharedSplits(
-            List<Integer> pendingReaders) {
-        Map<Integer, List<PulsarPartitionSplit>> assignMap = new HashMap<>();
-
-        // Drain the splits from share pending list.
-        for (Integer reader : pendingReaders) {
-            Set<PulsarPartitionSplit> pendingSplits = sharedPendingPartitionSplits.remove(reader);
-            if (pendingSplits == null) {
-                pendingSplits = new HashSet<>();
-            }
-
-            Set<String> assignedSplits =
-                    readerAssignedSplits.computeIfAbsent(reader, r -> new HashSet<>());
-
-            for (TopicPartition partition : appendedPartitions) {
-                String partitionName = partition.toString();
-                if (!assignedSplits.contains(partitionName)) {
-                    pendingSplits.add(createSplit(partition));
-                    assignedSplits.add(partitionName);
-                }
-            }
-
-            if (!pendingSplits.isEmpty()) {
-                assignMap.put(reader, new ArrayList<>(pendingSplits));
-            }
-        }
-
-        return assignMap;
-    }
-
-    private PulsarPartitionSplit createSplit(TopicPartition partition) {
-        try {
-            StopCursor stop = InstantiationUtil.clone(stopCursor);
-            return new PulsarPartitionSplit(partition, stop);
-        } catch (IOException | ClassNotFoundException e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
-    private List<PulsarPartitionSplit> drainPendingPartitionsSplits() {
-        List<PulsarPartitionSplit> splits = new ArrayList<>(pendingPartitionSplits);
-        pendingPartitionSplits.clear();
-
-        return splits;
-    }
-
-    /** {@link SubscriptionType#Shared} mode should share a same split for all the readers. */
-    private boolean sharePartition() {
-        return sourceConfiguration.getSubscriptionType() == SubscriptionType.Shared;
-    }
-}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java
new file mode 100644
index 00000000000..087e96157d6
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This assigner is used for {@link SubscriptionType#Failover}, {@link SubscriptionType#Exclusive}
+ * and {@link SubscriptionType#Key_Shared} subscriptions.
+ */
+@Internal
+public class NonSharedSplitAssigner implements SplitAssigner {
+    private static final long serialVersionUID = 8412586087991597092L;
+
+    private final StopCursor stopCursor;
+    private final boolean enablePartitionDiscovery;
+
+    // These fields are persisted in the checkpoint state.
+
+    private final Set<TopicPartition> appendedPartitions;
+    private final Set<PulsarPartitionSplit> pendingPartitionSplits;
+    private boolean initialized;
+
+    public NonSharedSplitAssigner(
+            StopCursor stopCursor,
+            SourceConfiguration sourceConfiguration,
+            PulsarSourceEnumState sourceEnumState) {
+        this.stopCursor = stopCursor;
+        this.enablePartitionDiscovery = sourceConfiguration.isEnablePartitionDiscovery();
+        this.appendedPartitions = sourceEnumState.getAppendedPartitions();
+        this.pendingPartitionSplits = sourceEnumState.getPendingPartitionSplits();
+        this.initialized = sourceEnumState.isInitialized();
+    }
+
+    @Override
+    public List<TopicPartition> registerTopicPartitions(Set<TopicPartition> fetchedPartitions) {
+        List<TopicPartition> newPartitions = new ArrayList<>();
+
+        for (TopicPartition partition : fetchedPartitions) {
+            if (!appendedPartitions.contains(partition)) {
+                pendingPartitionSplits.add(new PulsarPartitionSplit(partition, stopCursor));
+                appendedPartitions.add(partition);
+                newPartitions.add(partition);
+            }
+        }
+
+        if (!initialized) {
+            initialized = true;
+        }
+
+        return newPartitions;
+    }
+
+    @Override
+    public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
+        pendingPartitionSplits.addAll(splits);
+    }
+
+    @Override
+    public Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment(
+            List<Integer> readers) {
+        if (pendingPartitionSplits.isEmpty() || readers.isEmpty()) {
+            return Optional.empty();
+        }
+
+        Map<Integer, List<PulsarPartitionSplit>> assignMap = new HashMap<>();
+
+        List<PulsarPartitionSplit> partitionSplits = new ArrayList<>(pendingPartitionSplits);
+        int readerCount = readers.size();
+        for (int i = 0; i < partitionSplits.size(); i++) {
+            int index = i % readerCount;
+            Integer readerId = readers.get(index);
+            PulsarPartitionSplit split = partitionSplits.get(i);
+            assignMap.computeIfAbsent(readerId, id -> new ArrayList<>()).add(split);
+        }
+        pendingPartitionSplits.clear();
+
+        return Optional.of(new SplitsAssignment<>(assignMap));
+    }
+
+    @Override
+    public boolean noMoreSplits(Integer reader) {
+        return !enablePartitionDiscovery && initialized && pendingPartitionSplits.isEmpty();
+    }
+
+    @Override
+    public PulsarSourceEnumState snapshotState() {
+        return new PulsarSourceEnumState(
+                appendedPartitions,
+                pendingPartitionSplits,
+                new HashMap<>(),
+                new HashMap<>(),
+                initialized);
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.java
new file mode 100644
index 00000000000..48d75c8dee3
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** This assigner is used for {@link SubscriptionType#Shared} subscriptions. */
+@Internal
+public class SharedSplitAssigner implements SplitAssigner {
+    private static final long serialVersionUID = 8468503133499402491L;
+
+    private final StopCursor stopCursor;
+    private final boolean enablePartitionDiscovery;
+
+    // These fields are persisted in the checkpoint.
+
+    private final Set<TopicPartition> appendedPartitions;
+    private final Map<Integer, Set<PulsarPartitionSplit>> sharedPendingPartitionSplits;
+    private final Map<Integer, Set<String>> readerAssignedSplits;
+    private boolean initialized;
+
+    public SharedSplitAssigner(
+            StopCursor stopCursor,
+            SourceConfiguration sourceConfiguration,
+            PulsarSourceEnumState sourceEnumState) {
+        this.stopCursor = stopCursor;
+        this.enablePartitionDiscovery = sourceConfiguration.isEnablePartitionDiscovery();
+        this.appendedPartitions = sourceEnumState.getAppendedPartitions();
+        this.sharedPendingPartitionSplits = sourceEnumState.getSharedPendingPartitionSplits();
+        this.readerAssignedSplits = sourceEnumState.getReaderAssignedSplits();
+        this.initialized = sourceEnumState.isInitialized();
+    }
+
+    @Override
+    public List<TopicPartition> registerTopicPartitions(Set<TopicPartition> fetchedPartitions) {
+        List<TopicPartition> newPartitions = new ArrayList<>();
+
+        for (TopicPartition partition : fetchedPartitions) {
+            if (!appendedPartitions.contains(partition)) {
+                appendedPartitions.add(partition);
+                newPartitions.add(partition);
+            }
+        }
+
+        if (!initialized) {
+            initialized = true;
+        }
+
+        return newPartitions;
+    }
+
+    @Override
+    public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
+        Set<PulsarPartitionSplit> pendingPartitionSplits =
+                sharedPendingPartitionSplits.computeIfAbsent(subtaskId, id -> new HashSet<>());
+        pendingPartitionSplits.addAll(splits);
+    }
+
+    @Override
+    public Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment(
+            List<Integer> readers) {
+        if (readers.isEmpty()) {
+            return Optional.empty();
+        }
+
+        Map<Integer, List<PulsarPartitionSplit>> assignMap = new HashMap<>();
+        for (Integer reader : readers) {
+            Set<PulsarPartitionSplit> pendingSplits = sharedPendingPartitionSplits.remove(reader);
+            if (pendingSplits == null) {
+                pendingSplits = new HashSet<>();
+            }
+
+            Set<String> assignedSplits =
+                    readerAssignedSplits.computeIfAbsent(reader, r -> new HashSet<>());
+
+            for (TopicPartition partition : appendedPartitions) {
+                String partitionName = partition.toString();
+                if (!assignedSplits.contains(partitionName)) {
+                    pendingSplits.add(new PulsarPartitionSplit(partition, stopCursor));
+                    assignedSplits.add(partitionName);
+                }
+            }
+
+            if (!pendingSplits.isEmpty()) {
+                assignMap.put(reader, new ArrayList<>(pendingSplits));
+            }
+        }
+
+        if (assignMap.isEmpty()) {
+            return Optional.empty();
+        } else {
+            return Optional.of(new SplitsAssignment<>(assignMap));
+        }
+    }
+
+    @Override
+    public boolean noMoreSplits(Integer reader) {
+        Set<PulsarPartitionSplit> pendingSplits = sharedPendingPartitionSplits.get(reader);
+        Set<String> assignedSplits = readerAssignedSplits.get(reader);
+
+        return !enablePartitionDiscovery
+                && initialized
+                && (pendingSplits == null || pendingSplits.isEmpty())
+                && (assignedSplits != null && assignedSplits.size() == appendedPartitions.size());
+    }
+
+    @Override
+    public PulsarSourceEnumState snapshotState() {
+        return new PulsarSourceEnumState(
+                appendedPartitions,
+                new HashSet<>(),
+                sharedPendingPartitionSplits,
+                readerAssignedSplits,
+                initialized);
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java
new file mode 100644
index 00000000000..bc03f5103fd
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * The split assigner for different subscription types. It spreads all the splits across the
+ * available readers and stores the assignment state in the checkpoint.
+ */
+@Internal
+public interface SplitAssigner extends Serializable {
+
+    /**
+     * Add the currently available partitions to the assigner.
+     *
+     * @param fetchedPartitions The available partitions queried from the Pulsar broker.
+     * @return The new topic partitions compared to the previously registered partitions.
+     */
+    List<TopicPartition> registerTopicPartitions(Set<TopicPartition> fetchedPartitions);
+
+    /**
+     * Add splits back to the split assigner if the reader fails. The assigner will try to
+     * reassign the splits or add them to the pending list.
+     */
+    void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId);
+
+    /** Create a split assignment from the current readers. */
+    Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment(List<Integer> readers);
+
+    /**
+     * Returns true only if periodic partition discovery is disabled, the initial partition
+     * discovery has finished, and there are no pending splits left for assignment.
+     */
+    boolean noMoreSplits(Integer reader);
+
+    /** Snapshot the current assignment state into the checkpoint. */
+    PulsarSourceEnumState snapshotState();
+}
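
Taken together, the contract above is driven by the enumerator roughly as follows. This is a hypothetical driver sketch written only against the interface (it is not part of the commit); the concrete assigner instance would come from the SplitAssignerFactory below.

    import java.util.List;
    import java.util.Optional;
    import java.util.Set;

    import org.apache.flink.api.connector.source.SplitsAssignment;
    import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
    import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
    import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
    import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;

    public class SplitAssignerDriverSketch {

        /** One discovery round: register the fetched partitions, then hand out pending splits. */
        static Optional<SplitsAssignment<PulsarPartitionSplit>> onPartitionsDiscovered(
                SplitAssigner assigner,
                Set<TopicPartition> fetchedPartitions,
                List<Integer> registeredReaders) {
            List<TopicPartition> newPartitions = assigner.registerTopicPartitions(fetchedPartitions);
            System.out.println("Newly discovered partitions: " + newPartitions);
            return assigner.createAssignment(registeredReaders);
        }

        /** The enumerator snapshots the assigner state on every checkpoint. */
        static PulsarSourceEnumState onCheckpoint(SplitAssigner assigner) {
            return assigner.snapshotState();
        }
    }
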
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java
new file mode 100644
index 00000000000..3e6ebccb49b
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import static org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState.initialState;
+import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
+import static org.apache.pulsar.client.api.SubscriptionType.Failover;
+import static org.apache.pulsar.client.api.SubscriptionType.Key_Shared;
+import static org.apache.pulsar.client.api.SubscriptionType.Shared;
+
+/** The factory for creating the split assigner. */
+@Internal
+public final class SplitAssignerFactory {
+
+    private SplitAssignerFactory() {
+        // No public constructor.
+    }
+
+    /** Create a blank assigner. */
+    public static SplitAssigner create(
+            StopCursor stopCursor, SourceConfiguration sourceConfiguration) {
+        return create(stopCursor, sourceConfiguration, initialState());
+    }
+
+    /** Create an assigner from the checkpoint state. */
+    public static SplitAssigner create(
+            StopCursor stopCursor,
+            SourceConfiguration sourceConfiguration,
+            PulsarSourceEnumState sourceEnumState) {
+        SubscriptionType subscriptionType = sourceConfiguration.getSubscriptionType();
+        if (subscriptionType == Exclusive
+                || subscriptionType == Failover
+                || subscriptionType == Key_Shared) {
+            return new NonSharedSplitAssigner(stopCursor, sourceConfiguration, sourceEnumState);
+        } else if (subscriptionType == Shared) {
+            return new SharedSplitAssigner(stopCursor, sourceConfiguration, sourceEnumState);
+        } else {
+            throw new IllegalArgumentException(
+                    "We don't support this subscription type: " + subscriptionType);
+        }
+    }
+}
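
A short sketch of how the factory above is expected to be used, once on a fresh start and once when restoring from an enumerator checkpoint. How the SourceConfiguration and the restored PulsarSourceEnumState are obtained is outside the scope of this sketch.

    import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
    import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
    import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
    import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssignerFactory;
    import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;

    public class SplitAssignerFactorySketch {

        /** A blank assigner for the first start of the job. */
        static SplitAssigner freshAssigner(SourceConfiguration configuration) {
            return SplitAssignerFactory.create(StopCursor.defaultStopCursor(), configuration);
        }

        /** An assigner restored from the enumerator checkpoint. */
        static SplitAssigner restoredAssigner(
                SourceConfiguration configuration, PulsarSourceEnumState restoredState) {
            return SplitAssignerFactory.create(
                    StopCursor.defaultStopCursor(), configuration, restoredState);
        }
    }
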
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java
index a2aaff62906..c965ff962f8 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java
@@ -18,18 +18,16 @@
 
 package org.apache.flink.connector.pulsar.source.enumerator.cursor;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.annotation.VisibleForTesting;
 
-import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClientException;
-
-import javax.annotation.Nullable;
 
 import java.io.Serializable;
 
-/** The class for defining the start or stop position. */
+/**
+ * The class for defining the start or stop position. We only expose the constructors to the end
+ * user.
+ */
 @PublicEvolving
 public final class CursorPosition implements Serializable {
     private static final long serialVersionUID = -802405183307684549L;
@@ -40,34 +38,31 @@ public final class CursorPosition implements Serializable {
 
     private final Long timestamp;
 
-    public CursorPosition(@Nullable MessageId messageId) {
+    public CursorPosition(MessageId messageId) {
         this.type = Type.MESSAGE_ID;
         this.messageId = messageId;
         this.timestamp = null;
     }
 
-    public CursorPosition(@Nullable Long timestamp) {
+    public CursorPosition(Long timestamp) {
         this.type = Type.TIMESTAMP;
         this.messageId = null;
         this.timestamp = timestamp;
     }
 
-    @VisibleForTesting
+    @Internal
+    public Type getType() {
+        return type;
+    }
+
+    @Internal
     public MessageId getMessageId() {
         return messageId;
     }
 
-    /** Pulsar consumer could be subscribed by the position. */
-    public void seekPosition(Consumer<?> consumer) throws PulsarClientException {
-        if (type == Type.MESSAGE_ID) {
-            consumer.seek(messageId);
-        } else {
-            if (timestamp != null) {
-                consumer.seek(timestamp);
-            } else {
-                consumer.seek(System.currentTimeMillis());
-            }
-        }
+    @Internal
+    public Long getTimestamp() {
+        return timestamp;
     }
 
     @Override
@@ -82,6 +77,7 @@ public final class CursorPosition implements Serializable {
     /**
      * The position type for reader to choose whether timestamp or message id as the start position.
      */
+    @Internal
     public enum Type {
         TIMESTAMP,
 
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/MessageIdUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/MessageIdUtils.java
new file mode 100644
index 00000000000..a8c3a6b2ef2
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/MessageIdUtils.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.cursor;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** The helper class for Pulsar's message id. */
+@Internal
+public final class MessageIdUtils {
+
+    private MessageIdUtils() {
+        // No public constructor.
+    }
+
+    /**
+     * The implementation from <a
+     * href="https://github.com/apache/pulsar/blob/7c8dc3201baad7d02d886dbc26db5c03abce77d6/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java#L85">this
+     * code snippet</a> to get next message id.
+     */
+    public static MessageId nextMessageId(MessageId messageId) {
+        MessageIdImpl idImpl = unwrapMessageId(messageId);
+
+        if (idImpl.getEntryId() < 0) {
+            return newMessageId(idImpl.getLedgerId(), 0, idImpl.getPartitionIndex());
+        } else {
+            return newMessageId(
+                    idImpl.getLedgerId(), idImpl.getEntryId() + 1, idImpl.getPartitionIndex());
+        }
+    }
+
+    /**
+     * Convert the message id interface to its backing implementation and check whether it is a
+     * batch message id. Batch messages are not supported for now because of their low performance.
+     */
+    public static MessageIdImpl unwrapMessageId(MessageId messageId) {
+        MessageIdImpl idImpl = MessageIdImpl.convertToMessageIdImpl(messageId);
+        if (idImpl instanceof BatchMessageIdImpl) {
+            int batchSize = ((BatchMessageIdImpl) idImpl).getBatchSize();
+            checkArgument(batchSize == 1, "We only support normal message id currently.");
+        }
+
+        return idImpl;
+    }
+
+    /** Hide the message id implementation. */
+    public static MessageId newMessageId(long ledgerId, long entryId, int partitionIndex) {
+        return new MessageIdImpl(ledgerId, entryId, partitionIndex);
+    }
+}
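
To make the helper above concrete: given the message id that a reader stored in its checkpoint, nextMessageId() returns the id of the entry the subscription should resume from. A standalone sketch; the ledger/entry/partition values are arbitrary.

    import org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils;

    import org.apache.pulsar.client.api.MessageId;

    public class NextMessageIdSketch {

        public static void main(String[] args) {
            // Pretend this id was stored in the checkpoint as the latest consumed message.
            MessageId latestConsumed = MessageIdUtils.newMessageId(100L, 23L, 0);

            // The subscription should resume from the following entry on the same ledger.
            MessageId resumeFrom = MessageIdUtils.nextMessageId(latestConsumed);

            // Prints something like 100:24:0 (ledgerId:entryId:partitionIndex).
            System.out.println(resumeFrom);
        }
    }
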
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StartCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StartCursor.java
index af35319a5a7..9c1d699a269 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StartCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StartCursor.java
@@ -22,9 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.start.MessageIdStartCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.start.TimestampStartCursor;
 
-import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 
 import java.io.Serializable;
@@ -43,13 +41,6 @@ public interface StartCursor extends Serializable {
 
     CursorPosition position(String topic, int partitionId);
 
-    /** Helper method for seek the right position for given pulsar consumer. */
-    default void seekPosition(String topic, int partitionId, Consumer<?> consumer)
-            throws PulsarClientException {
-        CursorPosition position = position(topic, partitionId);
-        position.seekPosition(consumer);
-    }
-
     // --------------------------- Static Factory Methods -----------------------------
 
     static StartCursor defaultStartCursor() {
@@ -64,19 +55,38 @@ public interface StartCursor extends Serializable {
         return fromMessageId(MessageId.latest);
     }
 
+    /**
+     * Find the available message id and start consuming from it. The given message is included in
+     * the consuming result by default if you provide a specific message id instead of {@link
+     * MessageId#earliest} or {@link MessageId#latest}.
+     */
     static StartCursor fromMessageId(MessageId messageId) {
         return fromMessageId(messageId, true);
     }
 
     /**
      * @param messageId Find the available message id and start consuming from it.
-     * @param inclusive {@code true} would include the given message id.
+     * @param inclusive {@code true} includes the given message id unless it is {@link
+     *     MessageId#earliest} or {@link MessageId#latest}.
      */
     static StartCursor fromMessageId(MessageId messageId, boolean inclusive) {
         return new MessageIdStartCursor(messageId, inclusive);
     }
 
+    /**
+     * This method was designed for seeking a start position by the message event time. However,
+     * Pulsar doesn't support seeking by event time; it seeks the position by publish time instead.
+     * We only keep this method for backward compatibility.
+     *
+     * @deprecated Use {@link #fromPublishTime(long)} instead.
+     */
+    @Deprecated
     static StartCursor fromMessageTime(long timestamp) {
-        return new TimestampStartCursor(timestamp);
+        return new TimestampStartCursor(timestamp, true);
+    }
+
+    /** Seek the start position by the message publish time. */
+    static StartCursor fromPublishTime(long timestamp) {
+        return new TimestampStartCursor(timestamp, true);
     }
 }
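
At the API surface, the new fromPublishTime() cursor slots into the source builder like any other start cursor. A minimal sketch, assuming the usual PulsarSourceBuilder setters of this connector; the URLs, topic, and subscription name are placeholders.

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.pulsar.source.PulsarSource;
    import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
    import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;

    import org.apache.pulsar.client.api.SubscriptionType;

    public class StartCursorUsageSketch {

        /** Build a source that starts from messages published within the last hour. */
        static PulsarSource<String> lastHourSource() {
            long oneHourAgo = System.currentTimeMillis() - 60 * 60 * 1000L;
            return PulsarSource.builder()
                    .setServiceUrl("pulsar://localhost:6650")
                    .setAdminUrl("http://localhost:8080")
                    .setTopics("persistent://public/default/my-topic")
                    .setSubscriptionName("flink-subscription")
                    .setSubscriptionType(SubscriptionType.Exclusive)
                    .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
                    .setStartCursor(StartCursor.fromPublishTime(oneHourAgo))
                    .build();
        }
    }
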
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
index 0bf46ce1282..d44c78fcf1a 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
@@ -44,11 +44,55 @@ public interface StopCursor extends Serializable {
     /** The open method for the cursor initializer. This method could be executed multiple times. */
     default void open(PulsarAdmin admin, TopicPartition partition) {}
 
-    /**
-     * Determine whether to pause consumption on the current message by the returned boolean value.
-     * The message presented in method argument wouldn't be consumed if the return result is true.
-     */
-    boolean shouldStop(Message<?> message);
+    /** Determine whether to pause consumption on the current message based on the returned enum. */
+    StopCondition shouldStop(Message<?> message);
+
+    /** The condition for controlling the stop behavior of the Pulsar source. */
+    @PublicEvolving
+    enum StopCondition {
+
+        /** This message should be included in the result. */
+        CONTINUE,
+        /** This message should be included in the result and stop consuming. */
+        EXACTLY,
+        /** Stop consuming, the given message wouldn't be included in the result. */
+        TERMINATE;
+
+        /**
+         * Common method for comparing message ids.
+         *
+         * @param desired The message id at which consuming should stop.
+         * @param current The message id of the upcoming message.
+         * @param inclusive Whether the desired message should be included in the consuming result.
+         */
+        public static StopCondition compare(
+                MessageId desired, MessageId current, boolean inclusive) {
+            if (current.compareTo(desired) < 0) {
+                return StopCondition.CONTINUE;
+            } else if (current.compareTo(desired) == 0) {
+                return inclusive ? StopCondition.EXACTLY : StopCondition.TERMINATE;
+            } else {
+                return StopCondition.TERMINATE;
+            }
+        }
+
+        /**
+         * Common method for comparing message timestamps.
+         *
+         * @param desired The timestamp at which consuming should stop.
+         * @param current The timestamp of the upcoming message.
+         * @param inclusive Whether the desired message should be included in the consuming result.
+         */
+        public static StopCondition compare(long desired, long current, boolean inclusive) {
+            if (current < desired) {
+                return StopCondition.CONTINUE;
+            } else if (current == desired) {
+                return inclusive ? StopCondition.EXACTLY : StopCondition.TERMINATE;
+            } else {
+                return StopCondition.TERMINATE;
+            }
+        }
+    }
 
     // --------------------------- Static Factory Methods -----------------------------
 
@@ -61,32 +105,52 @@ public interface StopCursor extends Serializable {
     }
 
     static StopCursor latest() {
-        return new LatestMessageStopCursor();
+        return new LatestMessageStopCursor(true);
     }
 
     /**
-     * Stop when the messageId is equal or greater than the specified messageId. Message that is
-     * equal to the specified messageId will not be consumed.
+     * Stop consuming when the messageId is equal to or greater than the specified messageId. A
+     * message that is equal to the specified messageId will not be consumed.
      */
     static StopCursor atMessageId(MessageId messageId) {
-        return new MessageIdStopCursor(messageId);
+        if (MessageId.latest.equals(messageId)) {
+            return new LatestMessageStopCursor(false);
+        } else {
+            return new MessageIdStopCursor(messageId, false);
+        }
     }
 
     /**
-     * Stop when the messageId is greater than the specified messageId. Message that is equal to the
-     * specified messageId will be consumed.
+     * Stop consuming when the messageId is greater than the specified messageId. A message that is
+     * equal to the specified messageId will be consumed.
      */
     static StopCursor afterMessageId(MessageId messageId) {
-        return new MessageIdStopCursor(messageId, false);
+        if (MessageId.latest.equals(messageId)) {
+            return new LatestMessageStopCursor(true);
+        } else {
+            return new MessageIdStopCursor(messageId, true);
+        }
     }
 
-    @Deprecated
+    /** Stop consuming when message eventTime is greater than or equal to the specified timestamp. */
     static StopCursor atEventTime(long timestamp) {
-        return new EventTimestampStopCursor(timestamp);
+        return new EventTimestampStopCursor(timestamp, false);
     }
 
-    /** Stop when message publishTime is greater than the specified timestamp. */
+    /** Stop consuming when message eventTime is greater than the specified timestamp. */
+    static StopCursor afterEventTime(long timestamp) {
+        return new EventTimestampStopCursor(timestamp, true);
+    }
+
+    /**
+     * Stop consuming when message publishTime is greater than or equal to the specified timestamp.
+     */
     static StopCursor atPublishTime(long timestamp) {
-        return new PublishTimestampStopCursor(timestamp);
+        return new PublishTimestampStopCursor(timestamp, false);
+    }
+
+    /** Stop consuming when message publishTime is greater than the specified timestamp. */
+    static StopCursor afterPublishTime(long timestamp) {
+        return new PublishTimestampStopCursor(timestamp, true);
     }
 }
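
Custom stop cursors now express their decision through the StopCondition enum rather than a boolean. As an illustration only (this class is not part of the commit), a hypothetical cursor that terminates the split as soon as a message payload exceeds a configured size, without emitting that message:

    import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;

    import org.apache.pulsar.client.api.Message;

    public class MaxPayloadStopCursor implements StopCursor {
        private static final long serialVersionUID = 1L;

        private final int maxPayloadBytes;

        public MaxPayloadStopCursor(int maxPayloadBytes) {
            this.maxPayloadBytes = maxPayloadBytes;
        }

        @Override
        public StopCondition shouldStop(Message<?> message) {
            // TERMINATE drops the oversized message and finishes the split;
            // CONTINUE keeps the message and keeps on consuming.
            return message.getData().length > maxPayloadBytes
                    ? StopCondition.TERMINATE
                    : StopCondition.CONTINUE;
        }
    }

The built-in atMessageId()/afterMessageId() and atPublishTime()/afterPublishTime() pairs follow the same pattern, passing the inclusive flag as false and true respectively.
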
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java
index f8079608cc6..33591c583f6 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java
@@ -25,7 +25,8 @@ import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 
-import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.nextMessageId;
+import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.unwrapMessageId;
 
 /** This cursor would left pulsar start consuming from a specific message id. */
 public class MessageIdStartCursor implements StartCursor {
@@ -43,23 +44,16 @@ public class MessageIdStartCursor implements StartCursor {
      * code</a> for understanding pulsar internal logic.
      *
      * @param messageId The message id for start position.
-     * @param inclusive Should we include the start message id in consuming result.
+     * @param inclusive Whether we include the start message id in the consuming result. This works
+     *     only if we provide a specific message id instead of {@link MessageId#earliest} or {@link
+     *     MessageId#latest}.
      */
     public MessageIdStartCursor(MessageId messageId, boolean inclusive) {
-        if (inclusive) {
-            this.messageId = messageId;
+        MessageIdImpl idImpl = unwrapMessageId(messageId);
+        if (MessageId.earliest.equals(idImpl) || MessageId.latest.equals(idImpl) || inclusive) {
+            this.messageId = idImpl;
         } else {
-            checkState(
-                    messageId instanceof MessageIdImpl,
-                    "We only support normal message id and batch message id.");
-            MessageIdImpl id = (MessageIdImpl) messageId;
-            if (MessageId.earliest.equals(messageId) || MessageId.latest.equals(messageId)) {
-                this.messageId = messageId;
-            } else {
-                this.messageId =
-                        new MessageIdImpl(
-                                id.getLedgerId(), id.getEntryId() + 1, id.getPartitionIndex());
-            }
+            this.messageId = nextMessageId(idImpl);
         }
     }
 
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/TimestampStartCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/TimestampStartCursor.java
index eb4ea32ebb6..da51a58e943 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/TimestampStartCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/TimestampStartCursor.java
@@ -27,8 +27,8 @@ public class TimestampStartCursor implements StartCursor {
 
     private final long timestamp;
 
-    public TimestampStartCursor(long timestamp) {
-        this.timestamp = timestamp;
+    public TimestampStartCursor(long timestamp, boolean inclusive) {
+        this.timestamp = inclusive ? timestamp : timestamp + 1;
     }
 
     @Override
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/EventTimestampStopCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/EventTimestampStopCursor.java
index e425545de44..d2a44ea362d 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/EventTimestampStopCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/EventTimestampStopCursor.java
@@ -27,13 +27,16 @@ public class EventTimestampStopCursor implements StopCursor {
     private static final long serialVersionUID = 2391576769339369027L;
 
     private final long timestamp;
+    private final boolean inclusive;
 
-    public EventTimestampStopCursor(long timestamp) {
+    public EventTimestampStopCursor(long timestamp, boolean inclusive) {
         this.timestamp = timestamp;
+        this.inclusive = inclusive;
     }
 
     @Override
-    public boolean shouldStop(Message<?> message) {
-        return message.getEventTime() >= timestamp;
+    public StopCondition shouldStop(Message<?> message) {
+        long eventTime = message.getEventTime();
+        return StopCondition.compare(timestamp, eventTime, inclusive);
     }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java
index 257081f5c89..0de963ed4fb 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java
@@ -30,24 +30,29 @@ import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtil
 /**
  * A stop cursor that initialize the position to the latest message id. The offsets initialization
  * are taken care of by the {@code PulsarPartitionSplitReaderBase} instead of by the {@code
- * PulsarSourceEnumerator}.
+ * PulsarSourceEnumerator}. The latest message available in Pulsar is included by default.
  */
 public class LatestMessageStopCursor implements StopCursor {
     private static final long serialVersionUID = 1702059838323965723L;
 
     private MessageId messageId;
+    private final boolean inclusive;
+
+    public LatestMessageStopCursor(boolean inclusive) {
+        this.inclusive = inclusive;
+    }
+
+    @Override
+    public StopCondition shouldStop(Message<?> message) {
+        MessageId current = message.getMessageId();
+        return StopCondition.compare(messageId, current, inclusive);
+    }
 
     @Override
     public void open(PulsarAdmin admin, TopicPartition partition) {
         if (messageId == null) {
             String topic = partition.getFullTopicName();
-            messageId = sneakyAdmin(() -> admin.topics().getLastMessageId(topic));
+            this.messageId = sneakyAdmin(() -> admin.topics().getLastMessageId(topic));
         }
     }
-
-    @Override
-    public boolean shouldStop(Message<?> message) {
-        MessageId id = message.getMessageId();
-        return id.compareTo(messageId) >= 0;
-    }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java
index 7af55a00cc0..03d83aa4495 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java
@@ -22,6 +22,12 @@ import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+
+import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.unwrapMessageId;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.pulsar.client.api.MessageId.earliest;
+import static org.apache.pulsar.client.api.MessageId.latest;
 
 /**
  * Stop consuming message at a given message id. We use the {@link MessageId#compareTo(Object)} for
@@ -32,24 +38,22 @@ public class MessageIdStopCursor implements StopCursor {
 
     private final MessageId messageId;
 
-    private final boolean exclusive;
+    private final boolean inclusive;
 
-    public MessageIdStopCursor(MessageId messageId) {
-        this(messageId, true);
-    }
+    public MessageIdStopCursor(MessageId messageId, boolean inclusive) {
+        MessageIdImpl idImpl = unwrapMessageId(messageId);
+        checkArgument(!earliest.equals(idImpl), "MessageId.earliest is not supported.");
+        checkArgument(
+                !latest.equals(idImpl),
+                "MessageId.latest is not supported, use LatestMessageStopCursor instead.");
 
-    public MessageIdStopCursor(MessageId messageId, boolean exclusive) {
-        this.messageId = messageId;
-        this.exclusive = exclusive;
+        this.messageId = idImpl;
+        this.inclusive = inclusive;
     }
 
     @Override
-    public boolean shouldStop(Message<?> message) {
-        MessageId id = message.getMessageId();
-        if (exclusive) {
-            return id.compareTo(messageId) > 0;
-        } else {
-            return id.compareTo(messageId) >= 0;
-        }
+    public StopCondition shouldStop(Message<?> message) {
+        MessageId current = message.getMessageId();
+        return StopCondition.compare(messageId, current, inclusive);
     }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/NeverStopCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/NeverStopCursor.java
index ff2c619afb8..3eb035634ae 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/NeverStopCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/NeverStopCursor.java
@@ -27,7 +27,7 @@ public class NeverStopCursor implements StopCursor {
     private static final long serialVersionUID = -3113601090292771786L;
 
     @Override
-    public boolean shouldStop(Message<?> message) {
-        return false;
+    public StopCondition shouldStop(Message<?> message) {
+        return StopCondition.CONTINUE;
     }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/PublishTimestampStopCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/PublishTimestampStopCursor.java
index b598e7addd4..2dfdd765842 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/PublishTimestampStopCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/PublishTimestampStopCursor.java
@@ -27,13 +27,16 @@ public class PublishTimestampStopCursor implements StopCursor {
     private static final long serialVersionUID = 4386276745339324527L;
 
     private final long timestamp;
+    private final boolean inclusive;
 
-    public PublishTimestampStopCursor(long timestamp) {
+    public PublishTimestampStopCursor(long timestamp, boolean inclusive) {
         this.timestamp = timestamp;
+        this.inclusive = inclusive;
     }
 
     @Override
-    public boolean shouldStop(Message<?> message) {
-        return message.getPublishTime() >= timestamp;
+    public StopCondition shouldStop(Message<?> message) {
+        long publishTime = message.getPublishTime();
+        return StopCondition.compare(timestamp, publishTime, inclusive);
     }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriber.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriber.java
index 08ba1faa442..b8a55bf8a34 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriber.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriber.java
@@ -48,7 +48,7 @@ public interface PulsarSubscriber extends Serializable {
 
     /**
      * Get a set of subscribed {@link TopicPartition}s. The method could throw {@link
-     * IllegalStateException}, a extra try catch is required.
+     * IllegalStateException}, an extra try catch is required.
      *
      * @param pulsarAdmin The admin interface used to retrieve subscribed topic partitions.
      * @param rangeGenerator The range for different partitions.
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java
index 6a5d5152231..820888a7bf3 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java
@@ -77,7 +77,7 @@ public final class PulsarSourceReaderFactory {
         SubscriptionType subscriptionType = sourceConfiguration.getSubscriptionType();
         if (subscriptionType == SubscriptionType.Failover
                 || subscriptionType == SubscriptionType.Exclusive) {
-            // Create a ordered split reader supplier.
+            // Create an ordered split reader supplier.
             Supplier<PulsarOrderedPartitionSplitReader<OUT>> splitReaderSupplier =
                     () ->
                             new PulsarOrderedPartitionSplitReader<>(
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java
index bb6d79641f5..316431aa75e 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java
@@ -20,14 +20,13 @@ package org.apache.flink.connector.pulsar.source.reader.split;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
 import org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -36,10 +35,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
 import static org.apache.flink.connector.pulsar.source.config.CursorVerification.FAIL_ON_MISMATCH;
+import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.nextMessageId;
 
 /**
  * The split reader a given {@link PulsarPartitionSplit}, it would be closed once the {@link
@@ -75,18 +76,39 @@ public class PulsarOrderedPartitionSplitReader<OUT> extends PulsarPartitionSplit
     }
 
     @Override
-    protected void startConsumer(PulsarPartitionSplit split, Consumer<byte[]> consumer) {
+    protected void beforeCreatingConsumer(PulsarPartitionSplit split) {
         MessageId latestConsumedId = split.getLatestConsumedId();
 
         // Reset the start position for ordered pulsar consumer.
         if (latestConsumedId != null) {
-            StartCursor startCursor = StartCursor.fromMessageId(latestConsumedId, false);
-            TopicPartition partition = split.getPartition();
-
+            LOG.info("Reset subscription position by the checkpoint {}", latestConsumedId);
             try {
-                startCursor.seekPosition(
-                        partition.getTopic(), partition.getPartitionId(), consumer);
-            } catch (PulsarClientException e) {
+                MessageId initialPosition;
+                if (latestConsumedId == MessageId.latest
+                        || latestConsumedId == MessageId.earliest) {
+                    // for compatibility
+                    initialPosition = latestConsumedId;
+                } else {
+                    initialPosition = nextMessageId(latestConsumedId);
+                }
+
+                // Consumer.seek() is removed here while waiting for pulsar-client-all 2.12.0.
+                // See https://github.com/apache/pulsar/issues/16757 for more details.
+
+                String topicName = split.getPartition().getFullTopicName();
+                List<String> subscriptions = pulsarAdmin.topics().getSubscriptions(topicName);
+                String subscriptionName = sourceConfiguration.getSubscriptionName();
+
+                if (!subscriptions.contains(subscriptionName)) {
+                    // If this subscription is not available, just create it.
+                    pulsarAdmin
+                            .topics()
+                            .createSubscription(topicName, subscriptionName, initialPosition);
+                } else {
+                    // Reset the subscription if it already exists.
+                    pulsarAdmin.topics().resetCursor(topicName, subscriptionName, initialPosition);
+                }
+            } catch (PulsarAdminException e) {
                 if (sourceConfiguration.getVerifyInitialOffsets() == FAIL_ON_MISMATCH) {
                     throw new IllegalArgumentException(e);
                 } else {
@@ -95,7 +117,7 @@ public class PulsarOrderedPartitionSplitReader<OUT> extends PulsarPartitionSplit
                     LOG.warn(
                             "Failed to reset cursor to {} on partition {}",
                             latestConsumedId,
-                            partition,
+                            split.getPartition(),
                             e);
                 }
             }
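
Reduced to its essentials, the create-or-reset step above uses only the Pulsar admin API. A minimal sketch of that pattern; the topic and subscription names are supplied by the caller, and the cursor-verification fallback from the code above is omitted.

    import java.util.List;

    import org.apache.pulsar.client.admin.PulsarAdmin;
    import org.apache.pulsar.client.admin.PulsarAdminException;
    import org.apache.pulsar.client.api.MessageId;

    public class SubscriptionResetSketch {

        /** Create the subscription at the given position, or move it there if it already exists. */
        static void createOrResetSubscription(
                PulsarAdmin admin, String topic, String subscription, MessageId position)
                throws PulsarAdminException {
            List<String> subscriptions = admin.topics().getSubscriptions(topic);
            if (!subscriptions.contains(subscription)) {
                admin.topics().createSubscription(topic, subscription, position);
            } else {
                admin.topics().resetCursor(topic, subscription, position);
            }
        }
    }
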
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
index 37b5630a8d1..2d4214262c8 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.StopCondition;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
 import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
@@ -52,7 +53,6 @@ import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
 import static org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.createConsumerBuilder;
@@ -70,7 +70,6 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
     protected final PulsarAdmin pulsarAdmin;
     protected final SourceConfiguration sourceConfiguration;
     protected final PulsarDeserializationSchema<OUT> deserializationSchema;
-    protected final AtomicBoolean wakeup;
 
     protected Consumer<byte[]> pulsarConsumer;
     protected PulsarPartitionSplit registeredSplit;
@@ -84,7 +83,6 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
         this.pulsarAdmin = pulsarAdmin;
         this.sourceConfiguration = sourceConfiguration;
         this.deserializationSchema = deserializationSchema;
-        this.wakeup = new AtomicBoolean(false);
     }
 
     @Override
@@ -96,9 +94,6 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
             return builder.build();
         }
 
-        // Set wakeup to false for start consuming.
-        wakeup.compareAndSet(true, false);
-
         StopCursor stopCursor = registeredSplit.getStopCursor();
         String splitId = registeredSplit.splitId();
         PulsarMessageCollector<OUT> collector = new PulsarMessageCollector<>(splitId, builder);
@@ -106,9 +101,7 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
 
         // Consume message from pulsar until it was woke up by flink reader.
         for (int messageNum = 0;
-                messageNum < sourceConfiguration.getMaxFetchRecords()
-                        && deadline.hasTimeLeft()
-                        && isNotWakeup();
+                messageNum < sourceConfiguration.getMaxFetchRecords() && deadline.hasTimeLeft();
                 messageNum++) {
             try {
                 Duration timeout = deadline.timeLeftIfAny();
@@ -117,14 +110,18 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
                     break;
                 }
 
-                // Deserialize message.
-                collector.setMessage(message);
-                deserializationSchema.deserialize(message, collector);
+                StopCondition condition = stopCursor.shouldStop(message);
+
+                if (condition == StopCondition.CONTINUE || condition == StopCondition.EXACTLY) {
+                    // Deserialize message.
+                    collector.setMessage(message);
+                    deserializationSchema.deserialize(message, collector);
 
-                // Acknowledge message if need.
-                finishedPollMessage(message);
+                    // Acknowledge message if need.
+                    finishedPollMessage(message);
+                }
 
-                if (stopCursor.shouldStop(message)) {
+                if (condition == StopCondition.EXACTLY || condition == StopCondition.TERMINATE) {
                     builder.addFinishedSplit(splitId);
                     break;
                 }
@@ -165,23 +162,27 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
                 newSplits.size() == 1, "This pulsar split reader only support one split.");
         PulsarPartitionSplit newSplit = newSplits.get(0);
 
+        // Open stop cursor.
+        newSplit.open(pulsarAdmin);
+
+        // Before creating the consumer.
+        beforeCreatingConsumer(newSplit);
+
         // Create pulsar consumer.
         Consumer<byte[]> consumer = createPulsarConsumer(newSplit);
 
-        // Open start & stop cursor.
-        newSplit.open(pulsarAdmin);
-
-        // Start Consumer.
-        startConsumer(newSplit, consumer);
+        // After creating the consumer.
+        afterCreatingConsumer(newSplit, consumer);
 
         LOG.info("Register split {} consumer for current reader.", newSplit);
+
         this.registeredSplit = newSplit;
         this.pulsarConsumer = consumer;
     }
 
     @Override
     public void wakeUp() {
-        wakeup.compareAndSet(false, true);
+        // Nothing to do in this method.
     }
 
     @Override
@@ -197,14 +198,16 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
 
     protected abstract void finishedPollMessage(Message<byte[]> message);
 
-    protected abstract void startConsumer(PulsarPartitionSplit split, Consumer<byte[]> consumer);
-
-    // --------------------------- Helper Methods -----------------------------
+    protected void beforeCreatingConsumer(PulsarPartitionSplit split) {
+        // Nothing to do by default.
+    }
 
-    protected boolean isNotWakeup() {
-        return !wakeup.get();
+    protected void afterCreatingConsumer(PulsarPartitionSplit split, Consumer<byte[]> consumer) {
+        // Nothing to do by default.
     }
 
+    // --------------------------- Helper Methods -----------------------------
+
     /** Create a specified {@link Consumer} by the given split information. */
     protected Consumer<byte[]> createPulsarConsumer(PulsarPartitionSplit split) {
         return createPulsarConsumer(split.getPartition());
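
The fetch loop above turns each StopCondition into two independent decisions: whether the current message is emitted, and whether the split is finished afterwards. Condensed into a standalone sketch:

    import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.StopCondition;

    public class StopConditionHandlingSketch {

        /** CONTINUE and EXACTLY deserialize and emit the message downstream. */
        static boolean shouldEmit(StopCondition condition) {
            return condition == StopCondition.CONTINUE || condition == StopCondition.EXACTLY;
        }

        /** EXACTLY and TERMINATE mark the split as finished after handling this message. */
        static boolean shouldFinishSplit(StopCondition condition) {
            return condition == StopCondition.EXACTLY || condition == StopCondition.TERMINATE;
        }
    }
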
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
index 5940cc9ac19..cc0167d8d14 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
@@ -126,7 +126,7 @@ public class PulsarUnorderedPartitionSplitReader<OUT> extends PulsarPartitionSpl
     }
 
     @Override
-    protected void startConsumer(PulsarPartitionSplit split, Consumer<byte[]> consumer) {
+    protected void afterCreatingConsumer(PulsarPartitionSplit split, Consumer<byte[]> consumer) {
         TxnID uncommittedTransactionId = split.getUncommittedTransactionId();
 
         // Abort the uncommitted pulsar transaction.
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplit.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplit.java
index 0ec693a2b26..8eb8f599318 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplit.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplit.java
@@ -30,13 +30,15 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 
 import javax.annotation.Nullable;
 
+import java.io.Serializable;
 import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** A {@link SourceSplit} implementation for a Pulsar's partition. */
 @Internal
-public class PulsarPartitionSplit implements SourceSplit {
+public class PulsarPartitionSplit implements SourceSplit, Serializable {
+    private static final long serialVersionUID = -6857317360756062625L;
 
     private final TopicPartition partition;
 
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
index 9a72c8ace2a..94c5c8329ae 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
@@ -24,8 +24,6 @@ import org.apache.flink.connector.pulsar.testutils.cases.MultipleTopicConsumingC
 import org.apache.flink.connector.pulsar.testutils.cases.SingleTopicConsumingContext;
 import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
 import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
-import org.apache.flink.connector.testframe.environment.TestEnvironment;
-import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
 import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
@@ -33,32 +31,10 @@ import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
 import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
 import org.apache.flink.streaming.api.CheckpointingMode;
 
-import org.junit.jupiter.api.Disabled;
-
 /** Unit test class for {@link PulsarSource}. */
 @SuppressWarnings("unused")
 class PulsarSourceITCase extends SourceTestSuiteBase<String> {
 
-    @Disabled // TODO: remove override after FLINK-26177 is fixed
-    @Override
-    public void testScaleUp(
-            TestEnvironment testEnv,
-            DataStreamSourceExternalContext<String> externalContext,
-            CheckpointingMode semantic)
-            throws Exception {
-        super.testScaleUp(testEnv, externalContext, semantic);
-    }
-
-    @Disabled // TODO: remove override after FLINK-26177 is fixed
-    @Override
-    public void testScaleDown(
-            TestEnvironment testEnv,
-            DataStreamSourceExternalContext<String> externalContext,
-            CheckpointingMode semantic)
-            throws Exception {
-        super.testScaleDown(testEnv, externalContext, semantic);
-    }
-
     // Defines test environment on Flink MiniCluster
     @TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();
 
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializerTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializerTest.java
index 38c40ed88ef..5f18e8f5131 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializerTest.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializerTest.java
@@ -34,6 +34,7 @@ import java.util.Set;
 
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
 import static org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumStateSerializer.INSTANCE;
+import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.newMessageId;
 import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotSame;
@@ -51,7 +52,9 @@ class PulsarSourceEnumStateSerializerTest {
                 Collections.singleton(
                         new PulsarPartitionSplit(
                                 new TopicPartition(randomAlphabetic(10), 10, createFullRange()),
-                                StopCursor.defaultStopCursor()));
+                                StopCursor.defaultStopCursor(),
+                                newMessageId(100L, 23L, 44),
+                                null));
         Map<Integer, Set<PulsarPartitionSplit>> shared = Collections.singletonMap(5, splits);
         Map<Integer, Set<String>> mapping =
                 ImmutableMap.of(
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java
index 1dcbe84ba61..aebb76119df 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.connector.source.ReaderInfo;
 import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
+import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssignerFactory;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
@@ -51,6 +53,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
 import static org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.latest;
 import static org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber.getTopicPatternSubscriber;
@@ -368,6 +371,7 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
 
         Configuration configuration = operator().config();
         configuration.set(PULSAR_SUBSCRIPTION_TYPE, subscriptionType);
+        configuration.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10));
         if (enablePeriodicPartitionDiscovery) {
             configuration.set(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 60L);
         } else {
@@ -375,15 +379,15 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
         }
         SourceConfiguration sourceConfiguration = new SourceConfiguration(configuration);
 
-        SplitsAssignmentState assignmentState =
-                new SplitsAssignmentState(latest(), sourceConfiguration, sourceEnumState);
+        SplitAssigner assigner =
+                SplitAssignerFactory.create(latest(), sourceConfiguration, sourceEnumState);
         return new PulsarSourceEnumerator(
                 subscriber,
                 StartCursor.earliest(),
                 new FullRangeGenerator(),
                 sourceConfiguration,
                 enumContext,
-                assignmentState);
+                assigner);
     }
 
     private void registerReader(
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/SplitsAssignmentStateTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/SplitsAssignmentStateTest.java
deleted file mode 100644
index ac811c3dddb..00000000000
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/SplitsAssignmentStateTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.enumerator;
-
-import org.apache.flink.api.connector.source.SplitsAssignment;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
-import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
-
-import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
-import org.apache.flink.shaded.guava30.com.google.common.collect.Sets;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.junit.jupiter.api.Test;
-
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-
-import static java.util.Collections.singletonList;
-import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
-import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.MAX_RANGE;
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.InstanceOfAssertFactories.map;
-
-/** Unit tests for {@link SplitsAssignmentState}. */
-class SplitsAssignmentStateTest {
-
-    private final Set<TopicPartition> partitions =
-            Sets.newHashSet(
-                    new TopicPartition("some-topic", 1, new TopicRange(1, 30)),
-                    new TopicPartition("some-topic", 2, new TopicRange(31, 60)),
-                    new TopicPartition("some-topic", 3, new TopicRange(61, MAX_RANGE)),
-                    new TopicPartition(randomAlphabetic(10), -1, createFullRange()));
-
-    @Test
-    void assignSplitsForSharedSubscription() {
-        SplitsAssignmentState state1 =
-                new SplitsAssignmentState(
-                        StopCursor.defaultStopCursor(), createConfig(SubscriptionType.Shared));
-        state1.appendTopicPartitions(partitions);
-        Optional<SplitsAssignment<PulsarPartitionSplit>> assignment1 =
-                state1.assignSplits(Lists.newArrayList(0, 1, 2, 3, 4));
-
-        assertThat(assignment1)
-                .isPresent()
-                .get()
-                .extracting(SplitsAssignment::assignment)
-                .asInstanceOf(map(Integer.class, List.class))
-                .hasSize(5)
-                .allSatisfy((idx, list) -> assertThat(list).hasSize(4));
-
-        Optional<SplitsAssignment<PulsarPartitionSplit>> assignment2 =
-                state1.assignSplits(Lists.newArrayList(0, 1, 2, 3, 4));
-        assertThat(assignment2).isNotPresent();
-
-        // Reassign reader 3.
-        state1.putSplitsBackToPendingList(assignment1.get().assignment().get(3), 3);
-        Optional<SplitsAssignment<PulsarPartitionSplit>> assignment3 =
-                state1.assignSplits(Lists.newArrayList(0, 1, 2, 4));
-        assertThat(assignment3).isNotPresent();
-
-        Optional<SplitsAssignment<PulsarPartitionSplit>> assignment4 =
-                state1.assignSplits(singletonList(3));
-        assertThat(assignment4)
-                .isPresent()
-                .get()
-                .extracting(SplitsAssignment::assignment)
-                .asInstanceOf(map(Integer.class, List.class))
-                .hasSize(1);
-    }
-
-    @Test
-    void assignSplitsForExclusiveSubscription() {
-        SplitsAssignmentState state1 =
-                new SplitsAssignmentState(
-                        StopCursor.defaultStopCursor(), createConfig(SubscriptionType.Exclusive));
-        state1.appendTopicPartitions(partitions);
-        Optional<SplitsAssignment<PulsarPartitionSplit>> assignment1 =
-                state1.assignSplits(Lists.newArrayList(0, 1, 2, 3, 4));
-
-        assertThat(assignment1).isPresent();
-        assertThat(assignment1.get().assignment())
-                .hasSize(4)
-                .allSatisfy((idx, list) -> assertThat(list).hasSize(1));
-
-        Optional<SplitsAssignment<PulsarPartitionSplit>> assignment2 =
-                state1.assignSplits(Lists.newArrayList(0, 1, 2, 3, 4));
-        assertThat(assignment2).isNotPresent();
-    }
-
-    private SourceConfiguration createConfig(SubscriptionType type) {
-        Configuration configuration = new Configuration();
-        configuration.set(PULSAR_SUBSCRIPTION_TYPE, type);
-
-        return new SourceConfiguration(configuration);
-    }
-}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssignerTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssignerTest.java
new file mode 100644
index 00000000000..2e9ada3b741
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssignerTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Collections.singletonList;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Unit tests for {@link NonSharedSplitAssigner}. */
+class NonSharedSplitAssignerTest extends SplitAssignerTestBase<NonSharedSplitAssigner> {
+
+    @Test
+    void noMoreSplits() {
+        NonSharedSplitAssigner assigner = splitAssigner(true);
+        assertFalse(assigner.noMoreSplits(3));
+
+        assigner = splitAssigner(false);
+        assertFalse(assigner.noMoreSplits(3));
+
+        assigner.registerTopicPartitions(createPartitions("f", 8));
+        assertFalse(assigner.noMoreSplits(3));
+
+        assigner.createAssignment(singletonList(1));
+        assertTrue(assigner.noMoreSplits(1));
+        assertTrue(assigner.noMoreSplits(3));
+    }
+
+    @Test
+    void partitionsAssignment() {
+        NonSharedSplitAssigner assigner = splitAssigner(true);
+        assigner.registerTopicPartitions(createPartitions("d", 4));
+        List<Integer> readers = Arrays.asList(1, 3, 5, 7);
+
+        // Assignment with initial states.
+        Optional<SplitsAssignment<PulsarPartitionSplit>> assignment =
+                assigner.createAssignment(readers);
+        assertThat(assignment).isPresent();
+        assertThat(assignment.get().assignment()).hasSize(1);
+
+        // Reassignment with same readers.
+        assignment = assigner.createAssignment(readers);
+        assertThat(assignment).isNotPresent();
+
+        // Register new partition and assign.
+        assigner.registerTopicPartitions(createPartitions("e", 5));
+        assigner.registerTopicPartitions(createPartitions("f", 1));
+        assigner.registerTopicPartitions(createPartitions("g", 3));
+        assigner.registerTopicPartitions(createPartitions("h", 4));
+        assignment = assigner.createAssignment(readers);
+        assertThat(assignment).isPresent();
+        assertThat(assignment.get().assignment()).hasSize(4);
+
+        // Assign to new readers.
+        readers = Arrays.asList(2, 4, 6, 8);
+        assignment = assigner.createAssignment(readers);
+        assertThat(assignment).isNotPresent();
+    }
+
+    @Override
+    protected NonSharedSplitAssigner createAssigner(
+            StopCursor stopCursor,
+            SourceConfiguration sourceConfiguration,
+            PulsarSourceEnumState sourceEnumState) {
+        return new NonSharedSplitAssigner(stopCursor, sourceConfiguration, sourceEnumState);
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssignerTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssignerTest.java
new file mode 100644
index 00000000000..91584b87688
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssignerTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Collections.singletonList;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Unit tests for {@link SharedSplitAssigner}. */
+class SharedSplitAssignerTest extends SplitAssignerTestBase<SharedSplitAssigner> {
+
+    @Test
+    void noMoreSplits() {
+        SharedSplitAssigner assigner = splitAssigner(true);
+        assertFalse(assigner.noMoreSplits(3));
+
+        assigner = splitAssigner(false);
+        assertFalse(assigner.noMoreSplits(3));
+
+        assigner.registerTopicPartitions(createPartitions("f", 8));
+        assertFalse(assigner.noMoreSplits(3));
+
+        assigner.createAssignment(singletonList(1));
+        assertTrue(assigner.noMoreSplits(1));
+        assertFalse(assigner.noMoreSplits(3));
+
+        assigner.createAssignment(singletonList(3));
+        assertTrue(assigner.noMoreSplits(3));
+    }
+
+    @Test
+    void partitionsAssignment() {
+        SharedSplitAssigner assigner = splitAssigner(true);
+        assigner.registerTopicPartitions(createPartitions("d", 4));
+        List<Integer> readers = Arrays.asList(1, 3, 5, 7);
+
+        // Assignment with initial states.
+        Optional<SplitsAssignment<PulsarPartitionSplit>> assignment =
+                assigner.createAssignment(readers);
+        assertThat(assignment).isPresent();
+        assertThat(assignment.get().assignment()).hasSize(4);
+
+        // Reassignment with same readers.
+        assignment = assigner.createAssignment(readers);
+        assertThat(assignment).isNotPresent();
+
+        // Register new partition and assign.
+        assigner.registerTopicPartitions(createPartitions("e", 5));
+        assignment = assigner.createAssignment(readers);
+        assertThat(assignment).isPresent();
+        assertThat(assignment.get().assignment()).hasSize(4);
+
+        // Assign to new readers.
+        readers = Arrays.asList(2, 4, 6, 8);
+        assignment = assigner.createAssignment(readers);
+        assertThat(assignment).isPresent();
+        assertThat(assignment.get().assignment())
+                .hasSize(4)
+                .allSatisfy((k, v) -> assertThat(v).hasSize(2));
+    }
+
+    @Override
+    protected SharedSplitAssigner createAssigner(
+            StopCursor stopCursor,
+            SourceConfiguration sourceConfiguration,
+            PulsarSourceEnumState sourceEnumState) {
+        return new SharedSplitAssigner(stopCursor, sourceConfiguration, sourceEnumState);
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerTestBase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerTestBase.java
new file mode 100644
index 00000000000..65094014720
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerTestBase.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
+import static org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState.initialState;
+import static org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.defaultStopCursor;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test utils for split assigners. */
+abstract class SplitAssignerTestBase<T extends SplitAssigner> extends TestLogger {
+
+    @Test
+    void registerTopicPartitionsWillOnlyReturnNewPartitions() {
+        T assigner = splitAssigner(true);
+
+        Set<TopicPartition> partitions = createPartitions("persistent://public/default/a", 1);
+        List<TopicPartition> newPartitions = assigner.registerTopicPartitions(partitions);
+        assertThat(newPartitions)
+                .hasSize(1)
+                .first()
+                .hasFieldOrPropertyWithValue("topic", "persistent://public/default/a")
+                .hasFieldOrPropertyWithValue("partitionId", 1);
+
+        newPartitions = assigner.registerTopicPartitions(partitions);
+        assertThat(newPartitions).isEmpty();
+
+        partitions = createPartitions("persistent://public/default/b", 2);
+        newPartitions = assigner.registerTopicPartitions(partitions);
+        assertThat(newPartitions)
+                .hasSize(1)
+                .first()
+                .hasFieldOrPropertyWithValue("topic", "persistent://public/default/b")
+                .hasFieldOrPropertyWithValue("partitionId", 2);
+    }
+
+    @Test
+    void noReadersProvideForAssignment() {
+        T assigner = splitAssigner(false);
+        assigner.registerTopicPartitions(createPartitions("c", 5));
+
+        Optional<SplitsAssignment<PulsarPartitionSplit>> assignment =
+                assigner.createAssignment(emptyList());
+        assertThat(assignment).isNotPresent();
+    }
+
+    @Test
+    void noPartitionsProvideForAssignment() {
+        T assigner = splitAssigner(true);
+        Optional<SplitsAssignment<PulsarPartitionSplit>> assignment =
+                assigner.createAssignment(singletonList(4));
+        assertThat(assignment).isNotPresent();
+    }
+
+    protected Set<TopicPartition> createPartitions(String topic, int partitionId) {
+        TopicPartition p1 = new TopicPartition(topic, partitionId, createFullRange());
+        return singleton(p1);
+    }
+
+    protected T splitAssigner(boolean discovery) {
+        Configuration configuration = new Configuration();
+
+        if (discovery) {
+            configuration.set(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 1000L);
+        } else {
+            configuration.set(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, -1L);
+        }
+
+        SourceConfiguration sourceConfiguration = new SourceConfiguration(configuration);
+        return createAssigner(defaultStopCursor(), sourceConfiguration, initialState());
+    }
+
+    protected abstract T createAssigner(
+            StopCursor stopCursor,
+            SourceConfiguration sourceConfiguration,
+            PulsarSourceEnumState sourceEnumState);
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
index d003107793a..301368b6f4a 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
 import org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader;
@@ -43,12 +42,13 @@ import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSA
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
 import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
 import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.flinkSchema;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test different implementations of StopCursor. */
-public class StopCursorTest extends PulsarTestSuiteBase {
+class StopCursorTest extends PulsarTestSuiteBase {
 
     @Test
     void publishTimeStopCursor() throws IOException {
@@ -64,7 +64,7 @@ public class StopCursorTest extends PulsarTestSuiteBase {
         // send the first message and set the stopCursor so that any later messages are filtered out
         operator()
                 .sendMessage(
-                        TopicNameUtils.topicNameWithPartition(topicName, 0),
+                        topicNameWithPartition(topicName, 0),
                         Schema.STRING,
                         randomAlphanumeric(10));
         long currentTimeStamp = System.currentTimeMillis();
@@ -85,12 +85,11 @@ public class StopCursorTest extends PulsarTestSuiteBase {
         // send the second message and expect it will not be received
         operator()
                 .sendMessage(
-                        TopicNameUtils.topicNameWithPartition(topicName, 0),
+                        topicNameWithPartition(topicName, 0),
                         Schema.STRING,
                         randomAlphanumeric(10));
         RecordsWithSplitIds<PulsarMessage<String>> secondResult = splitReader.fetch();
-        assertThat(secondResult.nextSplit()).isNotNull();
-        assertThat(firstResult.nextRecordFromSplit()).isNull();
+        assertThat(secondResult.nextSplit()).isNull();
         assertThat(secondResult.finishedSplits()).isNotEmpty();
     }
 
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java
index 538e45826d7..9ffbda74260 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
@@ -86,7 +85,7 @@ import static org.assertj.core.api.Assertions.assertThat;
     TestOrderlinessExtension.class,
     TestLoggerExtension.class,
 })
-public abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuiteBase {
+abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuiteBase {
 
     @RegisterExtension
     PulsarSplitReaderInvocationContextProvider provider =
@@ -138,8 +137,7 @@ public abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuite
         // create consumer and seek before split changes
         try (Consumer<byte[]> consumer = reader.createPulsarConsumer(partition)) {
             // inclusive messageId
-            StartCursor startCursor = StartCursor.fromMessageId(startPosition);
-            startCursor.seekPosition(partition.getTopic(), partition.getPartitionId(), consumer);
+            consumer.seek(startPosition);
         } catch (PulsarClientException e) {
             sneakyThrow(e);
         }
@@ -185,7 +183,7 @@ public abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuite
         if (verify) {
             assertThat(messages).as("We should fetch the expected size").hasSize(expectedCount);
             if (boundedness == Boundedness.CONTINUOUS_UNBOUNDED) {
-                assertThat(finishedSplits).as("Split should not be marked as finished").hasSize(0);
+                assertThat(finishedSplits).as("Split should not be marked as finished").isEmpty();
             } else {
                 assertThat(finishedSplits).as("Split should be marked as finished").hasSize(1);
             }
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
index f238a03bfa5..9eaa24041c7 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
@@ -24,6 +24,9 @@ import org.apache.flink.connector.testframe.external.source.DataStreamSourceExte
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
+
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
 
 /** Common test context for Pulsar-based tests. */
 public abstract class PulsarTestContext<T> implements DataStreamSourceExternalContext<T> {
@@ -39,10 +42,13 @@ public abstract class PulsarTestContext<T> implements DataStreamSourceExternalCo
     // Helper methods for generating data.
 
     protected List<String> generateStringTestData(int splitIndex, long seed) {
-        int recordNum = 300;
+        Random random = new Random(seed);
+        int recordNum = 300 + random.nextInt(200);
         List<String> records = new ArrayList<>(recordNum);
+
         for (int i = 0; i < recordNum; i++) {
-            records.add(splitIndex + "-" + i);
+            int length = random.nextInt(40) + 10;
+            records.add(splitIndex + "-" + i + "-" + randomAlphanumeric(length));
         }
 
         return records;
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
index b8c49581f75..be387c583b1 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
@@ -31,13 +31,14 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
 import org.apache.pulsar.client.api.transaction.TxnID;
@@ -49,10 +50,8 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -62,9 +61,7 @@ import java.util.stream.Stream;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toMap;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
 import static org.apache.flink.connector.base.DeliveryGuarantee.EXACTLY_ONCE;
 import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
@@ -79,8 +76,10 @@ import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WR
 import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName;
 import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.pulsar.client.api.SubscriptionInitialPosition.Earliest;
 import static org.apache.pulsar.client.api.SubscriptionMode.Durable;
 import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
+import static org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED;
 
 /**
  * A pulsar cluster operator used for operating pulsar instance. It's serializable for using in
@@ -178,7 +177,7 @@ public class PulsarRuntimeOperator implements Closeable {
      */
     public void createTopic(String topic, int numberOfPartitions) {
         checkArgument(numberOfPartitions >= 0);
-        if (numberOfPartitions <= 0) {
+        if (numberOfPartitions == 0) {
             createNonPartitionedTopic(topic);
         } else {
             createPartitionedTopic(topic, numberOfPartitions);
@@ -196,7 +195,7 @@ public class PulsarRuntimeOperator implements Closeable {
                 sneakyAdmin(() -> admin().topics().getPartitionedTopicMetadata(topic));
         checkArgument(
                 metadata.partitions < newPartitionsNum,
-                "The new partition size which should exceed previous size.");
+                "The new partition size which should greater than previous size.");
 
         sneakyAdmin(() -> admin().topics().updatePartitionedTopic(topic, newPartitionsNum));
     }
@@ -220,9 +219,11 @@ public class PulsarRuntimeOperator implements Closeable {
             return;
         }
 
+        // Close all the available consumers and producers.
         removeConsumers(topic);
         removeProducers(topic);
-        if (metadata.partitions <= 0) {
+
+        if (metadata.partitions == NON_PARTITIONED) {
             sneakyAdmin(() -> admin().topics().delete(topicName));
         } else {
             sneakyAdmin(() -> admin().topics().deletePartitionedTopic(topicName));
@@ -245,22 +246,6 @@ public class PulsarRuntimeOperator implements Closeable {
         }
     }
 
-    /**
-     * Query a list of topics. Convert the topic metadata into a list of topic partitions. Return a
-     * mapping for topic and its partitions.
-     */
-    public Map<String, List<TopicPartition>> topicsInfo(String... topics) {
-        return topicsInfo(Arrays.asList(topics));
-    }
-
-    /**
-     * Query a list of topics. Convert the topic metadata into a list of topic partitions. Return a
-     * mapping for topic and its partitions.
-     */
-    public Map<String, List<TopicPartition>> topicsInfo(Collection<String> topics) {
-        return topics.stream().collect(toMap(identity(), this::topicInfo));
-    }
-
     /**
      * Send a single message to Pulsar, return the message id after the ack from Pulsar.
      *
@@ -518,12 +503,13 @@ public class PulsarRuntimeOperator implements Closeable {
                 topicProducers.computeIfAbsent(
                         index,
                         i -> {
-                            try {
-                                return client().newProducer(schema).topic(topic).create();
-                            } catch (PulsarClientException e) {
-                                sneakyThrow(e);
-                                return null;
-                            }
+                            ProducerBuilder<T> builder =
+                                    client().newProducer(schema)
+                                            .topic(topic)
+                                            .enableBatching(false)
+                                            .enableMultiSchema(true);
+
+                            return sneakyClient(builder::create);
                         });
     }
 
@@ -540,19 +526,15 @@ public class PulsarRuntimeOperator implements Closeable {
                 topicConsumers.computeIfAbsent(
                         index,
                         i -> {
-                            try {
-                                return client().newConsumer(schema)
-                                        .topic(topic)
-                                        .subscriptionName(SUBSCRIPTION_NAME)
-                                        .subscriptionMode(Durable)
-                                        .subscriptionType(Exclusive)
-                                        .subscriptionInitialPosition(
-                                                SubscriptionInitialPosition.Earliest)
-                                        .subscribe();
-                            } catch (PulsarClientException e) {
-                                sneakyThrow(e);
-                                return null;
-                            }
+                            ConsumerBuilder<T> builder =
+                                    client().newConsumer(schema)
+                                            .topic(topic)
+                                            .subscriptionName(SUBSCRIPTION_NAME)
+                                            .subscriptionMode(Durable)
+                                            .subscriptionType(Exclusive)
+                                            .subscriptionInitialPosition(Earliest);
+
+                            return sneakyClient(builder::subscribe);
                         });
     }
 
@@ -561,11 +543,7 @@ public class PulsarRuntimeOperator implements Closeable {
         ConcurrentHashMap<Integer, Producer<?>> integerProducers = producers.remove(topicName);
         if (integerProducers != null) {
             for (Producer<?> producer : integerProducers.values()) {
-                try {
-                    producer.close();
-                } catch (PulsarClientException e) {
-                    sneakyThrow(e);
-                }
+                sneakyClient(producer::close);
             }
         }
     }
@@ -575,11 +553,7 @@ public class PulsarRuntimeOperator implements Closeable {
         ConcurrentHashMap<Integer, Consumer<?>> integerConsumers = consumers.remove(topicName);
         if (integerConsumers != null) {
             for (Consumer<?> consumer : integerConsumers.values()) {
-                try {
-                    consumer.close();
-                } catch (PulsarClientException e) {
-                    sneakyThrow(e);
-                }
+                sneakyClient(consumer::close);
             }
         }
     }