You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ti...@apache.org on 2022/08/12 12:48:42 UTC

[flink] branch master updated: [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint. (#19972)

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 18d21a0618f [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint. (#19972)
18d21a0618f is described below

commit 18d21a0618f2195d4279828f610094ffccd052b3
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Fri Aug 12 20:48:34 2022 +0800

    [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint. (#19972)
    
    * Change the initial start cursor and stop cursor to better handle the consuming behaviors.
    * Create the initial subscription instead seek every time. This should fix the wrong position setting.
    * Fix the wrong stop cursor, make sure it stops at the correct space
    * Drop Consumer.seek() for https://github.com/apache/pulsar/pull/16171
---
 .../docs/connectors/datastream/pulsar.md           |  30 ++-
 docs/content/docs/connectors/datastream/pulsar.md  |  37 +++-
 .../generated/pulsar_consumer_configuration.html   |   6 -
 .../connector/pulsar/source/PulsarSource.java      |  14 +-
 .../pulsar/source/PulsarSourceBuilder.java         |   8 +-
 .../pulsar/source/PulsarSourceOptions.java         |   7 +
 .../source/config/PulsarSourceConfigUtils.java     |   3 -
 .../pulsar/source/config/SourceConfiguration.java  |   4 +
 .../source/enumerator/PulsarSourceEnumState.java   |  16 +-
 .../source/enumerator/PulsarSourceEnumerator.java  | 148 ++++++-------
 .../source/enumerator/SplitsAssignmentState.java   | 239 ---------------------
 .../assigner/NonSharedSplitAssigner.java           | 126 +++++++++++
 .../enumerator/assigner/SharedSplitAssigner.java   | 148 +++++++++++++
 .../source/enumerator/assigner/SplitAssigner.java  |  64 ++++++
 .../enumerator/assigner/SplitAssignerFactory.java  |  65 ++++++
 .../source/enumerator/cursor/CursorPosition.java   |  36 ++--
 .../source/enumerator/cursor/MessageIdUtils.java   |  71 ++++++
 .../source/enumerator/cursor/StartCursor.java      |  32 ++-
 .../source/enumerator/cursor/StopCursor.java       |  96 +++++++--
 .../cursor/start/MessageIdStartCursor.java         |  24 +--
 .../cursor/start/TimestampStartCursor.java         |   4 +-
 .../cursor/stop/EventTimestampStopCursor.java      |   9 +-
 .../cursor/stop/LatestMessageStopCursor.java       |  21 +-
 .../cursor/stop/MessageIdStopCursor.java           |  32 +--
 .../enumerator/cursor/stop/NeverStopCursor.java    |   4 +-
 .../cursor/stop/PublishTimestampStopCursor.java    |   9 +-
 .../enumerator/subscriber/PulsarSubscriber.java    |   2 +-
 .../source/reader/PulsarSourceReaderFactory.java   |   2 +-
 .../split/PulsarOrderedPartitionSplitReader.java   |  42 +++-
 .../split/PulsarPartitionSplitReaderBase.java      |  55 ++---
 .../split/PulsarUnorderedPartitionSplitReader.java |   2 +-
 .../pulsar/source/split/PulsarPartitionSplit.java  |   4 +-
 .../pulsar/source/PulsarSourceITCase.java          |  20 --
 .../PulsarSourceEnumStateSerializerTest.java       |   5 +-
 .../enumerator/PulsarSourceEnumeratorTest.java     |  10 +-
 .../enumerator/SplitsAssignmentStateTest.java      | 119 ----------
 .../assigner/NonSharedSplitAssignerTest.java       |  95 ++++++++
 .../assigner/SharedSplitAssignerTest.java          |  98 +++++++++
 .../enumerator/assigner/SplitAssignerTestBase.java | 113 ++++++++++
 .../source/enumerator/cursor/StopCursorTest.java   |  11 +-
 .../split/PulsarPartitionSplitReaderTestBase.java  |   8 +-
 .../pulsar/testutils/PulsarTestContext.java        |  10 +-
 .../testutils/runtime/PulsarRuntimeOperator.java   |  80 +++----
 43 files changed, 1231 insertions(+), 698 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/pulsar.md b/docs/content.zh/docs/connectors/datastream/pulsar.md
index eace8bb8fc5..7287ee3f3d4 100644
--- a/docs/content.zh/docs/connectors/datastream/pulsar.md
+++ b/docs/content.zh/docs/connectors/datastream/pulsar.md
@@ -322,6 +322,7 @@ Pulsar Source 使用 `setStartCursor(StartCursor)` 方法给定开始消费的
   ```
   {{< /tab >}}
   {{< /tabs >}}
+
 - 与前者不同的是,给定的消息可以跳过,再进行消费。
   {{< tabs "pulsar-starting-position-from-message-id-bool" >}}
   {{< tab "Java" >}}
@@ -335,7 +336,8 @@ Pulsar Source 使用 `setStartCursor(StartCursor)` 方法给定开始消费的
   ```
   {{< /tab >}}
   {{< /tabs >}}
-- 从给定的消息时间开始消费。
+
+- 从给定的消息发布时间开始消费,这个方法因为名称容易导致误解现在已经不建议使用。你可以使用方法 `StartCursor.fromPublishTime(long)`。
   {{< tabs "pulsar-starting-position-message-time" >}}
   {{< tab "Java" >}}
   ```java
@@ -349,6 +351,11 @@ Pulsar Source 使用 `setStartCursor(StartCursor)` 方法给定开始消费的
   {{< /tab >}}
   {{< /tabs >}}
 
+- 从给定的消息发布时间开始消费。
+  ```java
+  StartCursor.fromPublishTime(long);
+  ```
+
 {{< hint info >}}
 每条消息都有一个固定的序列号,这个序列号在 Pulsar 上有序排列,其包含了 ledger、entry、partition 等原始信息,用于在 Pulsar 底层存储上查找到具体的消息。
 
@@ -404,6 +411,7 @@ Pulsar Source 默认情况下使用流的方式消费数据。除非任务失败
   ```
   {{< /tab >}}
   {{< /tabs >}}
+
 - 停止于某条消息之后,结果里包含此消息。
   {{< tabs "pulsar-boundedness-after-message-id" >}}
   {{< tab "Java" >}}
@@ -417,7 +425,18 @@ Pulsar Source 默认情况下使用流的方式消费数据。除非任务失败
   ```
   {{< /tab >}}
   {{< /tabs >}}
-- 停止于某个给定的消息发布时间戳,比如 `Message<byte[]>.getPublishTime()`。
+
+- 停止于某个给定的消息事件时间戳,比如 `Message<byte[]>.getEventTime()`,消费结果里不包含此时间戳的消息。
+  ```java
+  StopCursor.atEventTime(long);
+  ```
+
+- 停止于某个给定的消息事件时间戳,比如 `Message<byte[]>.getEventTime()`,消费结果里包含此时间戳的消息。
+  ```java
+  StopCursor.afterEventTime(long);
+  ```
+
+- 停止于某个给定的消息发布时间戳,比如 `Message<byte[]>.getPublishTime()`,消费结果里不包含此时间戳的消息。
   {{< tabs "pulsar-boundedness-publish-time" >}}
   {{< tab "Java" >}}
   ```java
@@ -431,9 +450,10 @@ Pulsar Source 默认情况下使用流的方式消费数据。除非任务失败
   {{< /tab >}}
   {{< /tabs >}}
 
-  {{< hint warning >}}
-  StopCursor.atEventTime(long) 目前已经处于弃用状态。
-  {{< /hint >}}
+- 停止于某个给定的消息发布时间戳,比如 `Message<byte[]>.getPublishTime()`,消费结果里包含此时间戳的消息。
+  ```java
+  StopCursor.afterPublishTime(long);
+  ```
 
 ### Source 配置项
 
diff --git a/docs/content/docs/connectors/datastream/pulsar.md b/docs/content/docs/connectors/datastream/pulsar.md
index 391fb97a1bf..9311c62328f 100644
--- a/docs/content/docs/connectors/datastream/pulsar.md
+++ b/docs/content/docs/connectors/datastream/pulsar.md
@@ -356,6 +356,7 @@ The Pulsar connector consumes from the latest available message if the message I
   ```
   {{< /tab >}}
   {{< /tabs >}}
+
 - Start from a specified message between the earliest and the latest.
 The Pulsar connector consumes from the latest available message if the message ID doesn't exist.
 
@@ -373,7 +374,10 @@ The Pulsar connector consumes from the latest available message if the message I
   {{< /tab >}}
   {{< /tabs >}}
 
-- Start from the specified message time by `Message<byte[]>.getPublishTime()`.
+- Start from the specified message publish time by `Message<byte[]>.getPublishTime()`.
+This method is deprecated because the name is totally wrong which may cause confuse.
+You can use `StartCursor.fromPublishTime(long)` instead.
+
   {{< tabs "pulsar-starting-position-message-time" >}}
   {{< tab "Java" >}}
   ```java
@@ -387,6 +391,11 @@ The Pulsar connector consumes from the latest available message if the message I
   {{< /tab >}}
   {{< /tabs >}}
 
+- Start from the specified message publish time by `Message<byte[]>.getPublishTime()`.
+  ```java
+  StartCursor.fromPublishTime(long);
+  ```
+
 {{< hint info >}}
 Each Pulsar message belongs to an ordered sequence on its topic.
 The sequence ID (`MessageId`) of the message is ordered in that sequence.
@@ -420,7 +429,7 @@ Built-in stop cursors include:
   {{< /tab >}}
   {{< /tabs >}}
 
-- Stop at the latest available message when the  Pulsar source starts consuming messages.
+- Stop at the latest available message when the Pulsar source starts consuming messages.
   {{< tabs "pulsar-boundedness-latest" >}}
   {{< tab "Java" >}}
   ```java
@@ -447,6 +456,7 @@ Built-in stop cursors include:
   ```
   {{< /tab >}}
   {{< /tabs >}}
+
 - Stop but include the given message in the consuming result.
   {{< tabs "pulsar-boundedness-after-message-id" >}}
   {{< tab "Java" >}}
@@ -461,7 +471,20 @@ Built-in stop cursors include:
   {{< /tab >}}
   {{< /tabs >}}
 
-- Stop at the specified message time by `Message<byte[]>.getPublishTime()`.
+- Stop at the specified event time by `Message<byte[]>.getEventTime()`. The message with the
+given event time won't be included in the consuming result.
+  ```java
+  StopCursor.atEventTime(long);
+  ```
+
+- Stop after the specified event time by `Message<byte[]>.getEventTime()`. The message with the
+given event time will be included in the consuming result.
+  ```java
+  StopCursor.afterEventTime(long);
+  ```
+
+- Stop at the specified publish time by `Message<byte[]>.getPublishTime()`. The message with the
+given publish time won't be included in the consuming result.
   {{< tabs "pulsar-boundedness-publish-time" >}}
   {{< tab "Java" >}}
   ```java
@@ -475,9 +498,11 @@ Built-in stop cursors include:
   {{< /tab >}}
   {{< /tabs >}}
 
-  {{< hint warning >}}
-  StopCursor.atEventTime(long) is now deprecated.
-  {{< /hint >}}
+- Stop after the specified publish time by `Message<byte[]>.getPublishTime()`. The message with the
+given publish time will be included in the consuming result.
+  ```java
+  StopCursor.afterPublishTime(long);
+  ```
 
 ### Source Configurable Options
 
diff --git a/docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html b/docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html
index bc8b6df4060..4e05b270de4 100644
--- a/docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html
+++ b/docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html
@@ -140,12 +140,6 @@ C5, 1, 1
             <td>Boolean</td>
             <td>If enabled, the consumer will automatically retry messages.</td>
         </tr>
-        <tr>
-            <td><h5>pulsar.consumer.subscriptionInitialPosition</h5></td>
-            <td style="word-wrap: break-word;">Latest</td>
-            <td><p>Enum</p></td>
-            <td>Initial position at which to set cursor when subscribing to a topic at first time.<br /><br />Possible values:<ul><li>"Latest"</li><li>"Earliest"</li></ul></td>
-        </tr>
         <tr>
             <td><h5>pulsar.consumer.subscriptionMode</h5></td>
             <td style="word-wrap: break-word;">Durable</td>
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
index a6c48d14bc8..6f4775df366 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
@@ -32,7 +32,8 @@ import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
 import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumStateSerializer;
 import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator;
-import org.apache.flink.connector.pulsar.source.enumerator.SplitsAssignmentState;
+import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
+import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssignerFactory;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
@@ -142,15 +143,14 @@ public final class PulsarSource<OUT>
     @Override
     public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> createEnumerator(
             SplitEnumeratorContext<PulsarPartitionSplit> enumContext) {
-        SplitsAssignmentState assignmentState =
-                new SplitsAssignmentState(stopCursor, sourceConfiguration);
+        SplitAssigner splitAssigner = SplitAssignerFactory.create(stopCursor, sourceConfiguration);
         return new PulsarSourceEnumerator(
                 subscriber,
                 startCursor,
                 rangeGenerator,
                 sourceConfiguration,
                 enumContext,
-                assignmentState);
+                splitAssigner);
     }
 
     @Internal
@@ -158,15 +158,15 @@ public final class PulsarSource<OUT>
     public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> restoreEnumerator(
             SplitEnumeratorContext<PulsarPartitionSplit> enumContext,
             PulsarSourceEnumState checkpoint) {
-        SplitsAssignmentState assignmentState =
-                new SplitsAssignmentState(stopCursor, sourceConfiguration, checkpoint);
+        SplitAssigner splitAssigner =
+                SplitAssignerFactory.create(stopCursor, sourceConfiguration, checkpoint);
         return new PulsarSourceEnumerator(
                 subscriber,
                 startCursor,
                 rangeGenerator,
                 sourceConfiguration,
                 enumContext,
-                assignmentState);
+                splitAssigner);
     }
 
     @Internal
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
index ed68215819f..1a5d6ea9758 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
@@ -97,7 +97,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * <p>To stop the connector user has to disable the auto partition discovery. As auto partition
  * discovery always expected new splits to come and not exiting. To disable auto partition
  * discovery, use builder.setConfig({@link
- * PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
+ * PulsarSourceOptions#PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
  *
  * <pre>{@code
  * PulsarSource<String> source = PulsarSource
@@ -266,7 +266,7 @@ public final class PulsarSourceBuilder<OUT> {
     }
 
     /**
-     * The consumer name is informative and it can be used to identify a particular consumer
+     * The consumer name is informative, and it can be used to identify a particular consumer
      * instance from the topic stats.
      */
     public PulsarSourceBuilder<OUT> setConsumerName(String consumerName) {
@@ -321,7 +321,7 @@ public final class PulsarSourceBuilder<OUT> {
      * <p>To stop the connector user has to disable the auto partition discovery. As auto partition
      * discovery always expected new splits to come and not exiting. To disable auto partition
      * discovery, use builder.setConfig({@link
-     * PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
+     * PulsarSourceOptions#PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
      *
      * @param stopCursor The {@link StopCursor} to specify the stopping offset.
      * @return this PulsarSourceBuilder.
@@ -334,7 +334,7 @@ public final class PulsarSourceBuilder<OUT> {
     }
 
     /**
-     * By default the PulsarSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner
+     * By default, the PulsarSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner
      * and thus never stops until the Flink job fails or is canceled. To let the PulsarSource run in
      * {@link Boundedness#BOUNDED} manner and stops at some point, one can set an {@link StopCursor}
      * to specify the stopping offsets for each partition. When all the partitions have reached
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
index 39a73974f5c..c80ddd3b1fa 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.description.Description;
 import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
 import org.apache.flink.connector.pulsar.source.config.CursorVerification;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
 
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
@@ -503,6 +504,12 @@ public final class PulsarSourceOptions {
                                             code("PulsarClientException"))
                                     .build());
 
+    /**
+     * @deprecated This option would be reset by {@link StartCursor}, no need to use it anymore.
+     *     Pulsar didn't support this config option before 1.10.1, so we have to remove this config
+     *     option.
+     */
+    @Deprecated
     public static final ConfigOption<SubscriptionInitialPosition>
             PULSAR_SUBSCRIPTION_INITIAL_POSITION =
                     ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "subscriptionInitialPosition")
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
index 602a1577938..0a4dc31e8d3 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
@@ -59,7 +59,6 @@ import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSA
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_REPLICATE_SUBSCRIPTION_STATE;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_RETRY_ENABLE;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_RETRY_LETTER_TOPIC;
-import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_INITIAL_POSITION;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_MODE;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
@@ -113,8 +112,6 @@ public final class PulsarSourceConfigUtils {
                 builder::consumerName);
         configuration.useOption(PULSAR_READ_COMPACTED, builder::readCompacted);
         configuration.useOption(PULSAR_PRIORITY_LEVEL, builder::priorityLevel);
-        configuration.useOption(
-                PULSAR_SUBSCRIPTION_INITIAL_POSITION, builder::subscriptionInitialPosition);
         createDeadLetterPolicy(configuration).ifPresent(builder::deadLetterPolicy);
         configuration.useOption(
                 PULSAR_AUTO_UPDATE_PARTITIONS_INTERVAL_SECONDS,
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
index 24e7ec0c9c0..f957e1a5775 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
@@ -83,6 +83,10 @@ public class SourceConfiguration extends PulsarConfiguration {
         return messageQueueCapacity;
     }
 
+    /**
+     * We would override the interval into a negative number when we set the connector with bounded
+     * stop cursor.
+     */
     public boolean isEnablePartitionDiscovery() {
         return getPartitionDiscoveryIntervalMs() > 0;
     }
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumState.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumState.java
index dbab9e21781..56bbbd20a32 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumState.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumState.java
@@ -18,15 +18,18 @@
 
 package org.apache.flink.connector.pulsar.source.enumerator;
 
+import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
 /**
  * The state class for pulsar source enumerator, used for storing the split state. This class is
- * managed and controlled by {@link SplitsAssignmentState}.
+ * managed and controlled by {@link SplitAssigner}.
  */
 public class PulsarSourceEnumState {
 
@@ -46,11 +49,12 @@ public class PulsarSourceEnumState {
     private final Map<Integer, Set<PulsarPartitionSplit>> sharedPendingPartitionSplits;
 
     /**
-     * A {@link PulsarPartitionSplit} should be assigned for all flink readers. Using this map for
-     * recording assign status.
+     * It is used for Shared subscription. Every {@link PulsarPartitionSplit} should be assigned for
+     * all flink readers. Using this map for recording assign status.
      */
     private final Map<Integer, Set<String>> readerAssignedSplits;
 
+    /** The pipeline has been triggered and topic partitions have been assigned to readers. */
     private final boolean initialized;
 
     public PulsarSourceEnumState(
@@ -85,4 +89,10 @@ public class PulsarSourceEnumState {
     public boolean isInitialized() {
         return initialized;
     }
+
+    /** The initial assignment state for Pulsar. */
+    public static PulsarSourceEnumState initialState() {
+        return new PulsarSourceEnumState(
+                new HashSet<>(), new HashSet<>(), new HashMap<>(), new HashMap<>(), false);
+    }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
index 7890dcf1847..a64e9272c70 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
@@ -22,40 +22,29 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.CursorPosition;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.KeySharedPolicy;
-import org.apache.pulsar.client.api.KeySharedPolicy.KeySharedPolicySticky;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Range;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.MessageId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
 import static java.util.Collections.singletonList;
 import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createAdmin;
-import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient;
-import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
-import static org.apache.flink.connector.pulsar.source.config.CursorVerification.FAIL_ON_MISMATCH;
-import static org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.createConsumerBuilder;
+import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin;
 
 /** The enumerator class for pulsar source. */
 @Internal
@@ -65,13 +54,12 @@ public class PulsarSourceEnumerator
     private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceEnumerator.class);
 
     private final PulsarAdmin pulsarAdmin;
-    private final PulsarClient pulsarClient;
     private final PulsarSubscriber subscriber;
     private final StartCursor startCursor;
     private final RangeGenerator rangeGenerator;
     private final SourceConfiguration sourceConfiguration;
     private final SplitEnumeratorContext<PulsarPartitionSplit> context;
-    private final SplitsAssignmentState assignmentState;
+    private final SplitAssigner splitAssigner;
 
     public PulsarSourceEnumerator(
             PulsarSubscriber subscriber,
@@ -79,15 +67,14 @@ public class PulsarSourceEnumerator
             RangeGenerator rangeGenerator,
             SourceConfiguration sourceConfiguration,
             SplitEnumeratorContext<PulsarPartitionSplit> context,
-            SplitsAssignmentState assignmentState) {
+            SplitAssigner splitAssigner) {
         this.pulsarAdmin = createAdmin(sourceConfiguration);
-        this.pulsarClient = createClient(sourceConfiguration);
         this.subscriber = subscriber;
         this.startCursor = startCursor;
         this.rangeGenerator = rangeGenerator;
         this.sourceConfiguration = sourceConfiguration;
         this.context = context;
-        this.assignmentState = assignmentState;
+        this.splitAssigner = splitAssigner;
     }
 
     @Override
@@ -123,9 +110,9 @@ public class PulsarSourceEnumerator
     @Override
     public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
         // Put the split back to current pending splits.
-        assignmentState.putSplitsBackToPendingList(splits, subtaskId);
+        splitAssigner.addSplitsBack(splits, subtaskId);
 
-        // If the failed subtask has already restarted, we need to assign pending splits to it
+        // If the failed subtask has already restarted, we need to assign pending splits to it.
         if (context.registeredReaders().containsKey(subtaskId)) {
             assignPendingPartitionSplits(singletonList(subtaskId));
         }
@@ -142,7 +129,7 @@ public class PulsarSourceEnumerator
 
     @Override
     public PulsarSourceEnumState snapshotState(long checkpointId) {
-        return assignmentState.snapshotState();
+        return splitAssigner.snapshotState();
     }
 
     @Override
@@ -164,54 +151,7 @@ public class PulsarSourceEnumerator
      */
     private Set<TopicPartition> getSubscribedTopicPartitions() {
         int parallelism = context.currentParallelism();
-        Set<TopicPartition> partitions =
-                subscriber.getSubscribedTopicPartitions(pulsarAdmin, rangeGenerator, parallelism);
-
-        // Seek start position for given partitions.
-        seekStartPosition(partitions);
-
-        return partitions;
-    }
-
-    private void seekStartPosition(Set<TopicPartition> partitions) {
-        ConsumerBuilder<byte[]> consumerBuilder = consumerBuilder();
-        Set<String> seekedTopics = new HashSet<>();
-
-        for (TopicPartition partition : partitions) {
-            String topicName = partition.getFullTopicName();
-            if (!assignmentState.containsTopic(topicName) && seekedTopics.add(topicName)) {
-                try (Consumer<byte[]> consumer =
-                        sneakyClient(() -> consumerBuilder.clone().topic(topicName).subscribe())) {
-                    startCursor.seekPosition(
-                            partition.getTopic(), partition.getPartitionId(), consumer);
-                } catch (PulsarClientException e) {
-                    if (sourceConfiguration.getVerifyInitialOffsets() == FAIL_ON_MISMATCH) {
-                        throw new IllegalArgumentException(e);
-                    } else {
-                        // WARN_ON_MISMATCH would just print this warning message.
-                        // No need to print the stacktrace.
-                        LOG.warn(
-                                "Failed to set initial consuming position for partition {}",
-                                partition,
-                                e);
-                    }
-                }
-            }
-        }
-    }
-
-    private ConsumerBuilder<byte[]> consumerBuilder() {
-        ConsumerBuilder<byte[]> builder =
-                createConsumerBuilder(pulsarClient, Schema.BYTES, sourceConfiguration);
-        if (sourceConfiguration.getSubscriptionType() == SubscriptionType.Key_Shared) {
-            Range range = TopicRange.createFullRange().toPulsarRange();
-            KeySharedPolicySticky keySharedPolicy = KeySharedPolicy.stickyHashRange().ranges(range);
-            // Force this consume use sticky hash range in Key_Shared subscription.
-            // Pulsar won't remove old message dispatcher before 2.8.2 release.
-            builder.keySharedPolicy(keySharedPolicy);
-        }
-
-        return builder;
+        return subscriber.getSubscribedTopicPartitions(pulsarAdmin, rangeGenerator, parallelism);
     }
 
     /**
@@ -230,13 +170,55 @@ public class PulsarSourceEnumerator
         }
 
         // Append the partitions into current assignment state.
-        assignmentState.appendTopicPartitions(fetchedPartitions);
-        List<Integer> registeredReaders = new ArrayList<>(context.registeredReaders().keySet());
+        List<TopicPartition> newPartitions =
+                splitAssigner.registerTopicPartitions(fetchedPartitions);
+        createSubscription(newPartitions);
 
         // Assign the new readers.
+        List<Integer> registeredReaders = new ArrayList<>(context.registeredReaders().keySet());
         assignPendingPartitionSplits(registeredReaders);
     }
 
+    /** Create subscription on topic partition if it doesn't exist. */
+    private void createSubscription(List<TopicPartition> newPartitions) {
+        for (TopicPartition partition : newPartitions) {
+            String topicName = partition.getFullTopicName();
+            String subscriptionName = sourceConfiguration.getSubscriptionName();
+
+            List<String> subscriptions =
+                    sneakyAdmin(() -> pulsarAdmin.topics().getSubscriptions(topicName));
+            if (!subscriptions.contains(subscriptionName)) {
+                CursorPosition position =
+                        startCursor.position(partition.getTopic(), partition.getPartitionId());
+                MessageId initialPosition = queryInitialPosition(topicName, position);
+
+                sneakyAdmin(
+                        () ->
+                                pulsarAdmin
+                                        .topics()
+                                        .createSubscription(
+                                                topicName, subscriptionName, initialPosition));
+            }
+        }
+    }
+
+    /** Query the available message id from Pulsar. */
+    private MessageId queryInitialPosition(String topicName, CursorPosition position) {
+        CursorPosition.Type type = position.getType();
+        if (type == CursorPosition.Type.TIMESTAMP) {
+            return sneakyAdmin(
+                    () ->
+                            pulsarAdmin
+                                    .topics()
+                                    .getMessageIdByTimestamp(topicName, position.getTimestamp()));
+        } else if (type == CursorPosition.Type.MESSAGE_ID) {
+            return position.getMessageId();
+        } else {
+            throw new UnsupportedOperationException("We don't support this seek type " + type);
+        }
+    }
+
+    /** Query the unassigned splits and assign them to the available readers. */
     private void assignPendingPartitionSplits(List<Integer> pendingReaders) {
         // Validate the reader.
         pendingReaders.forEach(
@@ -248,17 +230,19 @@ public class PulsarSourceEnumerator
                 });
 
         // Assign splits to downstream readers.
-        assignmentState.assignSplits(pendingReaders).ifPresent(context::assignSplits);
+        splitAssigner.createAssignment(pendingReaders).ifPresent(context::assignSplits);
 
         // If periodically partition discovery is disabled and the initializing discovery has done,
-        // signal NoMoreSplitsEvent to pending readers
-        if (assignmentState.noMoreNewPartitionSplits()) {
-            LOG.debug(
-                    "No more PulsarPartitionSplits to assign."
-                            + " Sending NoMoreSplitsEvent to reader {} in subscription {}.",
-                    pendingReaders,
-                    sourceConfiguration.getSubscriptionDesc());
-            pendingReaders.forEach(this.context::signalNoMoreSplits);
+        // signal NoMoreSplitsEvent to pending readers.
+        for (Integer reader : pendingReaders) {
+            if (splitAssigner.noMoreSplits(reader)) {
+                LOG.debug(
+                        "No more PulsarPartitionSplits to assign."
+                                + " Sending NoMoreSplitsEvent to reader {} in subscription {}.",
+                        reader,
+                        sourceConfiguration.getSubscriptionDesc());
+                context.signalNoMoreSplits(reader);
+            }
         }
     }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/SplitsAssignmentState.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/SplitsAssignmentState.java
deleted file mode 100644
index cbc4826583a..00000000000
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/SplitsAssignmentState.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.enumerator;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.api.connector.source.SplitsAssignment;
-import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
-import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
-import org.apache.flink.util.InstantiationUtil;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-
-/** The state class for recording the split assignment. */
-@Internal
-public class SplitsAssignmentState {
-
-    private final StopCursor stopCursor;
-    private final SourceConfiguration sourceConfiguration;
-
-    // The dynamic states for checkpoint.
-    private final Set<TopicPartition> appendedPartitions;
-    // This pending splits is used for Key_Shared, Failover, Exclusive subscription.
-    private final Set<PulsarPartitionSplit> pendingPartitionSplits;
-    // These two fields are used for Shared subscription.
-    private final Map<Integer, Set<PulsarPartitionSplit>> sharedPendingPartitionSplits;
-    private final Map<Integer, Set<String>> readerAssignedSplits;
-    private boolean initialized;
-
-    public SplitsAssignmentState(StopCursor stopCursor, SourceConfiguration sourceConfiguration) {
-        this.stopCursor = stopCursor;
-        this.sourceConfiguration = sourceConfiguration;
-        this.appendedPartitions = new HashSet<>();
-        this.pendingPartitionSplits = new HashSet<>();
-        this.sharedPendingPartitionSplits = new HashMap<>();
-        this.readerAssignedSplits = new HashMap<>();
-        this.initialized = false;
-    }
-
-    public SplitsAssignmentState(
-            StopCursor stopCursor,
-            SourceConfiguration sourceConfiguration,
-            PulsarSourceEnumState sourceEnumState) {
-        this.stopCursor = stopCursor;
-        this.sourceConfiguration = sourceConfiguration;
-        this.appendedPartitions = sourceEnumState.getAppendedPartitions();
-        this.pendingPartitionSplits = sourceEnumState.getPendingPartitionSplits();
-        this.sharedPendingPartitionSplits = sourceEnumState.getSharedPendingPartitionSplits();
-        this.readerAssignedSplits = sourceEnumState.getReaderAssignedSplits();
-        this.initialized = sourceEnumState.isInitialized();
-    }
-
-    public PulsarSourceEnumState snapshotState() {
-        return new PulsarSourceEnumState(
-                appendedPartitions,
-                pendingPartitionSplits,
-                sharedPendingPartitionSplits,
-                readerAssignedSplits,
-                initialized);
-    }
-
-    /**
-     * Append the new fetched partitions to current state. We would generate pending source split
-     * for downstream pulsar readers. Since the {@link SplitEnumeratorContext} don't support put the
-     * split back to enumerator, we don't support partition deletion.
-     *
-     * @param fetchedPartitions The partitions from the {@link PulsarSubscriber}.
-     */
-    public void appendTopicPartitions(Set<TopicPartition> fetchedPartitions) {
-        for (TopicPartition partition : fetchedPartitions) {
-            // If this partition is a new partition.
-            if (!appendedPartitions.contains(partition)) {
-                if (!sharePartition()) {
-                    // Create a split and add it to pending list.
-                    pendingPartitionSplits.add(createSplit(partition));
-                }
-
-                // Shared subscription don't create splits, we just register partitions.
-                appendedPartitions.add(partition);
-            }
-        }
-
-        // Update this initialize flag.
-        if (!initialized) {
-            this.initialized = true;
-        }
-    }
-
-    public boolean containsTopic(String topicName) {
-        return appendedPartitions.stream()
-                .anyMatch(partition -> Objects.equals(partition.getFullTopicName(), topicName));
-    }
-
-    /** Put these splits back to pending list. */
-    public void putSplitsBackToPendingList(List<PulsarPartitionSplit> splits, int readerId) {
-        if (!sharePartition()) {
-            // Put these splits back to normal pending list.
-            pendingPartitionSplits.addAll(splits);
-        } else {
-            // Put the splits back to shared pending list.
-            Set<PulsarPartitionSplit> pending =
-                    sharedPendingPartitionSplits.computeIfAbsent(readerId, id -> new HashSet<>());
-            pending.addAll(splits);
-        }
-    }
-
-    public Optional<SplitsAssignment<PulsarPartitionSplit>> assignSplits(
-            List<Integer> pendingReaders) {
-        // Avoid empty readers assign.
-        if (pendingReaders.isEmpty()) {
-            return Optional.empty();
-        }
-
-        Map<Integer, List<PulsarPartitionSplit>> assignMap;
-
-        // We extract the assign logic into two method for better readability.
-        if (!sharePartition()) {
-            assignMap = assignNormalSplits(pendingReaders);
-        } else {
-            assignMap = assignSharedSplits(pendingReaders);
-        }
-
-        if (assignMap.isEmpty()) {
-            return Optional.empty();
-        } else {
-            return Optional.of(new SplitsAssignment<>(assignMap));
-        }
-    }
-
-    /**
-     * @return It would return true only if periodically partition discovery is disabled, the
-     *     initializing partition discovery has finished AND there is no pending splits for
-     *     assignment.
-     */
-    public boolean noMoreNewPartitionSplits() {
-        return !sourceConfiguration.isEnablePartitionDiscovery()
-                && initialized
-                && pendingPartitionSplits.isEmpty();
-    }
-
-    // ----------------- private methods -------------------
-
-    /** The splits don't shared for all the readers. */
-    private Map<Integer, List<PulsarPartitionSplit>> assignNormalSplits(
-            List<Integer> pendingReaders) {
-        Map<Integer, List<PulsarPartitionSplit>> assignMap = new HashMap<>();
-
-        // Drain a list of splits.
-        List<PulsarPartitionSplit> pendingSplits = drainPendingPartitionsSplits();
-        for (int i = 0; i < pendingSplits.size(); i++) {
-            PulsarPartitionSplit split = pendingSplits.get(i);
-            int readerId = pendingReaders.get(i % pendingReaders.size());
-            assignMap.computeIfAbsent(readerId, id -> new ArrayList<>()).add(split);
-        }
-
-        return assignMap;
-    }
-
-    /** Every split would be shared among available readers. */
-    private Map<Integer, List<PulsarPartitionSplit>> assignSharedSplits(
-            List<Integer> pendingReaders) {
-        Map<Integer, List<PulsarPartitionSplit>> assignMap = new HashMap<>();
-
-        // Drain the splits from share pending list.
-        for (Integer reader : pendingReaders) {
-            Set<PulsarPartitionSplit> pendingSplits = sharedPendingPartitionSplits.remove(reader);
-            if (pendingSplits == null) {
-                pendingSplits = new HashSet<>();
-            }
-
-            Set<String> assignedSplits =
-                    readerAssignedSplits.computeIfAbsent(reader, r -> new HashSet<>());
-
-            for (TopicPartition partition : appendedPartitions) {
-                String partitionName = partition.toString();
-                if (!assignedSplits.contains(partitionName)) {
-                    pendingSplits.add(createSplit(partition));
-                    assignedSplits.add(partitionName);
-                }
-            }
-
-            if (!pendingSplits.isEmpty()) {
-                assignMap.put(reader, new ArrayList<>(pendingSplits));
-            }
-        }
-
-        return assignMap;
-    }
-
-    private PulsarPartitionSplit createSplit(TopicPartition partition) {
-        try {
-            StopCursor stop = InstantiationUtil.clone(stopCursor);
-            return new PulsarPartitionSplit(partition, stop);
-        } catch (IOException | ClassNotFoundException e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
-    private List<PulsarPartitionSplit> drainPendingPartitionsSplits() {
-        List<PulsarPartitionSplit> splits = new ArrayList<>(pendingPartitionSplits);
-        pendingPartitionSplits.clear();
-
-        return splits;
-    }
-
-    /** {@link SubscriptionType#Shared} mode should share a same split for all the readers. */
-    private boolean sharePartition() {
-        return sourceConfiguration.getSubscriptionType() == SubscriptionType.Shared;
-    }
-}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java
new file mode 100644
index 00000000000..087e96157d6
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This assigner is used for {@link SubscriptionType#Failover}, {@link SubscriptionType#Exclusive}
+ * and {@link SubscriptionType#Key_Shared} subscriptions.
+ */
+@Internal
+public class NonSharedSplitAssigner implements SplitAssigner {
+    private static final long serialVersionUID = 8412586087991597092L;
+
+    private final StopCursor stopCursor;
+    private final boolean enablePartitionDiscovery;
+
+    // These fields would be saved into checkpoint.
+
+    private final Set<TopicPartition> appendedPartitions;
+    private final Set<PulsarPartitionSplit> pendingPartitionSplits;
+    private boolean initialized;
+
+    public NonSharedSplitAssigner(
+            StopCursor stopCursor,
+            SourceConfiguration sourceConfiguration,
+            PulsarSourceEnumState sourceEnumState) {
+        this.stopCursor = stopCursor;
+        this.enablePartitionDiscovery = sourceConfiguration.isEnablePartitionDiscovery();
+        this.appendedPartitions = sourceEnumState.getAppendedPartitions();
+        this.pendingPartitionSplits = sourceEnumState.getPendingPartitionSplits();
+        this.initialized = sourceEnumState.isInitialized();
+    }
+
+    @Override
+    public List<TopicPartition> registerTopicPartitions(Set<TopicPartition> fetchedPartitions) {
+        List<TopicPartition> newPartitions = new ArrayList<>();
+
+        for (TopicPartition partition : fetchedPartitions) {
+            if (!appendedPartitions.contains(partition)) {
+                pendingPartitionSplits.add(new PulsarPartitionSplit(partition, stopCursor));
+                appendedPartitions.add(partition);
+                newPartitions.add(partition);
+            }
+        }
+
+        if (!initialized) {
+            initialized = true;
+        }
+
+        return newPartitions;
+    }
+
+    @Override
+    public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
+        pendingPartitionSplits.addAll(splits);
+    }
+
+    @Override
+    public Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment(
+            List<Integer> readers) {
+        if (pendingPartitionSplits.isEmpty() || readers.isEmpty()) {
+            return Optional.empty();
+        }
+
+        Map<Integer, List<PulsarPartitionSplit>> assignMap = new HashMap<>();
+
+        List<PulsarPartitionSplit> partitionSplits = new ArrayList<>(pendingPartitionSplits);
+        int readerCount = readers.size();
+        for (int i = 0; i < partitionSplits.size(); i++) {
+            int index = i % readerCount;
+            Integer readerId = readers.get(index);
+            PulsarPartitionSplit split = partitionSplits.get(i);
+            assignMap.computeIfAbsent(readerId, id -> new ArrayList<>()).add(split);
+        }
+        pendingPartitionSplits.clear();
+
+        return Optional.of(new SplitsAssignment<>(assignMap));
+    }
+
+    @Override
+    public boolean noMoreSplits(Integer reader) {
+        return !enablePartitionDiscovery && initialized && pendingPartitionSplits.isEmpty();
+    }
+
+    @Override
+    public PulsarSourceEnumState snapshotState() {
+        return new PulsarSourceEnumState(
+                appendedPartitions,
+                pendingPartitionSplits,
+                new HashMap<>(),
+                new HashMap<>(),
+                initialized);
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.java
new file mode 100644
index 00000000000..48d75c8dee3
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** This assigner is used for {@link SubscriptionType#Shared} subscriptions. */
+@Internal
+public class SharedSplitAssigner implements SplitAssigner {
+    private static final long serialVersionUID = 8468503133499402491L;
+
+    private final StopCursor stopCursor;
+    private final boolean enablePartitionDiscovery;
+
+    // These fields would be saved into checkpoint.
+
+    private final Set<TopicPartition> appendedPartitions;
+    private final Map<Integer, Set<PulsarPartitionSplit>> sharedPendingPartitionSplits;
+    private final Map<Integer, Set<String>> readerAssignedSplits;
+    private boolean initialized;
+
+    public SharedSplitAssigner(
+            StopCursor stopCursor,
+            SourceConfiguration sourceConfiguration,
+            PulsarSourceEnumState sourceEnumState) {
+        this.stopCursor = stopCursor;
+        this.enablePartitionDiscovery = sourceConfiguration.isEnablePartitionDiscovery();
+        this.appendedPartitions = sourceEnumState.getAppendedPartitions();
+        this.sharedPendingPartitionSplits = sourceEnumState.getSharedPendingPartitionSplits();
+        this.readerAssignedSplits = sourceEnumState.getReaderAssignedSplits();
+        this.initialized = sourceEnumState.isInitialized();
+    }
+
+    @Override
+    public List<TopicPartition> registerTopicPartitions(Set<TopicPartition> fetchedPartitions) {
+        List<TopicPartition> newPartitions = new ArrayList<>();
+
+        for (TopicPartition partition : fetchedPartitions) {
+            if (!appendedPartitions.contains(partition)) {
+                appendedPartitions.add(partition);
+                newPartitions.add(partition);
+            }
+        }
+
+        if (!initialized) {
+            initialized = true;
+        }
+
+        return newPartitions;
+    }
+
+    @Override
+    public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
+        Set<PulsarPartitionSplit> pendingPartitionSplits =
+                sharedPendingPartitionSplits.computeIfAbsent(subtaskId, id -> new HashSet<>());
+        pendingPartitionSplits.addAll(splits);
+    }
+
+    @Override
+    public Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment(
+            List<Integer> readers) {
+        if (readers.isEmpty()) {
+            return Optional.empty();
+        }
+
+        Map<Integer, List<PulsarPartitionSplit>> assignMap = new HashMap<>();
+        for (Integer reader : readers) {
+            Set<PulsarPartitionSplit> pendingSplits = sharedPendingPartitionSplits.remove(reader);
+            if (pendingSplits == null) {
+                pendingSplits = new HashSet<>();
+            }
+
+            Set<String> assignedSplits =
+                    readerAssignedSplits.computeIfAbsent(reader, r -> new HashSet<>());
+
+            for (TopicPartition partition : appendedPartitions) {
+                String partitionName = partition.toString();
+                if (!assignedSplits.contains(partitionName)) {
+                    pendingSplits.add(new PulsarPartitionSplit(partition, stopCursor));
+                    assignedSplits.add(partitionName);
+                }
+            }
+
+            if (!pendingSplits.isEmpty()) {
+                assignMap.put(reader, new ArrayList<>(pendingSplits));
+            }
+        }
+
+        if (assignMap.isEmpty()) {
+            return Optional.empty();
+        } else {
+            return Optional.of(new SplitsAssignment<>(assignMap));
+        }
+    }
+
+    @Override
+    public boolean noMoreSplits(Integer reader) {
+        Set<PulsarPartitionSplit> pendingSplits = sharedPendingPartitionSplits.get(reader);
+        Set<String> assignedSplits = readerAssignedSplits.get(reader);
+
+        return !enablePartitionDiscovery
+                && initialized
+                && (pendingSplits == null || pendingSplits.isEmpty())
+                && (assignedSplits != null && assignedSplits.size() == appendedPartitions.size());
+    }
+
+    @Override
+    public PulsarSourceEnumState snapshotState() {
+        return new PulsarSourceEnumState(
+                appendedPartitions,
+                new HashSet<>(),
+                sharedPendingPartitionSplits,
+                readerAssignedSplits,
+                initialized);
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java
new file mode 100644
index 00000000000..bc03f5103fd
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * The split assigner for different subscription. We would spread all the splits to different
+ * readers and store all the state into checkpoint.
+ */
+@Internal
+public interface SplitAssigner extends Serializable {
+
+    /**
+     * Add the current available partitions into assigner.
+     *
+     * @param fetchedPartitions The available partitions queried from Pulsar broker.
+     * @return New topic partitions compare to previous registered partitions.
+     */
+    List<TopicPartition> registerTopicPartitions(Set<TopicPartition> fetchedPartitions);
+
+    /**
+     * Add a split back to the split assigner if the reader fails. We would try to reassign the
+     * split or add it to the pending list.
+     */
+    void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId);
+
+    /** Create a split assignment from the current readers. */
+    Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment(List<Integer> readers);
+
+    /**
+     * It would return true only if periodically partition discovery is disabled, the initializing
+     * partition discovery has finished AND there is no pending splits for assignment.
+     */
+    boolean noMoreSplits(Integer reader);
+
+    /** Snapshot the current assign state into checkpoint. */
+    PulsarSourceEnumState snapshotState();
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java
new file mode 100644
index 00000000000..3e6ebccb49b
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import static org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState.initialState;
+import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
+import static org.apache.pulsar.client.api.SubscriptionType.Failover;
+import static org.apache.pulsar.client.api.SubscriptionType.Key_Shared;
+import static org.apache.pulsar.client.api.SubscriptionType.Shared;
+
+/** The factory for creating split assigner. */
+@Internal
+public final class SplitAssignerFactory {
+
+    private SplitAssignerFactory() {
+        // No public constructor.
+    }
+
+    /** Create blank assigner. */
+    public static SplitAssigner create(
+            StopCursor stopCursor, SourceConfiguration sourceConfiguration) {
+        return create(stopCursor, sourceConfiguration, initialState());
+    }
+
+    /** Create assigner from checkpoint state. */
+    public static SplitAssigner create(
+            StopCursor stopCursor,
+            SourceConfiguration sourceConfiguration,
+            PulsarSourceEnumState sourceEnumState) {
+        SubscriptionType subscriptionType = sourceConfiguration.getSubscriptionType();
+        if (subscriptionType == Exclusive
+                || subscriptionType == Failover
+                || subscriptionType == Key_Shared) {
+            return new NonSharedSplitAssigner(stopCursor, sourceConfiguration, sourceEnumState);
+        } else if (subscriptionType == Shared) {
+            return new SharedSplitAssigner(stopCursor, sourceConfiguration, sourceEnumState);
+        } else {
+            throw new IllegalArgumentException(
+                    "We don't support this subscription type: " + subscriptionType);
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java
index a2aaff62906..c965ff962f8 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java
@@ -18,18 +18,16 @@
 
 package org.apache.flink.connector.pulsar.source.enumerator.cursor;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.annotation.VisibleForTesting;
 
-import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClientException;
-
-import javax.annotation.Nullable;
 
 import java.io.Serializable;
 
-/** The class for defining the start or stop position. */
+/**
+ * The class for defining the start or stop position. We only expose the constructor for end user.
+ */
 @PublicEvolving
 public final class CursorPosition implements Serializable {
     private static final long serialVersionUID = -802405183307684549L;
@@ -40,34 +38,31 @@ public final class CursorPosition implements Serializable {
 
     private final Long timestamp;
 
-    public CursorPosition(@Nullable MessageId messageId) {
+    public CursorPosition(MessageId messageId) {
         this.type = Type.MESSAGE_ID;
         this.messageId = messageId;
         this.timestamp = null;
     }
 
-    public CursorPosition(@Nullable Long timestamp) {
+    public CursorPosition(Long timestamp) {
         this.type = Type.TIMESTAMP;
         this.messageId = null;
         this.timestamp = timestamp;
     }
 
-    @VisibleForTesting
+    @Internal
+    public Type getType() {
+        return type;
+    }
+
+    @Internal
     public MessageId getMessageId() {
         return messageId;
     }
 
-    /** Pulsar consumer could be subscribed by the position. */
-    public void seekPosition(Consumer<?> consumer) throws PulsarClientException {
-        if (type == Type.MESSAGE_ID) {
-            consumer.seek(messageId);
-        } else {
-            if (timestamp != null) {
-                consumer.seek(timestamp);
-            } else {
-                consumer.seek(System.currentTimeMillis());
-            }
-        }
+    @Internal
+    public Long getTimestamp() {
+        return timestamp;
     }
 
     @Override
@@ -82,6 +77,7 @@ public final class CursorPosition implements Serializable {
     /**
      * The position type for reader to choose whether timestamp or message id as the start position.
      */
+    @Internal
     public enum Type {
         TIMESTAMP,
 
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/MessageIdUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/MessageIdUtils.java
new file mode 100644
index 00000000000..a8c3a6b2ef2
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/MessageIdUtils.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.cursor;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** The helper class for Pulsar's message id. */
+@Internal
+public final class MessageIdUtils {
+
+    private MessageIdUtils() {
+        // No public constructor.
+    }
+
+    /**
+     * The implementation from <a
+     * href="https://github.com/apache/pulsar/blob/7c8dc3201baad7d02d886dbc26db5c03abce77d6/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java#L85">this
+     * code snippet</a> to get next message id.
+     */
+    public static MessageId nextMessageId(MessageId messageId) {
+        MessageIdImpl idImpl = unwrapMessageId(messageId);
+
+        if (idImpl.getEntryId() < 0) {
+            return newMessageId(idImpl.getLedgerId(), 0, idImpl.getPartitionIndex());
+        } else {
+            return newMessageId(
+                    idImpl.getLedgerId(), idImpl.getEntryId() + 1, idImpl.getPartitionIndex());
+        }
+    }
+
+    /**
+     * Convert the message id interface to its backend implementation. And check if it's a batch
+     * message id. We don't support the batch message for its low performance now.
+     */
+    public static MessageIdImpl unwrapMessageId(MessageId messageId) {
+        MessageIdImpl idImpl = MessageIdImpl.convertToMessageIdImpl(messageId);
+        if (idImpl instanceof BatchMessageIdImpl) {
+            int batchSize = ((BatchMessageIdImpl) idImpl).getBatchSize();
+            checkArgument(batchSize == 1, "We only support normal message id currently.");
+        }
+
+        return idImpl;
+    }
+
+    /** Hide the message id implementation. */
+    public static MessageId newMessageId(long ledgerId, long entryId, int partitionIndex) {
+        return new MessageIdImpl(ledgerId, entryId, partitionIndex);
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StartCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StartCursor.java
index af35319a5a7..9c1d699a269 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StartCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StartCursor.java
@@ -22,9 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.start.MessageIdStartCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.start.TimestampStartCursor;
 
-import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 
 import java.io.Serializable;
@@ -43,13 +41,6 @@ public interface StartCursor extends Serializable {
 
     CursorPosition position(String topic, int partitionId);
 
-    /** Helper method for seek the right position for given pulsar consumer. */
-    default void seekPosition(String topic, int partitionId, Consumer<?> consumer)
-            throws PulsarClientException {
-        CursorPosition position = position(topic, partitionId);
-        position.seekPosition(consumer);
-    }
-
     // --------------------------- Static Factory Methods -----------------------------
 
     static StartCursor defaultStartCursor() {
@@ -64,19 +55,38 @@ public interface StartCursor extends Serializable {
         return fromMessageId(MessageId.latest);
     }
 
+    /**
+     * Find the available message id and start consuming from it. The given message is included in
+     * the consuming result by default if you provide a specified message id instead of {@link
+     * MessageId#earliest} or {@link MessageId#latest}.
+     */
     static StartCursor fromMessageId(MessageId messageId) {
         return fromMessageId(messageId, true);
     }
 
     /**
      * @param messageId Find the available message id and start consuming from it.
-     * @param inclusive {@code true} would include the given message id.
+     * @param inclusive {@code true} would include the given message id if it's not the {@link
+     *     MessageId#earliest} or {@link MessageId#latest}.
      */
     static StartCursor fromMessageId(MessageId messageId, boolean inclusive) {
         return new MessageIdStartCursor(messageId, inclusive);
     }
 
+    /**
+     * This method is designed for seeking message from event time. But Pulsar didn't support
+     * seeking from message time, instead, it would seek the position from publish time. We only
+     * keep this method for backward compatible.
+     *
+     * @deprecated Use {@link #fromPublishTime(long)} instead.
+     */
+    @Deprecated
     static StartCursor fromMessageTime(long timestamp) {
-        return new TimestampStartCursor(timestamp);
+        return new TimestampStartCursor(timestamp, true);
+    }
+
+    /** Seek the start position by using message publish time. */
+    static StartCursor fromPublishTime(long timestamp) {
+        return new TimestampStartCursor(timestamp, true);
     }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
index 0bf46ce1282..d44c78fcf1a 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursor.java
@@ -44,11 +44,55 @@ public interface StopCursor extends Serializable {
     /** The open method for the cursor initializer. This method could be executed multiple times. */
     default void open(PulsarAdmin admin, TopicPartition partition) {}
 
-    /**
-     * Determine whether to pause consumption on the current message by the returned boolean value.
-     * The message presented in method argument wouldn't be consumed if the return result is true.
-     */
-    boolean shouldStop(Message<?> message);
+    /** Determine whether to pause consumption on the current message by the returned enum. */
+    StopCondition shouldStop(Message<?> message);
+
+    /** The conditional for control the stop behavior of the pulsar source. */
+    @PublicEvolving
+    enum StopCondition {
+
+        /** This message should be included in the result. */
+        CONTINUE,
+        /** This message should be included in the result and stop consuming. */
+        EXACTLY,
+        /** Stop consuming, the given message wouldn't be included in the result. */
+        TERMINATE;
+
+        /**
+         * Common methods for comparing the message id.
+         *
+         * @param desired The stop goal of the message id.
+         * @param current The upcoming message id.
+         * @param inclusive Should the desired message be included in the consuming result.
+         */
+        public static StopCondition compare(
+                MessageId desired, MessageId current, boolean inclusive) {
+            if (current.compareTo(desired) < 0) {
+                return StopCondition.CONTINUE;
+            } else if (current.compareTo(desired) == 0) {
+                return inclusive ? StopCondition.EXACTLY : StopCondition.TERMINATE;
+            } else {
+                return StopCondition.TERMINATE;
+            }
+        }
+
+        /**
+         * Common methods for comparing the message time.
+         *
+         * @param desired The stop goal of the message time.
+         * @param current The upcoming message time.
+         * @param inclusive Should the desired message be included in the consuming result.
+         */
+        public static StopCondition compare(long desired, long current, boolean inclusive) {
+            if (current < desired) {
+                return StopCondition.CONTINUE;
+            } else if (current == desired) {
+                return inclusive ? StopCondition.EXACTLY : StopCondition.TERMINATE;
+            } else {
+                return StopCondition.TERMINATE;
+            }
+        }
+    }
 
     // --------------------------- Static Factory Methods -----------------------------
 
@@ -61,32 +105,52 @@ public interface StopCursor extends Serializable {
     }
 
     static StopCursor latest() {
-        return new LatestMessageStopCursor();
+        return new LatestMessageStopCursor(true);
     }
 
     /**
-     * Stop when the messageId is equal or greater than the specified messageId. Message that is
-     * equal to the specified messageId will not be consumed.
+     * Stop consuming when the messageId is equal or greater than the specified messageId. Message
+     * that is equal to the specified messageId will not be consumed.
      */
     static StopCursor atMessageId(MessageId messageId) {
-        return new MessageIdStopCursor(messageId);
+        if (MessageId.latest.equals(messageId)) {
+            return new LatestMessageStopCursor(false);
+        } else {
+            return new MessageIdStopCursor(messageId, false);
+        }
     }
 
     /**
-     * Stop when the messageId is greater than the specified messageId. Message that is equal to the
-     * specified messageId will be consumed.
+     * Stop consuming when the messageId is greater than the specified messageId. Message that is
+     * equal to the specified messageId will be consumed.
      */
     static StopCursor afterMessageId(MessageId messageId) {
-        return new MessageIdStopCursor(messageId, false);
+        if (MessageId.latest.equals(messageId)) {
+            return new LatestMessageStopCursor(true);
+        } else {
+            return new MessageIdStopCursor(messageId, true);
+        }
     }
 
-    @Deprecated
+    /** Stop consuming when message eventTime is greater than or equals the specified timestamp. */
     static StopCursor atEventTime(long timestamp) {
-        return new EventTimestampStopCursor(timestamp);
+        return new EventTimestampStopCursor(timestamp, false);
     }
 
-    /** Stop when message publishTime is greater than the specified timestamp. */
+    /** Stop consuming when message eventTime is greater than the specified timestamp. */
+    static StopCursor afterEventTime(long timestamp) {
+        return new EventTimestampStopCursor(timestamp, true);
+    }
+
+    /**
+     * Stop consuming when message publishTime is greater than or equals the specified timestamp.
+     */
     static StopCursor atPublishTime(long timestamp) {
-        return new PublishTimestampStopCursor(timestamp);
+        return new PublishTimestampStopCursor(timestamp, false);
+    }
+
+    /** Stop consuming when message publishTime is greater than the specified timestamp. */
+    static StopCursor afterPublishTime(long timestamp) {
+        return new PublishTimestampStopCursor(timestamp, true);
     }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java
index 71a4eb6a026..03d5ba3e133 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java
@@ -25,7 +25,8 @@ import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 
-import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.nextMessageId;
+import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.unwrapMessageId;
 
 /** This cursor would leave pulsar start consuming from a specific message id. */
 public class MessageIdStartCursor implements StartCursor {
@@ -43,23 +44,16 @@ public class MessageIdStartCursor implements StartCursor {
      * code</a> for understanding pulsar internal logic.
      *
      * @param messageId The message id for start position.
-     * @param inclusive Should we include the start message id in consuming result.
+     * @param inclusive Whether we include the start message id in consuming result. This works only
+     *     if we provide a specified message id instead of {@link MessageId#earliest} or {@link
+     *     MessageId#latest}.
      */
     public MessageIdStartCursor(MessageId messageId, boolean inclusive) {
-        if (inclusive) {
-            this.messageId = messageId;
+        MessageIdImpl idImpl = unwrapMessageId(messageId);
+        if (MessageId.earliest.equals(idImpl) || MessageId.latest.equals(idImpl) || inclusive) {
+            this.messageId = idImpl;
         } else {
-            checkState(
-                    messageId instanceof MessageIdImpl,
-                    "We only support normal message id and batch message id.");
-            MessageIdImpl id = (MessageIdImpl) messageId;
-            if (MessageId.earliest.equals(messageId) || MessageId.latest.equals(messageId)) {
-                this.messageId = messageId;
-            } else {
-                this.messageId =
-                        new MessageIdImpl(
-                                id.getLedgerId(), id.getEntryId() + 1, id.getPartitionIndex());
-            }
+            this.messageId = nextMessageId(idImpl);
         }
     }
 
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/TimestampStartCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/TimestampStartCursor.java
index eb4ea32ebb6..da51a58e943 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/TimestampStartCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/start/TimestampStartCursor.java
@@ -27,8 +27,8 @@ public class TimestampStartCursor implements StartCursor {
 
     private final long timestamp;
 
-    public TimestampStartCursor(long timestamp) {
-        this.timestamp = timestamp;
+    public TimestampStartCursor(long timestamp, boolean inclusive) {
+        this.timestamp = inclusive ? timestamp : timestamp + 1;
     }
 
     @Override
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/EventTimestampStopCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/EventTimestampStopCursor.java
index e425545de44..d2a44ea362d 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/EventTimestampStopCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/EventTimestampStopCursor.java
@@ -27,13 +27,16 @@ public class EventTimestampStopCursor implements StopCursor {
     private static final long serialVersionUID = 2391576769339369027L;
 
     private final long timestamp;
+    private final boolean inclusive;
 
-    public EventTimestampStopCursor(long timestamp) {
+    public EventTimestampStopCursor(long timestamp, boolean inclusive) {
         this.timestamp = timestamp;
+        this.inclusive = inclusive;
     }
 
     @Override
-    public boolean shouldStop(Message<?> message) {
-        return message.getEventTime() >= timestamp;
+    public StopCondition shouldStop(Message<?> message) {
+        long eventTime = message.getEventTime();
+        return StopCondition.compare(timestamp, eventTime, inclusive);
     }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java
index 257081f5c89..0de963ed4fb 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java
@@ -30,24 +30,29 @@ import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtil
 /**
  * A stop cursor that initialize the position to the latest message id. The offsets initialization
  * are taken care of by the {@code PulsarPartitionSplitReaderBase} instead of by the {@code
- * PulsarSourceEnumerator}.
+ * PulsarSourceEnumerator}. We would include the latest message available in Pulsar by default.
  */
 public class LatestMessageStopCursor implements StopCursor {
     private static final long serialVersionUID = 1702059838323965723L;
 
     private MessageId messageId;
+    private final boolean inclusive;
+
+    public LatestMessageStopCursor(boolean inclusive) {
+        this.inclusive = inclusive;
+    }
+
+    @Override
+    public StopCondition shouldStop(Message<?> message) {
+        MessageId current = message.getMessageId();
+        return StopCondition.compare(messageId, current, inclusive);
+    }
 
     @Override
     public void open(PulsarAdmin admin, TopicPartition partition) {
         if (messageId == null) {
             String topic = partition.getFullTopicName();
-            messageId = sneakyAdmin(() -> admin.topics().getLastMessageId(topic));
+            this.messageId = sneakyAdmin(() -> admin.topics().getLastMessageId(topic));
         }
     }
-
-    @Override
-    public boolean shouldStop(Message<?> message) {
-        MessageId id = message.getMessageId();
-        return id.compareTo(messageId) >= 0;
-    }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java
index 7af55a00cc0..03d83aa4495 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java
@@ -22,6 +22,12 @@ import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+
+import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.unwrapMessageId;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.pulsar.client.api.MessageId.earliest;
+import static org.apache.pulsar.client.api.MessageId.latest;
 
 /**
  * Stop consuming message at a given message id. We use the {@link MessageId#compareTo(Object)} for
@@ -32,24 +38,22 @@ public class MessageIdStopCursor implements StopCursor {
 
     private final MessageId messageId;
 
-    private final boolean exclusive;
+    private final boolean inclusive;
 
-    public MessageIdStopCursor(MessageId messageId) {
-        this(messageId, true);
-    }
+    public MessageIdStopCursor(MessageId messageId, boolean inclusive) {
+        MessageIdImpl idImpl = unwrapMessageId(messageId);
+        checkArgument(!earliest.equals(idImpl), "MessageId.earliest is not supported.");
+        checkArgument(
+                !latest.equals(idImpl),
+                "MessageId.latest is not supported, use LatestMessageStopCursor instead.");
 
-    public MessageIdStopCursor(MessageId messageId, boolean exclusive) {
-        this.messageId = messageId;
-        this.exclusive = exclusive;
+        this.messageId = idImpl;
+        this.inclusive = inclusive;
     }
 
     @Override
-    public boolean shouldStop(Message<?> message) {
-        MessageId id = message.getMessageId();
-        if (exclusive) {
-            return id.compareTo(messageId) > 0;
-        } else {
-            return id.compareTo(messageId) >= 0;
-        }
+    public StopCondition shouldStop(Message<?> message) {
+        MessageId current = message.getMessageId();
+        return StopCondition.compare(messageId, current, inclusive);
     }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/NeverStopCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/NeverStopCursor.java
index ff2c619afb8..3eb035634ae 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/NeverStopCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/NeverStopCursor.java
@@ -27,7 +27,7 @@ public class NeverStopCursor implements StopCursor {
     private static final long serialVersionUID = -3113601090292771786L;
 
     @Override
-    public boolean shouldStop(Message<?> message) {
-        return false;
+    public StopCondition shouldStop(Message<?> message) {
+        return StopCondition.CONTINUE;
     }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/PublishTimestampStopCursor.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/PublishTimestampStopCursor.java
index b598e7addd4..2dfdd765842 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/PublishTimestampStopCursor.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/stop/PublishTimestampStopCursor.java
@@ -27,13 +27,16 @@ public class PublishTimestampStopCursor implements StopCursor {
     private static final long serialVersionUID = 4386276745339324527L;
 
     private final long timestamp;
+    private final boolean inclusive;
 
-    public PublishTimestampStopCursor(long timestamp) {
+    public PublishTimestampStopCursor(long timestamp, boolean inclusive) {
         this.timestamp = timestamp;
+        this.inclusive = inclusive;
     }
 
     @Override
-    public boolean shouldStop(Message<?> message) {
-        return message.getPublishTime() >= timestamp;
+    public StopCondition shouldStop(Message<?> message) {
+        long publishTime = message.getPublishTime();
+        return StopCondition.compare(timestamp, publishTime, inclusive);
     }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriber.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriber.java
index 08ba1faa442..b8a55bf8a34 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriber.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriber.java
@@ -48,7 +48,7 @@ public interface PulsarSubscriber extends Serializable {
 
     /**
      * Get a set of subscribed {@link TopicPartition}s. The method could throw {@link
-     * IllegalStateException}, a extra try catch is required.
+     * IllegalStateException}, an extra try catch is required.
      *
      * @param pulsarAdmin The admin interface used to retrieve subscribed topic partitions.
      * @param rangeGenerator The range for different partitions.
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java
index 6a5d5152231..820888a7bf3 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java
@@ -77,7 +77,7 @@ public final class PulsarSourceReaderFactory {
         SubscriptionType subscriptionType = sourceConfiguration.getSubscriptionType();
         if (subscriptionType == SubscriptionType.Failover
                 || subscriptionType == SubscriptionType.Exclusive) {
-            // Create a ordered split reader supplier.
+            // Create an ordered split reader supplier.
             Supplier<PulsarOrderedPartitionSplitReader<OUT>> splitReaderSupplier =
                     () ->
                             new PulsarOrderedPartitionSplitReader<>(
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java
index bb6d79641f5..316431aa75e 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java
@@ -20,14 +20,13 @@ package org.apache.flink.connector.pulsar.source.reader.split;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
 import org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -36,10 +35,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
 import static org.apache.flink.connector.pulsar.source.config.CursorVerification.FAIL_ON_MISMATCH;
+import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.nextMessageId;
 
 /**
  * The split reader a given {@link PulsarPartitionSplit}, it would be closed once the {@link
@@ -75,18 +76,39 @@ public class PulsarOrderedPartitionSplitReader<OUT> extends PulsarPartitionSplit
     }
 
     @Override
-    protected void startConsumer(PulsarPartitionSplit split, Consumer<byte[]> consumer) {
+    protected void beforeCreatingConsumer(PulsarPartitionSplit split) {
         MessageId latestConsumedId = split.getLatestConsumedId();
 
         // Reset the start position for ordered pulsar consumer.
         if (latestConsumedId != null) {
-            StartCursor startCursor = StartCursor.fromMessageId(latestConsumedId, false);
-            TopicPartition partition = split.getPartition();
-
+            LOG.info("Reset subscription position by the checkpoint {}", latestConsumedId);
             try {
-                startCursor.seekPosition(
-                        partition.getTopic(), partition.getPartitionId(), consumer);
-            } catch (PulsarClientException e) {
+                MessageId initialPosition;
+                if (latestConsumedId == MessageId.latest
+                        || latestConsumedId == MessageId.earliest) {
+                    // for compatibility
+                    initialPosition = latestConsumedId;
+                } else {
+                    initialPosition = nextMessageId(latestConsumedId);
+                }
+
+                // Remove Consumer.seek() here for waiting for pulsar-client-all 2.12.0
+                // See https://github.com/apache/pulsar/issues/16757 for more details.
+
+                String topicName = split.getPartition().getFullTopicName();
+                List<String> subscriptions = pulsarAdmin.topics().getSubscriptions(topicName);
+                String subscriptionName = sourceConfiguration.getSubscriptionName();
+
+                if (!subscriptions.contains(subscriptionName)) {
+                    // If this subscription is not available. Just create it.
+                    pulsarAdmin
+                            .topics()
+                            .createSubscription(topicName, subscriptionName, initialPosition);
+                } else {
+                    // Reset the subscription if this is existed.
+                    pulsarAdmin.topics().resetCursor(topicName, subscriptionName, initialPosition);
+                }
+            } catch (PulsarAdminException e) {
                 if (sourceConfiguration.getVerifyInitialOffsets() == FAIL_ON_MISMATCH) {
                     throw new IllegalArgumentException(e);
                 } else {
@@ -95,7 +117,7 @@ public class PulsarOrderedPartitionSplitReader<OUT> extends PulsarPartitionSplit
                     LOG.warn(
                             "Failed to reset cursor to {} on partition {}",
                             latestConsumedId,
-                            partition,
+                            split.getPartition(),
                             e);
                 }
             }
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
index 37b5630a8d1..2d4214262c8 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.StopCondition;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
 import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
@@ -52,7 +53,6 @@ import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
 import static org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.createConsumerBuilder;
@@ -70,7 +70,6 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
     protected final PulsarAdmin pulsarAdmin;
     protected final SourceConfiguration sourceConfiguration;
     protected final PulsarDeserializationSchema<OUT> deserializationSchema;
-    protected final AtomicBoolean wakeup;
 
     protected Consumer<byte[]> pulsarConsumer;
     protected PulsarPartitionSplit registeredSplit;
@@ -84,7 +83,6 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
         this.pulsarAdmin = pulsarAdmin;
         this.sourceConfiguration = sourceConfiguration;
         this.deserializationSchema = deserializationSchema;
-        this.wakeup = new AtomicBoolean(false);
     }
 
     @Override
@@ -96,9 +94,6 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
             return builder.build();
         }
 
-        // Set wakeup to false for start consuming.
-        wakeup.compareAndSet(true, false);
-
         StopCursor stopCursor = registeredSplit.getStopCursor();
         String splitId = registeredSplit.splitId();
         PulsarMessageCollector<OUT> collector = new PulsarMessageCollector<>(splitId, builder);
@@ -106,9 +101,7 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
 
         // Consume message from pulsar until it was woke up by flink reader.
         for (int messageNum = 0;
-                messageNum < sourceConfiguration.getMaxFetchRecords()
-                        && deadline.hasTimeLeft()
-                        && isNotWakeup();
+                messageNum < sourceConfiguration.getMaxFetchRecords() && deadline.hasTimeLeft();
                 messageNum++) {
             try {
                 Duration timeout = deadline.timeLeftIfAny();
@@ -117,14 +110,18 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
                     break;
                 }
 
-                // Deserialize message.
-                collector.setMessage(message);
-                deserializationSchema.deserialize(message, collector);
+                StopCondition condition = stopCursor.shouldStop(message);
+
+                if (condition == StopCondition.CONTINUE || condition == StopCondition.EXACTLY) {
+                    // Deserialize message.
+                    collector.setMessage(message);
+                    deserializationSchema.deserialize(message, collector);
 
-                // Acknowledge message if need.
-                finishedPollMessage(message);
+                    // Acknowledge message if need.
+                    finishedPollMessage(message);
+                }
 
-                if (stopCursor.shouldStop(message)) {
+                if (condition == StopCondition.EXACTLY || condition == StopCondition.TERMINATE) {
                     builder.addFinishedSplit(splitId);
                     break;
                 }
@@ -165,23 +162,27 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
                 newSplits.size() == 1, "This pulsar split reader only support one split.");
         PulsarPartitionSplit newSplit = newSplits.get(0);
 
+        // Open stop cursor.
+        newSplit.open(pulsarAdmin);
+
+        // Before creating the consumer.
+        beforeCreatingConsumer(newSplit);
+
         // Create pulsar consumer.
         Consumer<byte[]> consumer = createPulsarConsumer(newSplit);
 
-        // Open start & stop cursor.
-        newSplit.open(pulsarAdmin);
-
-        // Start Consumer.
-        startConsumer(newSplit, consumer);
+        // After creating the consumer.
+        afterCreatingConsumer(newSplit, consumer);
 
         LOG.info("Register split {} consumer for current reader.", newSplit);
+
         this.registeredSplit = newSplit;
         this.pulsarConsumer = consumer;
     }
 
     @Override
     public void wakeUp() {
-        wakeup.compareAndSet(false, true);
+        // Nothing to do on this method.
     }
 
     @Override
@@ -197,14 +198,16 @@ abstract class PulsarPartitionSplitReaderBase<OUT>
 
     protected abstract void finishedPollMessage(Message<byte[]> message);
 
-    protected abstract void startConsumer(PulsarPartitionSplit split, Consumer<byte[]> consumer);
-
-    // --------------------------- Helper Methods -----------------------------
+    protected void beforeCreatingConsumer(PulsarPartitionSplit split) {
+        // Nothing to do by default.
+    }
 
-    protected boolean isNotWakeup() {
-        return !wakeup.get();
+    protected void afterCreatingConsumer(PulsarPartitionSplit split, Consumer<byte[]> consumer) {
+        // Nothing to do by default.
     }
 
+    // --------------------------- Helper Methods -----------------------------
+
     /** Create a specified {@link Consumer} by the given split information. */
     protected Consumer<byte[]> createPulsarConsumer(PulsarPartitionSplit split) {
         return createPulsarConsumer(split.getPartition());
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
index 5940cc9ac19..cc0167d8d14 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
@@ -126,7 +126,7 @@ public class PulsarUnorderedPartitionSplitReader<OUT> extends PulsarPartitionSpl
     }
 
     @Override
-    protected void startConsumer(PulsarPartitionSplit split, Consumer<byte[]> consumer) {
+    protected void afterCreatingConsumer(PulsarPartitionSplit split, Consumer<byte[]> consumer) {
         TxnID uncommittedTransactionId = split.getUncommittedTransactionId();
 
         // Abort the uncommitted pulsar transaction.
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplit.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplit.java
index 90e29ca4712..458be403fbf 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplit.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplit.java
@@ -30,13 +30,15 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 
 import javax.annotation.Nullable;
 
+import java.io.Serializable;
 import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** A {@link SourceSplit} implementation for a Pulsar's partition. */
 @Internal
-public class PulsarPartitionSplit implements SourceSplit {
+public class PulsarPartitionSplit implements SourceSplit, Serializable {
+    private static final long serialVersionUID = -6857317360756062625L;
 
     private final TopicPartition partition;
 
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
index 2f1faa53672..fa46f9a4cdc 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
@@ -24,8 +24,6 @@ import org.apache.flink.connector.pulsar.testutils.cases.MultipleTopicConsumingC
 import org.apache.flink.connector.pulsar.testutils.cases.SingleTopicConsumingContext;
 import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
 import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
-import org.apache.flink.connector.testframe.environment.TestEnvironment;
-import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
 import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
@@ -58,22 +56,4 @@ class PulsarSourceITCase extends SourceTestSuiteBase<String> {
     @TestContext
     PulsarTestContextFactory<String, MultipleTopicConsumingContext> multipleTopic =
             new PulsarTestContextFactory<>(pulsar, MultipleTopicConsumingContext::new);
-
-    @Override
-    public void testScaleUp(
-            TestEnvironment testEnv,
-            DataStreamSourceExternalContext<String> externalContext,
-            CheckpointingMode semantic)
-            throws Exception {
-        super.testScaleUp(testEnv, externalContext, semantic);
-    }
-
-    @Override
-    public void testScaleDown(
-            TestEnvironment testEnv,
-            DataStreamSourceExternalContext<String> externalContext,
-            CheckpointingMode semantic)
-            throws Exception {
-        super.testScaleDown(testEnv, externalContext, semantic);
-    }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializerTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializerTest.java
index 38c40ed88ef..5f18e8f5131 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializerTest.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializerTest.java
@@ -34,6 +34,7 @@ import java.util.Set;
 
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
 import static org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumStateSerializer.INSTANCE;
+import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.newMessageId;
 import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotSame;
@@ -51,7 +52,9 @@ class PulsarSourceEnumStateSerializerTest {
                 Collections.singleton(
                         new PulsarPartitionSplit(
                                 new TopicPartition(randomAlphabetic(10), 10, createFullRange()),
-                                StopCursor.defaultStopCursor()));
+                                StopCursor.defaultStopCursor(),
+                                newMessageId(100L, 23L, 44),
+                                null));
         Map<Integer, Set<PulsarPartitionSplit>> shared = Collections.singletonMap(5, splits);
         Map<Integer, Set<String>> mapping =
                 ImmutableMap.of(
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java
index 1dcbe84ba61..aebb76119df 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.connector.source.ReaderInfo;
 import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner;
+import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssignerFactory;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
@@ -51,6 +53,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
 import static org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.latest;
 import static org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber.getTopicPatternSubscriber;
@@ -368,6 +371,7 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
 
         Configuration configuration = operator().config();
         configuration.set(PULSAR_SUBSCRIPTION_TYPE, subscriptionType);
+        configuration.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10));
         if (enablePeriodicPartitionDiscovery) {
             configuration.set(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 60L);
         } else {
@@ -375,15 +379,15 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
         }
         SourceConfiguration sourceConfiguration = new SourceConfiguration(configuration);
 
-        SplitsAssignmentState assignmentState =
-                new SplitsAssignmentState(latest(), sourceConfiguration, sourceEnumState);
+        SplitAssigner assigner =
+                SplitAssignerFactory.create(latest(), sourceConfiguration, sourceEnumState);
         return new PulsarSourceEnumerator(
                 subscriber,
                 StartCursor.earliest(),
                 new FullRangeGenerator(),
                 sourceConfiguration,
                 enumContext,
-                assignmentState);
+                assigner);
     }
 
     private void registerReader(
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/SplitsAssignmentStateTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/SplitsAssignmentStateTest.java
deleted file mode 100644
index ac811c3dddb..00000000000
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/SplitsAssignmentStateTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.enumerator;
-
-import org.apache.flink.api.connector.source.SplitsAssignment;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
-import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
-
-import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
-import org.apache.flink.shaded.guava30.com.google.common.collect.Sets;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.junit.jupiter.api.Test;
-
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-
-import static java.util.Collections.singletonList;
-import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
-import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.MAX_RANGE;
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.InstanceOfAssertFactories.map;
-
-/** Unit tests for {@link SplitsAssignmentState}. */
-class SplitsAssignmentStateTest {
-
-    private final Set<TopicPartition> partitions =
-            Sets.newHashSet(
-                    new TopicPartition("some-topic", 1, new TopicRange(1, 30)),
-                    new TopicPartition("some-topic", 2, new TopicRange(31, 60)),
-                    new TopicPartition("some-topic", 3, new TopicRange(61, MAX_RANGE)),
-                    new TopicPartition(randomAlphabetic(10), -1, createFullRange()));
-
-    @Test
-    void assignSplitsForSharedSubscription() {
-        SplitsAssignmentState state1 =
-                new SplitsAssignmentState(
-                        StopCursor.defaultStopCursor(), createConfig(SubscriptionType.Shared));
-        state1.appendTopicPartitions(partitions);
-        Optional<SplitsAssignment<PulsarPartitionSplit>> assignment1 =
-                state1.assignSplits(Lists.newArrayList(0, 1, 2, 3, 4));
-
-        assertThat(assignment1)
-                .isPresent()
-                .get()
-                .extracting(SplitsAssignment::assignment)
-                .asInstanceOf(map(Integer.class, List.class))
-                .hasSize(5)
-                .allSatisfy((idx, list) -> assertThat(list).hasSize(4));
-
-        Optional<SplitsAssignment<PulsarPartitionSplit>> assignment2 =
-                state1.assignSplits(Lists.newArrayList(0, 1, 2, 3, 4));
-        assertThat(assignment2).isNotPresent();
-
-        // Reassign reader 3.
-        state1.putSplitsBackToPendingList(assignment1.get().assignment().get(3), 3);
-        Optional<SplitsAssignment<PulsarPartitionSplit>> assignment3 =
-                state1.assignSplits(Lists.newArrayList(0, 1, 2, 4));
-        assertThat(assignment3).isNotPresent();
-
-        Optional<SplitsAssignment<PulsarPartitionSplit>> assignment4 =
-                state1.assignSplits(singletonList(3));
-        assertThat(assignment4)
-                .isPresent()
-                .get()
-                .extracting(SplitsAssignment::assignment)
-                .asInstanceOf(map(Integer.class, List.class))
-                .hasSize(1);
-    }
-
-    @Test
-    void assignSplitsForExclusiveSubscription() {
-        SplitsAssignmentState state1 =
-                new SplitsAssignmentState(
-                        StopCursor.defaultStopCursor(), createConfig(SubscriptionType.Exclusive));
-        state1.appendTopicPartitions(partitions);
-        Optional<SplitsAssignment<PulsarPartitionSplit>> assignment1 =
-                state1.assignSplits(Lists.newArrayList(0, 1, 2, 3, 4));
-
-        assertThat(assignment1).isPresent();
-        assertThat(assignment1.get().assignment())
-                .hasSize(4)
-                .allSatisfy((idx, list) -> assertThat(list).hasSize(1));
-
-        Optional<SplitsAssignment<PulsarPartitionSplit>> assignment2 =
-                state1.assignSplits(Lists.newArrayList(0, 1, 2, 3, 4));
-        assertThat(assignment2).isNotPresent();
-    }
-
-    private SourceConfiguration createConfig(SubscriptionType type) {
-        Configuration configuration = new Configuration();
-        configuration.set(PULSAR_SUBSCRIPTION_TYPE, type);
-
-        return new SourceConfiguration(configuration);
-    }
-}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssignerTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssignerTest.java
new file mode 100644
index 00000000000..2e9ada3b741
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssignerTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Collections.singletonList;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Unit tests for {@link NonSharedSplitAssigner}. */
+class NonSharedSplitAssignerTest extends SplitAssignerTestBase<NonSharedSplitAssigner> {
+
+    @Test
+    void noMoreSplits() {
+        NonSharedSplitAssigner assigner = splitAssigner(true);
+        assertFalse(assigner.noMoreSplits(3));
+
+        assigner = splitAssigner(false);
+        assertFalse(assigner.noMoreSplits(3));
+
+        assigner.registerTopicPartitions(createPartitions("f", 8));
+        assertFalse(assigner.noMoreSplits(3));
+
+        assigner.createAssignment(singletonList(1));
+        assertTrue(assigner.noMoreSplits(1));
+        assertTrue(assigner.noMoreSplits(3));
+    }
+
+    @Test
+    void partitionsAssignment() {
+        NonSharedSplitAssigner assigner = splitAssigner(true);
+        assigner.registerTopicPartitions(createPartitions("d", 4));
+        List<Integer> readers = Arrays.asList(1, 3, 5, 7);
+
+        // Assignment with initial states.
+        Optional<SplitsAssignment<PulsarPartitionSplit>> assignment =
+                assigner.createAssignment(readers);
+        assertThat(assignment).isPresent();
+        assertThat(assignment.get().assignment()).hasSize(1);
+
+        // Reassignment with same readers.
+        assignment = assigner.createAssignment(readers);
+        assertThat(assignment).isNotPresent();
+
+        // Register new partition and assign.
+        assigner.registerTopicPartitions(createPartitions("e", 5));
+        assigner.registerTopicPartitions(createPartitions("f", 1));
+        assigner.registerTopicPartitions(createPartitions("g", 3));
+        assigner.registerTopicPartitions(createPartitions("h", 4));
+        assignment = assigner.createAssignment(readers);
+        assertThat(assignment).isPresent();
+        assertThat(assignment.get().assignment()).hasSize(4);
+
+        // Assign to new readers.
+        readers = Arrays.asList(2, 4, 6, 8);
+        assignment = assigner.createAssignment(readers);
+        assertThat(assignment).isNotPresent();
+    }
+
+    @Override
+    protected NonSharedSplitAssigner createAssigner(
+            StopCursor stopCursor,
+            SourceConfiguration sourceConfiguration,
+            PulsarSourceEnumState sourceEnumState) {
+        return new NonSharedSplitAssigner(stopCursor, sourceConfiguration, sourceEnumState);
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssignerTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssignerTest.java
new file mode 100644
index 00000000000..91584b87688
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssignerTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Collections.singletonList;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Unit tests for {@link SharedSplitAssigner}. */
+class SharedSplitAssignerTest extends SplitAssignerTestBase<SharedSplitAssigner> {
+
+    @Test
+    void noMoreSplits() {
+        SharedSplitAssigner assigner = splitAssigner(true);
+        assertFalse(assigner.noMoreSplits(3));
+
+        assigner = splitAssigner(false);
+        assertFalse(assigner.noMoreSplits(3));
+
+        assigner.registerTopicPartitions(createPartitions("f", 8));
+        assertFalse(assigner.noMoreSplits(3));
+
+        assigner.createAssignment(singletonList(1));
+        assertTrue(assigner.noMoreSplits(1));
+        assertFalse(assigner.noMoreSplits(3));
+
+        assigner.createAssignment(singletonList(3));
+        assertTrue(assigner.noMoreSplits(3));
+    }
+
+    @Test
+    void partitionsAssignment() {
+        SharedSplitAssigner assigner = splitAssigner(true);
+        assigner.registerTopicPartitions(createPartitions("d", 4));
+        List<Integer> readers = Arrays.asList(1, 3, 5, 7);
+
+        // Assignment with initial states.
+        Optional<SplitsAssignment<PulsarPartitionSplit>> assignment =
+                assigner.createAssignment(readers);
+        assertThat(assignment).isPresent();
+        assertThat(assignment.get().assignment()).hasSize(4);
+
+        // Reassignment with same readers.
+        assignment = assigner.createAssignment(readers);
+        assertThat(assignment).isNotPresent();
+
+        // Register new partition and assign.
+        assigner.registerTopicPartitions(createPartitions("e", 5));
+        assignment = assigner.createAssignment(readers);
+        assertThat(assignment).isPresent();
+        assertThat(assignment.get().assignment()).hasSize(4);
+
+        // Assign to new readers.
+        readers = Arrays.asList(2, 4, 6, 8);
+        assignment = assigner.createAssignment(readers);
+        assertThat(assignment).isPresent();
+        assertThat(assignment.get().assignment())
+                .hasSize(4)
+                .allSatisfy((k, v) -> assertThat(v).hasSize(2));
+    }
+
+    @Override
+    protected SharedSplitAssigner createAssigner(
+            StopCursor stopCursor,
+            SourceConfiguration sourceConfiguration,
+            PulsarSourceEnumState sourceEnumState) {
+        return new SharedSplitAssigner(stopCursor, sourceConfiguration, sourceEnumState);
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerTestBase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerTestBase.java
new file mode 100644
index 00000000000..65094014720
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerTestBase.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.assigner;
+
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
+import static org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState.initialState;
+import static org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.defaultStopCursor;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test utils for split assigners. */
+abstract class SplitAssignerTestBase<T extends SplitAssigner> extends TestLogger {
+
+    @Test
+    void registerTopicPartitionsWillOnlyReturnNewPartitions() {
+        T assigner = splitAssigner(true);
+
+        Set<TopicPartition> partitions = createPartitions("persistent://public/default/a", 1);
+        List<TopicPartition> newPartitions = assigner.registerTopicPartitions(partitions);
+        assertThat(newPartitions)
+                .hasSize(1)
+                .first()
+                .hasFieldOrPropertyWithValue("topic", "persistent://public/default/a")
+                .hasFieldOrPropertyWithValue("partitionId", 1);
+
+        newPartitions = assigner.registerTopicPartitions(partitions);
+        assertThat(newPartitions).isEmpty();
+
+        partitions = createPartitions("persistent://public/default/b", 2);
+        newPartitions = assigner.registerTopicPartitions(partitions);
+        assertThat(newPartitions)
+                .hasSize(1)
+                .hasSize(1)
+                .first()
+                .hasFieldOrPropertyWithValue("topic", "persistent://public/default/b")
+                .hasFieldOrPropertyWithValue("partitionId", 2);
+    }
+
+    @Test
+    void noReadersProvideForAssignment() {
+        T assigner = splitAssigner(false);
+        assigner.registerTopicPartitions(createPartitions("c", 5));
+
+        Optional<SplitsAssignment<PulsarPartitionSplit>> assignment =
+                assigner.createAssignment(emptyList());
+        assertThat(assignment).isNotPresent();
+    }
+
+    @Test
+    void noPartitionsProvideForAssignment() {
+        T assigner = splitAssigner(true);
+        Optional<SplitsAssignment<PulsarPartitionSplit>> assignment =
+                assigner.createAssignment(singletonList(4));
+        assertThat(assignment).isNotPresent();
+    }
+
+    protected Set<TopicPartition> createPartitions(String topic, int partitionId) {
+        TopicPartition p1 = new TopicPartition(topic, partitionId, createFullRange());
+        return singleton(p1);
+    }
+
+    protected T splitAssigner(boolean discovery) {
+        Configuration configuration = new Configuration();
+
+        if (discovery) {
+            configuration.set(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 1000L);
+        } else {
+            configuration.set(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, -1L);
+        }
+
+        SourceConfiguration sourceConfiguration = new SourceConfiguration(configuration);
+        return createAssigner(defaultStopCursor(), sourceConfiguration, initialState());
+    }
+
+    protected abstract T createAssigner(
+            StopCursor stopCursor,
+            SourceConfiguration sourceConfiguration,
+            PulsarSourceEnumState sourceEnumState);
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
index d003107793a..301368b6f4a 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
 import org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader;
@@ -43,12 +42,13 @@ import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSA
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
 import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
 import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.flinkSchema;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test different implementation of StopCursor. */
-public class StopCursorTest extends PulsarTestSuiteBase {
+class StopCursorTest extends PulsarTestSuiteBase {
 
     @Test
     void publishTimeStopCursor() throws IOException {
@@ -64,7 +64,7 @@ public class StopCursorTest extends PulsarTestSuiteBase {
         // send the first message and set the stopCursor to filter any late stopCursor
         operator()
                 .sendMessage(
-                        TopicNameUtils.topicNameWithPartition(topicName, 0),
+                        topicNameWithPartition(topicName, 0),
                         Schema.STRING,
                         randomAlphanumeric(10));
         long currentTimeStamp = System.currentTimeMillis();
@@ -85,12 +85,11 @@ public class StopCursorTest extends PulsarTestSuiteBase {
         // send the second message and expect it will not be received
         operator()
                 .sendMessage(
-                        TopicNameUtils.topicNameWithPartition(topicName, 0),
+                        topicNameWithPartition(topicName, 0),
                         Schema.STRING,
                         randomAlphanumeric(10));
         RecordsWithSplitIds<PulsarMessage<String>> secondResult = splitReader.fetch();
-        assertThat(secondResult.nextSplit()).isNotNull();
-        assertThat(firstResult.nextRecordFromSplit()).isNull();
+        assertThat(secondResult.nextSplit()).isNull();
         assertThat(secondResult.finishedSplits()).isNotEmpty();
     }
 
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java
index 538e45826d7..9ffbda74260 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
@@ -86,7 +85,7 @@ import static org.assertj.core.api.Assertions.assertThat;
     TestOrderlinessExtension.class,
     TestLoggerExtension.class,
 })
-public abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuiteBase {
+abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuiteBase {
 
     @RegisterExtension
     PulsarSplitReaderInvocationContextProvider provider =
@@ -138,8 +137,7 @@ public abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuite
         // create consumer and seek before split changes
         try (Consumer<byte[]> consumer = reader.createPulsarConsumer(partition)) {
             // inclusive messageId
-            StartCursor startCursor = StartCursor.fromMessageId(startPosition);
-            startCursor.seekPosition(partition.getTopic(), partition.getPartitionId(), consumer);
+            consumer.seek(startPosition);
         } catch (PulsarClientException e) {
             sneakyThrow(e);
         }
@@ -185,7 +183,7 @@ public abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuite
         if (verify) {
             assertThat(messages).as("We should fetch the expected size").hasSize(expectedCount);
             if (boundedness == Boundedness.CONTINUOUS_UNBOUNDED) {
-                assertThat(finishedSplits).as("Split should not be marked as finished").hasSize(0);
+                assertThat(finishedSplits).as("Split should not be marked as finished").isEmpty();
             } else {
                 assertThat(finishedSplits).as("Split should be marked as finished").hasSize(1);
             }
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
index f238a03bfa5..9eaa24041c7 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
@@ -24,6 +24,9 @@ import org.apache.flink.connector.testframe.external.source.DataStreamSourceExte
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
+
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
 
 /** Common test context for pulsar based test. */
 public abstract class PulsarTestContext<T> implements DataStreamSourceExternalContext<T> {
@@ -39,10 +42,13 @@ public abstract class PulsarTestContext<T> implements DataStreamSourceExternalCo
     // Helper methods for generating data.
 
     protected List<String> generateStringTestData(int splitIndex, long seed) {
-        int recordNum = 300;
+        Random random = new Random(seed);
+        int recordNum = 300 + random.nextInt(200);
         List<String> records = new ArrayList<>(recordNum);
+
         for (int i = 0; i < recordNum; i++) {
-            records.add(splitIndex + "-" + i);
+            int length = random.nextInt(40) + 10;
+            records.add(splitIndex + "-" + i + "-" + randomAlphanumeric(length));
         }
 
         return records;
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
index b8c49581f75..be387c583b1 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
@@ -31,13 +31,14 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
 import org.apache.pulsar.client.api.transaction.TxnID;
@@ -49,10 +50,8 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -62,9 +61,7 @@ import java.util.stream.Stream;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toMap;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
 import static org.apache.flink.connector.base.DeliveryGuarantee.EXACTLY_ONCE;
 import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
@@ -79,8 +76,10 @@ import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WR
 import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName;
 import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.pulsar.client.api.SubscriptionInitialPosition.Earliest;
 import static org.apache.pulsar.client.api.SubscriptionMode.Durable;
 import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
+import static org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED;
 
 /**
  * A pulsar cluster operator used for operating pulsar instance. It's serializable for using in
@@ -178,7 +177,7 @@ public class PulsarRuntimeOperator implements Closeable {
      */
     public void createTopic(String topic, int numberOfPartitions) {
         checkArgument(numberOfPartitions >= 0);
-        if (numberOfPartitions <= 0) {
+        if (numberOfPartitions == 0) {
             createNonPartitionedTopic(topic);
         } else {
             createPartitionedTopic(topic, numberOfPartitions);
@@ -196,7 +195,7 @@ public class PulsarRuntimeOperator implements Closeable {
                 sneakyAdmin(() -> admin().topics().getPartitionedTopicMetadata(topic));
         checkArgument(
                 metadata.partitions < newPartitionsNum,
-                "The new partition size which should exceed previous size.");
+                "The new partition size which should greater than previous size.");
 
         sneakyAdmin(() -> admin().topics().updatePartitionedTopic(topic, newPartitionsNum));
     }
@@ -220,9 +219,11 @@ public class PulsarRuntimeOperator implements Closeable {
             return;
         }
 
+        // Close all the available consumers and producers.
         removeConsumers(topic);
         removeProducers(topic);
-        if (metadata.partitions <= 0) {
+
+        if (metadata.partitions == NON_PARTITIONED) {
             sneakyAdmin(() -> admin().topics().delete(topicName));
         } else {
             sneakyAdmin(() -> admin().topics().deletePartitionedTopic(topicName));
@@ -245,22 +246,6 @@ public class PulsarRuntimeOperator implements Closeable {
         }
     }
 
-    /**
-     * Query a list of topics. Convert the topic metadata into a list of topic partitions. Return a
-     * mapping for topic and its partitions.
-     */
-    public Map<String, List<TopicPartition>> topicsInfo(String... topics) {
-        return topicsInfo(Arrays.asList(topics));
-    }
-
-    /**
-     * Query a list of topics. Convert the topic metadata into a list of topic partitions. Return a
-     * mapping for topic and its partitions.
-     */
-    public Map<String, List<TopicPartition>> topicsInfo(Collection<String> topics) {
-        return topics.stream().collect(toMap(identity(), this::topicInfo));
-    }
-
     /**
      * Send a single message to Pulsar, return the message id after the ack from Pulsar.
      *
@@ -518,12 +503,13 @@ public class PulsarRuntimeOperator implements Closeable {
                 topicProducers.computeIfAbsent(
                         index,
                         i -> {
-                            try {
-                                return client().newProducer(schema).topic(topic).create();
-                            } catch (PulsarClientException e) {
-                                sneakyThrow(e);
-                                return null;
-                            }
+                            ProducerBuilder<T> builder =
+                                    client().newProducer(schema)
+                                            .topic(topic)
+                                            .enableBatching(false)
+                                            .enableMultiSchema(true);
+
+                            return sneakyClient(builder::create);
                         });
     }
 
@@ -540,19 +526,15 @@ public class PulsarRuntimeOperator implements Closeable {
                 topicConsumers.computeIfAbsent(
                         index,
                         i -> {
-                            try {
-                                return client().newConsumer(schema)
-                                        .topic(topic)
-                                        .subscriptionName(SUBSCRIPTION_NAME)
-                                        .subscriptionMode(Durable)
-                                        .subscriptionType(Exclusive)
-                                        .subscriptionInitialPosition(
-                                                SubscriptionInitialPosition.Earliest)
-                                        .subscribe();
-                            } catch (PulsarClientException e) {
-                                sneakyThrow(e);
-                                return null;
-                            }
+                            ConsumerBuilder<T> builder =
+                                    client().newConsumer(schema)
+                                            .topic(topic)
+                                            .subscriptionName(SUBSCRIPTION_NAME)
+                                            .subscriptionMode(Durable)
+                                            .subscriptionType(Exclusive)
+                                            .subscriptionInitialPosition(Earliest);
+
+                            return sneakyClient(builder::subscribe);
                         });
     }
 
@@ -561,11 +543,7 @@ public class PulsarRuntimeOperator implements Closeable {
         ConcurrentHashMap<Integer, Producer<?>> integerProducers = producers.remove(topicName);
         if (integerProducers != null) {
             for (Producer<?> producer : integerProducers.values()) {
-                try {
-                    producer.close();
-                } catch (PulsarClientException e) {
-                    sneakyThrow(e);
-                }
+                sneakyClient(producer::close);
             }
         }
     }
@@ -575,11 +553,7 @@ public class PulsarRuntimeOperator implements Closeable {
         ConcurrentHashMap<Integer, Consumer<?>> integerConsumers = consumers.remove(topicName);
         if (integerConsumers != null) {
             for (Consumer<?> consumer : integerConsumers.values()) {
-                try {
-                    consumer.close();
-                } catch (PulsarClientException e) {
-                    sneakyThrow(e);
-                }
+                sneakyClient(consumer::close);
             }
         }
     }