You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ti...@apache.org on 2023/01/03 08:59:04 UTC

[flink-connector-pulsar] branch main updated: [FLINK-30413][Connector/Pulsar] Drop subscription support, remove unorder consuming, change related tests. (#10)

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git


The following commit(s) were added to refs/heads/main by this push:
     new d8f6056  [FLINK-30413][Connector/Pulsar] Drop subscription support, remove unorder consuming, change related tests. (#10)
d8f6056 is described below

commit d8f60565688eef7135c6556dfa66e4cace1feb35
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Tue Jan 3 16:58:58 2023 +0800

    [FLINK-30413][Connector/Pulsar] Drop subscription support, remove unorder consuming, change related tests. (#10)
---
 .../docs/connectors/datastream/pulsar.md           |  52 +--
 docs/content/docs/connectors/datastream/pulsar.md  |  65 +--
 .../generated/pulsar_consumer_configuration.html   |   6 -
 .../generated/pulsar_source_configuration.html     |   6 -
 flink-connector-pulsar-e2e-tests/pom.xml           |  93 +----
 ...rderedE2ECase.java => PulsarSourceE2ECase.java} |  14 +-
 .../util/pulsar/PulsarSourceUnorderedE2ECase.java  |  63 ---
 .../FlinkContainerWithPulsarEnvironment.java       |  10 -
 .../source/ExclusiveSubscriptionContext.java       |  47 ---
 .../pulsar/source/FailoverSubscriptionContext.java |  47 ---
 .../f4d91193-72ba-4ce4-ad83-98f780dce581           |   6 -
 .../connector/pulsar/source/PulsarSource.java      |   5 +-
 .../pulsar/source/PulsarSourceBuilder.java         |  63 +--
 .../pulsar/source/PulsarSourceOptions.java         |  12 +-
 .../source/config/PulsarSourceConfigUtils.java     |   6 +-
 .../pulsar/source/config/SourceConfiguration.java  |  39 +-
 .../PulsarSourceEnumStateSerializer.java           |   6 +-
 .../source/enumerator/PulsarSourceEnumerator.java  |   2 +-
 .../assigner/KeySharedSplitAssigner.java           |  93 -----
 .../assigner/NonSharedSplitAssigner.java           |  79 ----
 .../enumerator/assigner/SharedSplitAssigner.java   |  88 -----
 .../source/enumerator/assigner/SplitAssigner.java  |  13 +
 .../enumerator/assigner/SplitAssignerFactory.java  |  62 ---
 ...litAssignerBase.java => SplitAssignerImpl.java} |  51 ++-
 .../subscriber/impl/BasePulsarSubscriber.java      |  25 +-
 .../subscriber/impl/TopicListSubscriber.java       |   7 +-
 .../subscriber/impl/TopicPatternSubscriber.java    |   5 +-
 .../source/enumerator/topic/TopicPartition.java    |  27 +-
 .../topic/range/FixedKeysRangeGenerator.java       |  20 +-
 .../topic/range/FixedRangeGenerator.java           |  57 ---
 .../enumerator/topic/range/FullRangeGenerator.java |   5 -
 .../enumerator/topic/range/RangeGenerator.java     |  47 ---
 .../topic/range/SplitRangeGenerator.java           |  82 ----
 .../enumerator/topic/range/TopicRangeUtils.java    |   9 +-
 ...erBase.java => PulsarPartitionSplitReader.java} | 126 +++---
 .../reader/{emitter => }/PulsarRecordEmitter.java  |   9 +-
 ...erBase.java => PulsarSourceFetcherManager.java} |  41 +-
 ...edSourceReader.java => PulsarSourceReader.java} | 131 ++++--
 .../source/reader/PulsarSourceReaderFactory.java   | 132 -------
 .../deserializer/PulsarDeserializationSchema.java  |  10 +-
 .../fetcher/PulsarOrderedFetcherManager.java       |  75 ----
 .../fetcher/PulsarUnorderedFetcherManager.java     |  71 ----
 .../reader/source/PulsarSourceReaderBase.java      |  98 -----
 .../reader/source/PulsarUnorderedSourceReader.java | 210 ----------
 .../split/PulsarOrderedPartitionSplitReader.java   | 129 ------
 .../split/PulsarUnorderedPartitionSplitReader.java | 161 --------
 .../pulsar/source/split/PulsarPartitionSplit.java  |   1 -
 .../split/PulsarPartitionSplitSerializer.java      |  16 +-
 .../pulsar/source/PulsarSourceBuilderTest.java     |  57 +--
 .../pulsar/source/PulsarSourceITCase.java          |   5 +
 .../pulsar/source/PulsarUnorderedSourceITCase.java |  60 ---
 .../PulsarSourceEnumStateSerializerTest.java       |  34 +-
 .../enumerator/PulsarSourceEnumeratorTest.java     | 135 ++-----
 .../assigner/NonSharedSplitAssignerTest.java       | 113 ------
 .../assigner/SharedSplitAssignerTest.java          | 121 ------
 ...nerTestBase.java => SplitAssignerImplTest.java} | 102 ++++-
 .../source/enumerator/cursor/StopCursorTest.java   |   6 +-
 .../topic/range/SplitRangeGeneratorTest.java       |  82 ----
 .../topic/range/TopicRangeUtilsTest.java           |   9 +-
 ...se.java => PulsarPartitionSplitReaderTest.java} | 437 ++++++++++-----------
 ...ReaderTest.java => PulsarSourceReaderTest.java} | 155 ++++++--
 .../reader/source/PulsarSourceReaderTestBase.java  | 241 ------------
 .../source/PulsarUnorderedSourceReaderTest.java    |  27 --
 .../PulsarOrderedPartitionSplitReaderTest.java     |  91 -----
 .../PulsarUnorderedPartitionSplitReaderTest.java   |  28 --
 .../split/PulsarPartitionSplitSerializerTest.java  |  48 ++-
 .../pulsar/testutils/extension/SubType.java        |  32 --
 .../extension/TestOrderlinessExtension.java        |  65 ---
 .../testutils/source/PulsarSourceTestContext.java  |   5 -
 .../source/UnorderedSourceTestSuiteBase.java       |  76 ----
 .../cases/MultipleTopicConsumingContext.java       |   7 -
 ...ntext.java => PartialKeysConsumingContext.java} |  16 +-
 .../source/cases/SharedSubscriptionContext.java    |  46 ---
 .../source/cases/SingleTopicConsumingContext.java  |   7 -
 74 files changed, 871 insertions(+), 3556 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/pulsar.md b/docs/content.zh/docs/connectors/datastream/pulsar.md
index 0778eb8..a89ade4 100644
--- a/docs/content.zh/docs/connectors/datastream/pulsar.md
+++ b/docs/content.zh/docs/connectors/datastream/pulsar.md
@@ -59,7 +59,6 @@ PulsarSource<String> source = PulsarSource.builder()
     .setTopics("my-topic")
     .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
     .setSubscriptionName("my-subscription")
-    .setSubscriptionType(SubscriptionType.Exclusive)
     .build();
 
 env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");
@@ -77,7 +76,6 @@ pulsar_source = PulsarSource.builder() \
     .set_deserialization_schema(
         PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \
     .set_subscription_name('my-subscription') \
-    .set_subscription_type(SubscriptionType.Exclusive) \
     .build()
 
 env.from_source(source=pulsar_source,
@@ -235,58 +233,10 @@ Pulsar 的 `Message<byte[]>` 包含了很多 [额外的属性](https://pulsar.ap
 
 如果用户需要基于这些额外的属性来解析一条消息,可以实现 `PulsarDeserializationSchema` 接口。并一定要确保 `PulsarDeserializationSchema.getProducedType()` 方法返回的 `TypeInformation` 是正确的结果。Flink 使用 `TypeInformation` 将解析出来的结果序列化传递到下游算子。
 
-### Pulsar 订阅
-
-订阅是命名好的配置规则,指导消息如何投递给消费者。Pulsar Source 需要提供一个独立的订阅名称,支持 Pulsar 的四种订阅模式:
-
-- [exclusive(独占)](https://pulsar.apache.org/docs/zh-CN/concepts-messaging/#exclusive)
-- [shared(共享)](https://pulsar.apache.org/docs/zh-CN/concepts-messaging/#shared%E5%85%B1%E4%BA%AB)
-- [failover(灾备)](https://pulsar.apache.org/docs/zh-CN/concepts-messaging/#failover%E7%81%BE%E5%A4%87)
-- [key_shared(key 共享)](https://pulsar.apache.org/docs/zh-CN/concepts-messaging/#key_shared)
-
-当前 Pulsar Source 里,`独占` 和 `灾备` 的实现没有区别,如果 Flink 的一个 reader 挂了,Pulsar Source 会把所有未消费的数据交给其他的 reader 来消费数据。
-
-默认情况下,如果没有指定订阅类型,Pulsar Source 使用共享订阅类型(`SubscriptionType.Shared`)。
-
-{{< tabs "pulsar-subscriptions" >}}
-{{< tab "Java" >}}
-
-```java
-// 名为 "my-shared" 的共享订阅
-PulsarSource.builder().setSubscriptionName("my-shared");
-
-// 名为 "my-exclusive" 的独占订阅
-PulsarSource.builder().setSubscriptionName("my-exclusive").setSubscriptionType(SubscriptionType.Exclusive);
-```
-
-{{< /tab >}}
-{{< tab "Python" >}}
-
-```python
-# 名为 "my-shared" 的共享订阅
-PulsarSource.builder().set_subscription_name("my-shared")
-
-# 名为 "my-exclusive" 的独占订阅
-PulsarSource.builder().set_subscription_name("my-exclusive").set_subscription_type(SubscriptionType.Exclusive)
-```
-
-{{< /tab >}}
-{{< /tabs >}}
-
-#### Key_Shared 订阅
-
-当时用 Key_Shared 订阅时,Pulsar 将会基于 Message 的 key 去计算对应的 Hash 值,Hash 取值范围为(0~65535)。我们首先会使用 `Message.getOrderingKey()` 计算 Hash,如果没有则会依次使用 `Message.getKey()` 和 `Message.getKeyBytes()`。对于上述 key 都找不到的消息,我们会使用字符串 `"NO_KEY"` 来计算消息的 Hash 值。
-
-在 Flink Connector 中针对 Key_Shared 订阅提供了两种消费模式,分别是 `KeySharedMode.SPLIT` 和 `KeySharedMode.JOIN`,它们的实际消费行为并不相同。`KeySharedMode.JOIN` 会把所有的给定的 Hash 范围放于一个 Reader 中进行消费,而 `KeySharedMode.SPLIT` 会打散给定的 Hash 范围于不同的 Reader 中消费。
-
-之所以这么设计的主要原因是因为,在 Key_Shared 的订阅模式中,如果一条消息找不到对应的消费者,所有的消息都不会继续往下发送。所以我们提供了 `KeySharedMode.JOIN` 模式,允许用户只消费部分 Hash 范围的消息。
-
-##### 定义 RangeGenerator
+### 定义 RangeGenerator
 
 如果想在 Pulsar Source 里面使用 `Key_Shared` 订阅,需要提供 `RangeGenerator` 实例。`RangeGenerator` 会生成一组消息 key 的 hash 范围,Pulsar Source 会基于给定的范围来消费数据。
 
-Pulsar Source 也提供了一个名为 `SplitRangeGenerator` 的默认实现,它会基于 flink 数据源的并行度将 hash 范围均分。
-
 由于 Pulsar 并未提供 Key 的 Hash 计算方法,所以我们在 Flink 中提供了名为 `FixedKeysRangeGenerator` 的实现,你可以在 builder 中依次提供需要消费的 Key 内容即可。但需要注意的是,Pulsar 的 Key Hash 值并不对应唯一的一个 Key,所以如果你只想消费某几个 Key 的消息,还需要在后面的代码中使用 `DataStream.filter()` 方法来过滤出对应的消息。
 
 ### 起始消费位置
diff --git a/docs/content/docs/connectors/datastream/pulsar.md b/docs/content/docs/connectors/datastream/pulsar.md
index 34a3bbc..c1fead3 100644
--- a/docs/content/docs/connectors/datastream/pulsar.md
+++ b/docs/content/docs/connectors/datastream/pulsar.md
@@ -63,7 +63,6 @@ PulsarSource<String> source = PulsarSource.builder()
     .setTopics("my-topic")
     .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
     .setSubscriptionName("my-subscription")
-    .setSubscriptionType(SubscriptionType.Exclusive)
     .build();
 
 env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");
@@ -81,7 +80,6 @@ pulsar_source = PulsarSource.builder() \
     .set_deserialization_schema(
         PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \
     .set_subscription_name('my-subscription') \
-    .set_subscription_type(SubscriptionType.Exclusive) \
     .build()
 
 env.from_source(source=pulsar_source,
@@ -261,71 +259,12 @@ If you want to deserialize the Pulsar message by these properties, you need to i
 Ensure that the `TypeInformation` from the `PulsarDeserializationSchema.getProducedType()` is correct.
 Flink uses this `TypeInformation` to pass the messages to downstream operators.
 
-### Pulsar Subscriptions
+### Define a RangeGenerator
 
-A Pulsar subscription is a named configuration rule that determines how messages are delivered to Flink readers.
-The subscription name is required for consuming messages. Pulsar connector supports four subscription types:
-
-- [Exclusive](https://pulsar.apache.org/docs/en/concepts-messaging/#exclusive)
-- [Shared](https://pulsar.apache.org/docs/en/concepts-messaging/#shared)
-- [Failover](https://pulsar.apache.org/docs/en/concepts-messaging/#failover)
-- [Key_Shared](https://pulsar.apache.org/docs/en/concepts-messaging/#key_shared)
-
-There is no difference between `Exclusive` and `Failover` in the Pulsar connector.
-When a Flink reader crashes, all (non-acknowledged and subsequent) messages are redelivered to the available Flink readers.
-
-By default, if no subscription type is defined, Pulsar source uses the `Shared` subscription type.
-
-{{< tabs "pulsar-subscriptions" >}}
-{{< tab "Java" >}}
-
-```java
-// Shared subscription with name "my-shared"
-PulsarSource.builder().setSubscriptionName("my-shared");
-
-// Exclusive subscription with name "my-exclusive"
-PulsarSource.builder().setSubscriptionName("my-exclusive").setSubscriptionType(SubscriptionType.Exclusive);
-```
-
-{{< /tab >}}
-{{< tab "Python" >}}
-
-```python
-# Shared subscription with name "my-shared"
-PulsarSource.builder().set_subscription_name("my-shared")
-
-# Exclusive subscription with name "my-exclusive"
-PulsarSource.builder().set_subscription_name("my-exclusive").set_subscription_type(SubscriptionType.Exclusive)
-```
-
-{{< /tab >}}
-{{< /tabs >}}
-
-#### Key_Shared subscriptions
-
-All the Pulsar's messages will be calculated with a key hash in Key_Shared subscription.
-The hash range must be 0 to 65535. We try to compute the key hash in the order of `Message.getOrderingKey()`,
-`Message.getKey()` or `Message.getKeyBytes()`. We will use `"NO_KEY"` str as the message key if none of these keys has been provided.
-
-Pulsar's Key_Shared subscription comes in two forms in Connector, the `KeySharedMode.SPLIT` and `KeySharedMode.JOIN`.
-Different `KeySharedMode` means different split assignment behaviors. If you only consume a subset of Pulsar's key hash range,
-remember to use the `KeySharedMode.JOIN` which will subscribe all the range in only one reader.
-Otherwise, when the ranges can join into a full Pulsar key hash range (0~65535) you should use `KeySharedMode.SPLIT`
-mode for sharing the splits among all the backend readers.
-
-In the `KeySharedMode.SPLIT` mode. The topic will be subscribed by multiple readers.
-But Pulsar has one limit in this situation. That is if a Message can't find the corresponding reader by the key hash range.
-No messages will be delivered to the current readers, until there is a reader which can subscribe to such messages.
-
-##### Define a RangeGenerator
-
-Ensure that you have provided a `RangeGenerator` implementation if you want to use the `Key_Shared` subscription type on the Pulsar connector.
+Ensure that you have provided a `RangeGenerator` implementation if you want to consume a subset of keys on the Pulsar connector.
 The `RangeGenerator` generates a set of key hash ranges so that a respective reader subtask only dispatches
 messages where the hash of the message key is contained in the specified range.
 
-The Pulsar connector uses `SplitRangeGenerator` that divides the range by the Flink source
-parallelism if no `RangeGenerator` is provided in the `Key_Shared` subscription type.
-
 Since Pulsar doesn't expose its key hash range method, we provide a `FixedKeysRangeGenerator` for end users.
 You can add the keys you want to consume, no need to calculate any hash ranges.
 The key's hash isn't specified to only one key, so the consuming results may contain the messages with
diff --git a/docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html b/docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html
index 41f0b24..7eb3cae 100644
--- a/docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html
+++ b/docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html
@@ -146,12 +146,6 @@ C5, 1, 1
             <td>String</td>
             <td>Specify the subscription name for this consumer. This argument is required when constructing the consumer.</td>
         </tr>
-        <tr>
-            <td><h5>pulsar.consumer.subscriptionType</h5></td>
-            <td style="word-wrap: break-word;">Shared</td>
-            <td><p>Enum</p></td>
-            <td>Subscription type.<br /><br />Four subscription types are available:<ul><li>Exclusive</li><li>Failover</li><li>Shared</li><li>Key_Shared</li></ul><br /><br />Possible values:<ul><li>"Exclusive"</li><li>"Shared"</li><li>"Failover"</li><li>"Key_Shared"</li></ul></td>
-        </tr>
         <tr>
             <td><h5>pulsar.consumer.tickDurationMillis</h5></td>
             <td style="word-wrap: break-word;">1000</td>
diff --git a/docs/layouts/shortcodes/generated/pulsar_source_configuration.html b/docs/layouts/shortcodes/generated/pulsar_source_configuration.html
index e28ffd5..c66423d 100644
--- a/docs/layouts/shortcodes/generated/pulsar_source_configuration.html
+++ b/docs/layouts/shortcodes/generated/pulsar_source_configuration.html
@@ -50,12 +50,6 @@
             <td>Long</td>
             <td>The interval (in ms) for the Pulsar source to discover the new partitions. A non-positive value disables the partition discovery.</td>
         </tr>
-        <tr>
-            <td><h5>pulsar.source.transactionTimeoutMillis</h5></td>
-            <td style="word-wrap: break-word;">10800000</td>
-            <td>Long</td>
-            <td>This option is used in <code class="highlighter-rouge">Shared</code> or <code class="highlighter-rouge">Key_Shared</code> subscription. You should configure this option when you do not enable the <code class="highlighter-rouge">pulsar.source.enableAutoAcknowledgeMessage</code> option.<br />The value (in ms) should be greater than the checkpoint interval.</td>
-        </tr>
         <tr>
             <td><h5>pulsar.source.verifyInitialOffsets</h5></td>
             <td style="word-wrap: break-word;">WARN_ON_MISMATCH</td>
diff --git a/flink-connector-pulsar-e2e-tests/pom.xml b/flink-connector-pulsar-e2e-tests/pom.xml
index 2ff0083..c88e268 100644
--- a/flink-connector-pulsar-e2e-tests/pom.xml
+++ b/flink-connector-pulsar-e2e-tests/pom.xml
@@ -37,7 +37,7 @@ under the License.
 	<dependencies>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-pulsar</artifactId>
+			<artifactId>flink-sql-connector-pulsar</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 		<dependency>
@@ -136,98 +136,17 @@ under the License.
 					<artifactItems>
 						<artifactItem>
 							<groupId>org.apache.flink</groupId>
-							<artifactId>flink-connector-test-utils</artifactId>
-							<version>${flink.version}</version>
-							<destFileName>flink-connector-testing.jar</destFileName>
-							<type>jar</type>
-							<outputDirectory>${project.build.directory}/dependencies
-							</outputDirectory>
-						</artifactItem>
-						<artifactItem>
-							<groupId>org.apache.flink</groupId>
-							<artifactId>flink-connector-pulsar</artifactId>
+							<artifactId>flink-sql-connector-pulsar</artifactId>
 							<version>${project.version}</version>
 							<destFileName>pulsar-connector.jar</destFileName>
 							<type>jar</type>
 							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
 						</artifactItem>
 						<artifactItem>
-							<groupId>org.apache.pulsar</groupId>
-							<artifactId>pulsar-client-all</artifactId>
-							<version>${pulsar.version}</version>
-							<destFileName>pulsar-client-all.jar</destFileName>
-							<type>jar</type>
-							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
-						</artifactItem>
-						<artifactItem>
-							<groupId>org.apache.pulsar</groupId>
-							<artifactId>pulsar-client-api</artifactId>
-							<version>${pulsar.version}</version>
-							<destFileName>pulsar-client-api.jar</destFileName>
-							<type>jar</type>
-							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
-						</artifactItem>
-						<artifactItem>
-							<groupId>org.apache.pulsar</groupId>
-							<artifactId>pulsar-client-admin-api</artifactId>
-							<version>${pulsar.version}</version>
-							<destFileName>pulsar-admin-api.jar</destFileName>
-							<type>jar</type>
-							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
-						</artifactItem>
-						<dependency>
-							<groupId>org.apache.pulsar</groupId>
-							<artifactId>bouncy-castle-bc</artifactId>
-							<version>${pulsar.version}</version>
-							<destFileName>bouncy-castle-bc.jar</destFileName>
-							<type>jar</type>
-							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
-						</dependency>
-						<dependency>
-							<groupId>org.bouncycastle</groupId>
-							<artifactId>bcpkix-jdk15on</artifactId>
-							<version>${bouncycastle.version}</version>
-							<destFileName>bcpkix-jdk15on.jar</destFileName>
-							<type>jar</type>
-							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
-						</dependency>
-						<dependency>
-							<groupId>org.bouncycastle</groupId>
-							<artifactId>bcprov-jdk15on</artifactId>
-							<version>${bouncycastle.version}</version>
-							<destFileName>bcprov-jdk15on.jar</destFileName>
-							<type>jar</type>
-							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
-						</dependency>
-						<dependency>
-							<groupId>org.bouncycastle</groupId>
-							<artifactId>bcutil-jdk15on</artifactId>
-							<version>${bouncycastle.version}</version>
-							<destFileName>bcutil-jdk15on.jar</destFileName>
-							<type>jar</type>
-							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
-						</dependency>
-						<dependency>
-							<groupId>org.bouncycastle</groupId>
-							<artifactId>bcprov-ext-jdk15on</artifactId>
-							<version>${bouncycastle.version}</version>
-							<destFileName>bcprov-ext-jdk15on.jar</destFileName>
-							<type>jar</type>
-							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
-						</dependency>
-						<dependency>
-							<groupId>javax.xml.bind</groupId>
-							<artifactId>jaxb-api</artifactId>
-							<version>${jaxb-api.version}</version>
-							<destFileName>jaxb-api.jar</destFileName>
-							<type>jar</type>
-							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
-						</dependency>
-						<artifactItem>
-							<groupId>org.slf4j</groupId>
-							<artifactId>jul-to-slf4j</artifactId>
-							<version>${slf4j.version}</version>
-							<destFileName>jul-to-slf4j.jar</destFileName>
+							<groupId>org.apache.flink</groupId>
+							<artifactId>flink-connector-test-utils</artifactId>
+							<version>${flink.version}</version>
+							<destFileName>flink-connector-testing.jar</destFileName>
 							<type>jar</type>
 							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
 						</artifactItem>
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceE2ECase.java
similarity index 79%
rename from flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
rename to flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceE2ECase.java
index c223fb6..385e58a 100644
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
+++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceE2ECase.java
@@ -19,6 +19,8 @@
 package org.apache.flink.tests.util.pulsar;
 
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
+import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext;
+import org.apache.flink.connector.pulsar.testutils.source.cases.PartialKeysConsumingContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
 import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
@@ -27,8 +29,6 @@ import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
 import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment;
-import org.apache.flink.tests.util.pulsar.source.ExclusiveSubscriptionContext;
-import org.apache.flink.tests.util.pulsar.source.FailoverSubscriptionContext;
 
 import org.junit.jupiter.api.Tag;
 
@@ -38,7 +38,7 @@ import org.junit.jupiter.api.Tag;
  */
 @SuppressWarnings("unused")
 @Tag("org.apache.flink.testutils.junit.FailsOnJava11")
-public class PulsarSourceOrderedE2ECase extends SourceTestSuiteBase<String> {
+public class PulsarSourceE2ECase extends SourceTestSuiteBase<String> {
 
     // Defines the Semantic.
     @TestSemantics
@@ -54,10 +54,10 @@ public class PulsarSourceOrderedE2ECase extends SourceTestSuiteBase<String> {
 
     // Defines a set of external context Factories for different test cases.
     @TestContext
-    PulsarTestContextFactory<String, ExclusiveSubscriptionContext> exclusive =
-            new PulsarTestContextFactory<>(pulsar, ExclusiveSubscriptionContext::new);
+    PulsarTestContextFactory<String, MultipleTopicConsumingContext> multipleTopic =
+            new PulsarTestContextFactory<>(pulsar, MultipleTopicConsumingContext::new);
 
     @TestContext
-    PulsarTestContextFactory<String, FailoverSubscriptionContext> failover =
-            new PulsarTestContextFactory<>(pulsar, FailoverSubscriptionContext::new);
+    PulsarTestContextFactory<String, PartialKeysConsumingContext> partialKeys =
+            new PulsarTestContextFactory<>(pulsar, PartialKeysConsumingContext::new);
 }
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
deleted file mode 100644
index 770d118..0000000
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tests.util.pulsar;
-
-import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
-import org.apache.flink.connector.pulsar.testutils.source.UnorderedSourceTestSuiteBase;
-import org.apache.flink.connector.pulsar.testutils.source.cases.KeySharedSubscriptionContext;
-import org.apache.flink.connector.pulsar.testutils.source.cases.SharedSubscriptionContext;
-import org.apache.flink.connector.testframe.junit.annotations.TestContext;
-import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
-import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
-import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
-import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment;
-
-import org.junit.jupiter.api.Tag;
-
-/**
- * Pulsar E2E test based on connector testing framework. It's used for Shared & Key_Shared
- * subscription.
- */
-@SuppressWarnings("unused")
-@Tag("org.apache.flink.testutils.junit.FailsOnJava11")
-public class PulsarSourceUnorderedE2ECase extends UnorderedSourceTestSuiteBase<String> {
-
-    // Defines the Semantic.
-    @TestSemantics
-    CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};
-
-    // Defines TestEnvironment.
-    @TestEnv
-    FlinkContainerWithPulsarEnvironment flink = new FlinkContainerWithPulsarEnvironment(1, 8);
-
-    // Defines ConnectorExternalSystem.
-    @TestExternalSystem
-    PulsarContainerTestEnvironment pulsar = new PulsarContainerTestEnvironment(flink);
-
-    // Defines a set of external context Factories for different test cases.
-    @TestContext
-    PulsarTestContextFactory<String, SharedSubscriptionContext> shared =
-            new PulsarTestContextFactory<>(pulsar, SharedSubscriptionContext::new);
-
-    @TestContext
-    PulsarTestContextFactory<String, KeySharedSubscriptionContext> keyShared =
-            new PulsarTestContextFactory<>(pulsar, KeySharedSubscriptionContext::new);
-}
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
index 65e99a8..d116d7e 100644
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
+++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
@@ -34,16 +34,6 @@ public class FlinkContainerWithPulsarEnvironment extends FlinkContainerTestEnvir
                 numTaskManagers,
                 numSlotsPerTaskManager,
                 resourcePath("pulsar-connector.jar"),
-                resourcePath("pulsar-client-all.jar"),
-                resourcePath("pulsar-client-api.jar"),
-                resourcePath("pulsar-admin-api.jar"),
-                resourcePath("bouncy-castle-bc.jar"),
-                resourcePath("bcpkix-jdk15on.jar"),
-                resourcePath("bcprov-jdk15on.jar"),
-                resourcePath("bcutil-jdk15on.jar"),
-                resourcePath("bcprov-ext-jdk15on.jar"),
-                resourcePath("jaxb-api.jar"),
-                resourcePath("jul-to-slf4j.jar"),
                 resourcePath("flink-connector-testing.jar"));
     }
 
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/ExclusiveSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/ExclusiveSubscriptionContext.java
deleted file mode 100644
index 4906ad6..0000000
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/ExclusiveSubscriptionContext.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tests.util.pulsar.source;
-
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-
-/** We would consume from test splits by using {@link SubscriptionType#Exclusive} subscription. */
-public class ExclusiveSubscriptionContext extends MultipleTopicConsumingContext {
-
-    public ExclusiveSubscriptionContext(PulsarTestEnvironment environment) {
-        super(environment);
-    }
-
-    @Override
-    protected String displayName() {
-        return "consume message by Exclusive";
-    }
-
-    @Override
-    protected String subscriptionName() {
-        return "pulsar-exclusive-subscription";
-    }
-
-    @Override
-    protected SubscriptionType subscriptionType() {
-        return SubscriptionType.Exclusive;
-    }
-}
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/FailoverSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/FailoverSubscriptionContext.java
deleted file mode 100644
index 3134db4..0000000
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/FailoverSubscriptionContext.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tests.util.pulsar.source;
-
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-
-/** We would consume from test splits by using {@link SubscriptionType#Failover} subscription. */
-public class FailoverSubscriptionContext extends MultipleTopicConsumingContext {
-
-    public FailoverSubscriptionContext(PulsarTestEnvironment environment) {
-        super(environment);
-    }
-
-    @Override
-    protected String displayName() {
-        return "consume message by Failover";
-    }
-
-    @Override
-    protected String subscriptionName() {
-        return "pulsar-failover-subscription";
-    }
-
-    @Override
-    protected SubscriptionType subscriptionType() {
-        return SubscriptionType.Failover;
-    }
-}
diff --git a/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581 b/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
index 40e7dc9..37f6ba8 100644
--- a/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
+++ b/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
@@ -10,9 +10,3 @@ org.apache.flink.connector.pulsar.source.PulsarSourceITCase does not satisfy: on
 * reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
 * reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
  or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connector.pulsar.source.PulsarUnorderedSourceITCase does not satisfy: only one of the following predicates match:\
-* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
-* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
- or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
index 0786062..66cc202 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
@@ -36,7 +36,7 @@ import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
-import org.apache.flink.connector.pulsar.source.reader.PulsarSourceReaderFactory;
+import org.apache.flink.connector.pulsar.source.reader.PulsarSourceReader;
 import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
 import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchemaInitializationContext;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
@@ -133,8 +133,7 @@ public final class PulsarSource<OUT>
                 new PulsarDeserializationSchemaInitializationContext(readerContext);
         deserializationSchema.open(initializationContext, sourceConfiguration);
 
-        return PulsarSourceReaderFactory.create(
-                readerContext, deserializationSchema, sourceConfiguration);
+        return PulsarSourceReader.create(sourceConfiguration, deserializationSchema, readerContext);
     }
 
     @Internal
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
index 5aac3d2..5cd59b5 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
@@ -33,7 +33,6 @@ import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FullRangeGenerator;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.range.SplitRangeGenerator;
 import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
 
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
@@ -48,19 +47,14 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;
 
-import static java.lang.Boolean.FALSE;
 import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
 import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
 import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
 import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME;
-import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION;
 import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CONSUMER_NAME;
-import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
-import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_READ_TRANSACTION_TIMEOUT;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
-import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
 import static org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.SOURCE_CONFIG_VALIDATOR;
 import static org.apache.flink.util.InstantiationUtil.isSerializable;
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -177,9 +171,11 @@ public final class PulsarSourceBuilder<OUT> {
      * @return this PulsarSourceBuilder.
      * @see <a href="https://pulsar.apache.org/docs/en/concepts-messaging/#subscriptions">Pulsar
      *     Subscriptions</a>
+     * @deprecated The subscription type can no longer be set; this method is now a no-op.
      */
+    @Deprecated
     public PulsarSourceBuilder<OUT> setSubscriptionType(SubscriptionType subscriptionType) {
-        return setConfig(PULSAR_SUBSCRIPTION_TYPE, subscriptionType);
+        return this;
     }
 
     /**
@@ -278,23 +274,13 @@ public final class PulsarSourceBuilder<OUT> {
     }
 
     /**
-     * Set a topic range generator for Key_Shared subscription.
+     * Set a topic range generator for consuming a subset of keys.
      *
      * @param rangeGenerator A generator which would generate a set of {@link TopicRange} for given
      *     topic.
      * @return this PulsarSourceBuilder.
      */
     public PulsarSourceBuilder<OUT> setRangeGenerator(RangeGenerator rangeGenerator) {
-        if (configBuilder.contains(PULSAR_SUBSCRIPTION_TYPE)) {
-            SubscriptionType subscriptionType = configBuilder.get(PULSAR_SUBSCRIPTION_TYPE);
-            checkArgument(
-                    subscriptionType == SubscriptionType.Key_Shared,
-                    "Key_Shared subscription should be used for custom rangeGenerator instead of %s",
-                    subscriptionType);
-        } else {
-            LOG.warn("No subscription type provided, set it to Key_Shared.");
-            setSubscriptionType(SubscriptionType.Key_Shared);
-        }
         this.rangeGenerator = checkNotNull(rangeGenerator);
         return this;
     }
@@ -383,6 +369,9 @@ public final class PulsarSourceBuilder<OUT> {
      */
     public PulsarSourceBuilder<OUT> setAuthentication(
             String authPluginClassName, String authParamsString) {
+        checkArgument(
+                !configBuilder.contains(PULSAR_AUTH_PARAM_MAP),
+                "Duplicated authentication setting.");
         configBuilder.set(PULSAR_AUTH_PLUGIN_CLASS_NAME, authPluginClassName);
         configBuilder.set(PULSAR_AUTH_PARAMS, authParamsString);
         return this;
@@ -397,6 +386,8 @@ public final class PulsarSourceBuilder<OUT> {
      */
     public PulsarSourceBuilder<OUT> setAuthentication(
             String authPluginClassName, Map<String, String> authParams) {
+        checkArgument(
+                !configBuilder.contains(PULSAR_AUTH_PARAMS), "Duplicated authentication setting.");
         configBuilder.set(PULSAR_AUTH_PLUGIN_CLASS_NAME, authPluginClassName);
         configBuilder.set(PULSAR_AUTH_PARAM_MAP, authParams);
         return this;
@@ -450,20 +441,12 @@ public final class PulsarSourceBuilder<OUT> {
      */
     @SuppressWarnings("java:S3776")
     public PulsarSource<OUT> build() {
-
         // Ensure the topic subscriber for pulsar.
         checkNotNull(subscriber, "No topic names or topic pattern are provided.");
 
-        SubscriptionType subscriptionType = configBuilder.get(PULSAR_SUBSCRIPTION_TYPE);
-        if (subscriptionType == SubscriptionType.Key_Shared) {
-            if (rangeGenerator == null) {
-                LOG.warn(
-                        "No range generator provided for key_shared subscription,"
-                                + " we would use the SplitRangeGenerator as the default range generator.");
-                this.rangeGenerator = new SplitRangeGenerator();
-            }
-        } else {
-            // Override the range generator.
+        if (rangeGenerator == null) {
+            LOG.warn(
+                    "No range generator provided, we would use the FullRangeGenerator as the default range generator.");
             this.rangeGenerator = new FullRangeGenerator();
         }
 
@@ -481,28 +464,6 @@ public final class PulsarSourceBuilder<OUT> {
 
         checkNotNull(deserializationSchema, "deserializationSchema should be set.");
 
-        // Enable transaction if the cursor auto commit is disabled for Key_Shared & Shared.
-        if (FALSE.equals(configBuilder.get(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE))
-                && (subscriptionType == SubscriptionType.Key_Shared
-                        || subscriptionType == SubscriptionType.Shared)) {
-            LOG.info(
-                    "Pulsar cursor auto commit is disabled, make sure checkpoint is enabled "
-                            + "and your pulsar cluster is support the transaction.");
-            configBuilder.override(PULSAR_ENABLE_TRANSACTION, true);
-
-            if (!configBuilder.contains(PULSAR_READ_TRANSACTION_TIMEOUT)) {
-                LOG.warn(
-                        "The default pulsar transaction timeout is 3 hours, "
-                                + "make sure it was greater than your checkpoint interval.");
-            } else {
-                Long timeout = configBuilder.get(PULSAR_READ_TRANSACTION_TIMEOUT);
-                LOG.warn(
-                        "The configured transaction timeout is {} mille seconds, "
-                                + "make sure it was greater than your checkpoint interval.",
-                        timeout);
-            }
-        }
-
         if (!configBuilder.contains(PULSAR_CONSUMER_NAME)) {
             LOG.warn(
                     "We recommend set a readable consumer name through setConsumerName(String) in production mode.");
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
index 8c2ab6b..cd61631 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
@@ -124,6 +124,8 @@ public final class PulsarSourceOptions {
                                             " We would automatically commit the cursor using the given period (in ms).")
                                     .build());
 
+    /** @deprecated We no longer need transactions for consuming messages. */
+    @Deprecated
     public static final ConfigOption<Long> PULSAR_READ_TRANSACTION_TIMEOUT =
             ConfigOptions.key(SOURCE_CONFIG_PREFIX + "transactionTimeoutMillis")
                     .longType()
@@ -141,14 +143,6 @@ public final class PulsarSourceOptions {
                                             "The value (in ms) should be greater than the checkpoint interval.")
                                     .build());
 
-    /**
-     * @deprecated Use {@link #PULSAR_READ_TRANSACTION_TIMEOUT} instead. This would be removed in
-     *     the next release.
-     */
-    @Deprecated
-    public static final ConfigOption<Long> PULSAR_TRANSACTION_TIMEOUT_MILLIS =
-            PULSAR_READ_TRANSACTION_TIMEOUT;
-
     public static final ConfigOption<Long> PULSAR_MAX_FETCH_TIME =
             ConfigOptions.key(SOURCE_CONFIG_PREFIX + "maxFetchTime")
                     .longType()
@@ -236,6 +230,8 @@ public final class PulsarSourceOptions {
                                             " This argument is required when constructing the consumer.")
                                     .build());
 
+    /** @deprecated This config option is no longer supported. */
+    @Deprecated
     public static final ConfigOption<SubscriptionType> PULSAR_SUBSCRIPTION_TYPE =
             ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "subscriptionType")
                     .enumType(SubscriptionType.class)
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
index d76ba0a..f5adfda 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
 
 import java.util.Map;
 import java.util.Optional;
@@ -61,7 +62,6 @@ import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSA
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_RETRY_LETTER_TOPIC;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_MODE;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
-import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_TICK_DURATION_MILLIS;
 
 /** Create source related {@link Consumer} and validate config. */
@@ -100,7 +100,6 @@ public final class PulsarSourceConfigUtils {
         configuration.useOption(
                 PULSAR_NEGATIVE_ACK_REDELIVERY_DELAY_MICROS,
                 v -> builder.negativeAckRedeliveryDelay(v, MICROSECONDS));
-        configuration.useOption(PULSAR_SUBSCRIPTION_TYPE, builder::subscriptionType);
         configuration.useOption(PULSAR_SUBSCRIPTION_MODE, builder::subscriptionMode);
         configuration.useOption(PULSAR_CRYPTO_FAILURE_ACTION, builder::cryptoFailureAction);
         configuration.useOption(PULSAR_RECEIVER_QUEUE_SIZE, builder::receiverQueueSize);
@@ -135,6 +134,9 @@ public final class PulsarSourceConfigUtils {
             builder.properties(properties);
         }
 
+        // We only use exclusive subscription.
+        builder.subscriptionType(SubscriptionType.Exclusive);
+
         // Flink connector doesn't need any batch receiving behaviours.
         // Disable the batch-receive timer for the Consumer instance.
         builder.batchReceivePolicy(DISABLED_BATCH_RECEIVE_POLICY);
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
index 5a52a59..63e2813 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
@@ -19,7 +19,6 @@
 package org.apache.flink.connector.pulsar.source.config;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.pulsar.common.config.PulsarConfiguration;
@@ -42,10 +41,8 @@ import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSA
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
-import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_READ_TRANSACTION_TIMEOUT;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_MODE;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
-import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_VERIFY_INITIAL_OFFSETS;
 
 /** The configuration class for pulsar source. */
@@ -57,12 +54,10 @@ public class SourceConfiguration extends PulsarConfiguration {
     private final long partitionDiscoveryIntervalMs;
     private final boolean enableAutoAcknowledgeMessage;
     private final long autoCommitCursorInterval;
-    private final long transactionTimeoutMillis;
     private final Duration maxFetchTime;
     private final int maxFetchRecords;
     private final CursorVerification verifyInitialOffsets;
     private final String subscriptionName;
-    private final SubscriptionType subscriptionType;
     private final SubscriptionMode subscriptionMode;
     private final boolean allowKeySharedOutOfOrderDelivery;
     private final boolean enableMetrics;
@@ -74,12 +69,10 @@ public class SourceConfiguration extends PulsarConfiguration {
         this.partitionDiscoveryIntervalMs = get(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS);
         this.enableAutoAcknowledgeMessage = get(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE);
         this.autoCommitCursorInterval = get(PULSAR_AUTO_COMMIT_CURSOR_INTERVAL);
-        this.transactionTimeoutMillis = get(PULSAR_READ_TRANSACTION_TIMEOUT);
         this.maxFetchTime = get(PULSAR_MAX_FETCH_TIME, Duration::ofMillis);
         this.maxFetchRecords = get(PULSAR_MAX_FETCH_RECORDS);
         this.verifyInitialOffsets = get(PULSAR_VERIFY_INITIAL_OFFSETS);
         this.subscriptionName = get(PULSAR_SUBSCRIPTION_NAME);
-        this.subscriptionType = get(PULSAR_SUBSCRIPTION_TYPE);
         this.subscriptionMode = get(PULSAR_SUBSCRIPTION_MODE);
         this.allowKeySharedOutOfOrderDelivery = get(PULSAR_ALLOW_KEY_SHARED_OUT_OF_ORDER_DELIVERY);
         this.enableMetrics =
@@ -128,17 +121,6 @@ public class SourceConfiguration extends PulsarConfiguration {
         return autoCommitCursorInterval;
     }
 
-    /**
-     * Pulsar's transaction have a timeout mechanism for uncommitted transaction. We use transaction
-     * for {@link SubscriptionType#Shared} and {@link SubscriptionType#Key_Shared} when user disable
-     * {@link #isEnableAutoAcknowledgeMessage} and enable flink checkpoint. Since the checkpoint
-     * interval couldn't be acquired from {@link SourceReaderContext#getConfiguration()}, we have to
-     * expose this option. Make sure this value is greater than the checkpoint interval.
-     */
-    public long getTransactionTimeoutMillis() {
-        return transactionTimeoutMillis;
-    }
-
     /**
      * The fetch time for flink split reader polling message. We would stop polling message and
      * return the message in {@link RecordsWithSplitIds} when timeout or exceed the {@link
@@ -171,16 +153,6 @@ public class SourceConfiguration extends PulsarConfiguration {
         return subscriptionName;
     }
 
-    /**
-     * The pulsar's subscription type for this flink source. All the readers would share this
-     * subscription type.
-     *
-     * @see SubscriptionType
-     */
-    public SubscriptionType getSubscriptionType() {
-        return subscriptionType;
-    }
-
     /**
      * The pulsar's subscription mode for this flink source. All the readers would share this
      * subscription mode.
@@ -203,12 +175,7 @@ public class SourceConfiguration extends PulsarConfiguration {
 
     /** Convert the subscription into a readable str. */
     public String getSubscriptionDesc() {
-        return getSubscriptionName()
-                + "("
-                + getSubscriptionType()
-                + ","
-                + getSubscriptionMode()
-                + ")";
+        return getSubscriptionName() + "(Exclusive," + getSubscriptionMode() + ")";
     }
 
     @Override
@@ -226,12 +193,10 @@ public class SourceConfiguration extends PulsarConfiguration {
         return partitionDiscoveryIntervalMs == that.partitionDiscoveryIntervalMs
                 && enableAutoAcknowledgeMessage == that.enableAutoAcknowledgeMessage
                 && autoCommitCursorInterval == that.autoCommitCursorInterval
-                && transactionTimeoutMillis == that.transactionTimeoutMillis
                 && maxFetchRecords == that.maxFetchRecords
                 && Objects.equals(maxFetchTime, that.maxFetchTime)
                 && verifyInitialOffsets == that.verifyInitialOffsets
                 && Objects.equals(subscriptionName, that.subscriptionName)
-                && subscriptionType == that.subscriptionType
                 && subscriptionMode == that.subscriptionMode
                 && allowKeySharedOutOfOrderDelivery == that.allowKeySharedOutOfOrderDelivery
                 && enableMetrics == that.enableMetrics;
@@ -244,12 +209,10 @@ public class SourceConfiguration extends PulsarConfiguration {
                 partitionDiscoveryIntervalMs,
                 enableAutoAcknowledgeMessage,
                 autoCommitCursorInterval,
-                transactionTimeoutMillis,
                 maxFetchTime,
                 maxFetchRecords,
                 verifyInitialOffsets,
                 subscriptionName,
-                subscriptionType,
                 subscriptionMode,
                 allowKeySharedOutOfOrderDelivery,
                 enableMetrics);
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializer.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializer.java
index 30b374b..f7a1edc 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializer.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializer.java
@@ -41,7 +41,7 @@ public class PulsarSourceEnumStateSerializer
         implements SimpleVersionedSerializer<PulsarSourceEnumState> {
 
     // This version should be bumped after modifying the PulsarSourceEnumState.
-    public static final int CURRENT_VERSION = 2;
+    public static final int CURRENT_VERSION = 3;
 
     public static final PulsarSourceEnumStateSerializer INSTANCE =
             new PulsarSourceEnumStateSerializer();
@@ -75,7 +75,9 @@ public class PulsarSourceEnumStateSerializer
         try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
                 DataInputStream in = new DataInputStream(bais)) {
             Set<TopicPartition> partitions = null;
-            if (version == 2) {
+            if (version == 3) {
+                partitions = deserializeSet(in, deserializePartition(2));
+            } else if (version == 2) {
                 partitions = deserializeSet(in, deserializePartition(1));
             } else {
                 partitions = deserializeSet(in, deserializePartition(0));
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
index 8dbead2..ff1f073 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
@@ -48,7 +48,7 @@ import static java.util.Collections.singletonList;
 import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createAdmin;
 import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin;
 import static org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState.initialState;
-import static org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssignerFactory.createAssigner;
+import static org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner.createAssigner;
 
 /** The enumerator class for the pulsar source. */
 @Internal
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/KeySharedSplitAssigner.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/KeySharedSplitAssigner.java
deleted file mode 100644
index 9ef4347..0000000
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/KeySharedSplitAssigner.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.enumerator.assigner;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-/** This assigner is used for {@link SubscriptionType#Key_Shared} subscription. */
-@Internal
-public class KeySharedSplitAssigner extends SplitAssignerBase {
-
-    public KeySharedSplitAssigner(
-            StopCursor stopCursor,
-            boolean enablePartitionDiscovery,
-            SplitEnumeratorContext<PulsarPartitionSplit> context,
-            PulsarSourceEnumState enumState) {
-        super(stopCursor, enablePartitionDiscovery, context, enumState);
-    }
-
-    @Override
-    public List<TopicPartition> registerTopicPartitions(Set<TopicPartition> fetchedPartitions) {
-        List<TopicPartition> newPartitions = new ArrayList<>();
-
-        for (TopicPartition partition : fetchedPartitions) {
-            boolean shouldAssign = false;
-            if (!appendedPartitions.contains(partition)) {
-                appendedPartitions.add(partition);
-                newPartitions.add(partition);
-                shouldAssign = true;
-            }
-
-            // Reassign the incoming splits when restarting from state.
-            if (shouldAssign || !initialized) {
-                // Calculate the reader id by the current parallelism.
-                int readerId = partitionOwner(partition);
-                PulsarPartitionSplit split = new PulsarPartitionSplit(partition, stopCursor);
-                addSplitToPendingList(readerId, split);
-            }
-        }
-
-        if (!initialized) {
-            initialized = true;
-        }
-
-        return newPartitions;
-    }
-
-    @Override
-    public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
-        if (splits.isEmpty()) {
-            // In case of the task failure. No splits will be put back to the enumerator.
-            for (TopicPartition partition : appendedPartitions) {
-                int readId = partitionOwner(partition);
-                if (readId == subtaskId) {
-                    PulsarPartitionSplit split = new PulsarPartitionSplit(partition, stopCursor);
-                    addSplitToPendingList(subtaskId, split);
-                }
-            }
-        } else {
-            // Manually put all the splits back to the enumerator.
-            for (PulsarPartitionSplit split : splits) {
-                int readerId = partitionOwner(split.getPartition());
-                addSplitToPendingList(readerId, split);
-            }
-        }
-    }
-}
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java
deleted file mode 100644
index 1cc061e..0000000
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.enumerator.assigner;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-/**
- * This assigner is used for {@link SubscriptionType#Failover}, {@link SubscriptionType#Exclusive}
- * subscriptions.
- */
-@Internal
-class NonSharedSplitAssigner extends SplitAssignerBase {
-
-    public NonSharedSplitAssigner(
-            StopCursor stopCursor,
-            boolean enablePartitionDiscovery,
-            SplitEnumeratorContext<PulsarPartitionSplit> context,
-            PulsarSourceEnumState enumState) {
-        super(stopCursor, enablePartitionDiscovery, context, enumState);
-    }
-
-    @Override
-    public List<TopicPartition> registerTopicPartitions(Set<TopicPartition> fetchedPartitions) {
-        List<TopicPartition> newPartitions = new ArrayList<>();
-
-        for (TopicPartition partition : fetchedPartitions) {
-            if (!appendedPartitions.contains(partition)) {
-                appendedPartitions.add(partition);
-                newPartitions.add(partition);
-
-                // Calculate the reader id by the current parallelism.
-                int readerId = partitionOwner(partition);
-                PulsarPartitionSplit split = new PulsarPartitionSplit(partition, stopCursor);
-                addSplitToPendingList(readerId, split);
-            }
-        }
-
-        if (!initialized) {
-            initialized = true;
-        }
-
-        return newPartitions;
-    }
-
-    @Override
-    public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
-        for (PulsarPartitionSplit split : splits) {
-            int readerId = partitionOwner(split.getPartition());
-            addSplitToPendingList(readerId, split);
-        }
-    }
-}
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.java
deleted file mode 100644
index 845bfd2..0000000
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.enumerator.assigner;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-/** This assigner is used for {@link SubscriptionType#Shared} subscription. */
-@Internal
-class SharedSplitAssigner extends SplitAssignerBase {
-
-    public SharedSplitAssigner(
-            StopCursor stopCursor,
-            boolean enablePartitionDiscovery,
-            SplitEnumeratorContext<PulsarPartitionSplit> context,
-            PulsarSourceEnumState enumState) {
-        super(stopCursor, enablePartitionDiscovery, context, enumState);
-    }
-
-    @Override
-    public List<TopicPartition> registerTopicPartitions(Set<TopicPartition> fetchedPartitions) {
-        List<TopicPartition> newPartitions = new ArrayList<>();
-
-        for (TopicPartition partition : fetchedPartitions) {
-            boolean shouldAssign = false;
-            if (!appendedPartitions.contains(partition)) {
-                appendedPartitions.add(partition);
-                newPartitions.add(partition);
-                shouldAssign = true;
-            }
-
-            // Reassign the incoming splits when restarting from state.
-            if (shouldAssign || !initialized) {
-                // Share the split to all the readers.
-                for (int i = 0; i < context.currentParallelism(); i++) {
-                    PulsarPartitionSplit split = new PulsarPartitionSplit(partition, stopCursor);
-                    addSplitToPendingList(i, split);
-                }
-            }
-        }
-
-        if (!initialized) {
-            initialized = true;
-        }
-
-        return newPartitions;
-    }
-
-    @Override
-    public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
-        if (splits.isEmpty()) {
-            // In case of the task failure. No splits will be put back to the enumerator.
-            for (TopicPartition partition : appendedPartitions) {
-                addSplitToPendingList(subtaskId, new PulsarPartitionSplit(partition, stopCursor));
-            }
-        } else {
-            for (PulsarPartitionSplit split : splits) {
-                addSplitToPendingList(subtaskId, split);
-            }
-        }
-    }
-}
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java
index 87faa28..5a27b54 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java
@@ -19,8 +19,11 @@
 package org.apache.flink.connector.pulsar.source.enumerator.assigner;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 
@@ -63,4 +66,14 @@ public interface SplitAssigner {
 
     /** Expose this for standard flink metrics. */
     long getUnassignedSplitCount();
+
+    /** The factory for creating split assigner. */
+    static SplitAssigner createAssigner(
+            StopCursor stopCursor,
+            SourceConfiguration sourceConfiguration,
+            SplitEnumeratorContext<PulsarPartitionSplit> context,
+            PulsarSourceEnumState enumState) {
+        boolean enablePartitionDiscovery = sourceConfiguration.isEnablePartitionDiscovery();
+        return new SplitAssignerImpl(stopCursor, enablePartitionDiscovery, context, enumState);
+    }
 }
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java
deleted file mode 100644
index 4ade1e5..0000000
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.enumerator.assigner;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
-import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
-import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-
-/** The factory for creating split assigner. */
-@Internal
-public final class SplitAssignerFactory {
-
-    private SplitAssignerFactory() {
-        // No public constructor.
-    }
-
-    public static SplitAssigner createAssigner(
-            StopCursor stopCursor,
-            SourceConfiguration sourceConfiguration,
-            SplitEnumeratorContext<PulsarPartitionSplit> context,
-            PulsarSourceEnumState enumState) {
-        SubscriptionType subscriptionType = sourceConfiguration.getSubscriptionType();
-        boolean enablePartitionDiscovery = sourceConfiguration.isEnablePartitionDiscovery();
-
-        switch (subscriptionType) {
-            case Failover:
-            case Exclusive:
-                return new NonSharedSplitAssigner(
-                        stopCursor, enablePartitionDiscovery, context, enumState);
-            case Shared:
-                return new SharedSplitAssigner(
-                        stopCursor, enablePartitionDiscovery, context, enumState);
-            case Key_Shared:
-                return new KeySharedSplitAssigner(
-                        stopCursor, enablePartitionDiscovery, context, enumState);
-            default:
-                throw new IllegalArgumentException(
-                        "We don't support this subscription type: " + subscriptionType);
-        }
-    }
-}
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerBase.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerImpl.java
similarity index 75%
rename from flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerBase.java
rename to flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerImpl.java
index fedb1aa..eece868 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerBase.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerImpl.java
@@ -35,17 +35,17 @@ import java.util.Optional;
 import java.util.Set;
 
 /** Common abstraction for split assigner. */
-abstract class SplitAssignerBase implements SplitAssigner {
+class SplitAssignerImpl implements SplitAssigner {
 
-    protected final StopCursor stopCursor;
-    protected final boolean enablePartitionDiscovery;
-    protected final SplitEnumeratorContext<PulsarPartitionSplit> context;
-    protected final Set<TopicPartition> appendedPartitions;
-    protected final Map<Integer, Set<PulsarPartitionSplit>> pendingPartitionSplits;
+    private final StopCursor stopCursor;
+    private final boolean enablePartitionDiscovery;
+    private final SplitEnumeratorContext<PulsarPartitionSplit> context;
+    private final Set<TopicPartition> appendedPartitions;
+    private final Map<Integer, Set<PulsarPartitionSplit>> pendingPartitionSplits;
 
-    protected boolean initialized;
+    private boolean initialized;
 
-    protected SplitAssignerBase(
+    SplitAssignerImpl(
             StopCursor stopCursor,
             boolean enablePartitionDiscovery,
             SplitEnumeratorContext<PulsarPartitionSplit> context,
@@ -58,6 +58,37 @@ abstract class SplitAssignerBase implements SplitAssigner {
         this.initialized = false;
     }
 
+    @Override
+    public List<TopicPartition> registerTopicPartitions(Set<TopicPartition> fetchedPartitions) {
+        List<TopicPartition> newPartitions = new ArrayList<>();
+
+        for (TopicPartition partition : fetchedPartitions) {
+            if (!appendedPartitions.contains(partition)) {
+                appendedPartitions.add(partition);
+                newPartitions.add(partition);
+
+                // Calculate the reader id by the current parallelism.
+                int readerId = partitionOwner(partition);
+                PulsarPartitionSplit split = new PulsarPartitionSplit(partition, stopCursor);
+                addSplitToPendingList(readerId, split);
+            }
+        }
+
+        if (!initialized) {
+            initialized = true;
+        }
+
+        return newPartitions;
+    }
+
+    @Override
+    public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
+        for (PulsarPartitionSplit split : splits) {
+            int readerId = partitionOwner(split.getPartition());
+            addSplitToPendingList(readerId, split);
+        }
+    }
+
     @Override
     public Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment(
             List<Integer> readers) {
@@ -100,7 +131,7 @@ abstract class SplitAssignerBase implements SplitAssigner {
     }
 
     /** Add split to pending lists. */
-    protected void addSplitToPendingList(int readerId, PulsarPartitionSplit split) {
+    private void addSplitToPendingList(int readerId, PulsarPartitionSplit split) {
         Set<PulsarPartitionSplit> splits =
                 pendingPartitionSplits.computeIfAbsent(readerId, i -> new HashSet<>());
         splits.add(split);
@@ -123,7 +154,7 @@ abstract class SplitAssignerBase implements SplitAssigner {
      * @param partition The Pulsar partition to assign.
      * @return The id of the reader that owns this partition.
      */
-    protected int partitionOwner(TopicPartition partition) {
+    private int partitionOwner(TopicPartition partition) {
         return calculatePartitionOwner(
                 partition.getTopic(), partition.getPartitionId(), context.currentParallelism());
     }
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java
index 0e11996..152657b 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java
@@ -23,7 +23,6 @@ import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator.KeySharedMode;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -57,35 +56,17 @@ public abstract class BasePulsarSubscriber implements PulsarSubscriber {
     }
 
     protected List<TopicPartition> toTopicPartitions(
-            TopicMetadata metadata, List<TopicRange> ranges, KeySharedMode mode) {
+            TopicMetadata metadata, List<TopicRange> ranges) {
         if (!metadata.isPartitioned()) {
             // For non-partitioned topic.
-            return toTopicPartitions(metadata.getName(), -1, ranges, mode);
+            return singletonList(new TopicPartition(metadata.getName(), -1, ranges));
         } else {
             // For partitioned topic.
             List<TopicPartition> partitions = new ArrayList<>();
             for (int i = 0; i < metadata.getPartitionSize(); i++) {
-                partitions.addAll(toTopicPartitions(metadata.getName(), i, ranges, mode));
+                partitions.add(new TopicPartition(metadata.getName(), i, ranges));
             }
             return partitions;
         }
     }
-
-    protected List<TopicPartition> toTopicPartitions(
-            String topic, int partitionId, List<TopicRange> ranges, KeySharedMode mode) {
-        switch (mode) {
-            case JOIN:
-                return singletonList(new TopicPartition(topic, partitionId, ranges, mode));
-            case SPLIT:
-                List<TopicPartition> partitions = new ArrayList<>(ranges.size());
-                for (TopicRange range : ranges) {
-                    TopicPartition partition =
-                            new TopicPartition(topic, partitionId, singletonList(range), mode);
-                    partitions.add(partition);
-                }
-                return partitions;
-            default:
-                throw new UnsupportedOperationException(mode + " isn't supported.");
-        }
-    }
 }
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicListSubscriber.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicListSubscriber.java
index 127addd..1b87cc2 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicListSubscriber.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicListSubscriber.java
@@ -22,7 +22,6 @@ import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator.KeySharedMode;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.common.naming.TopicName;
@@ -63,9 +62,8 @@ public class TopicListSubscriber extends BasePulsarSubscriber {
         for (String topic : fullTopicNames) {
             TopicMetadata metadata = queryTopicMetadata(pulsarAdmin, topic);
             List<TopicRange> ranges = rangeGenerator.range(metadata, parallelism);
-            KeySharedMode mode = rangeGenerator.keyShareMode(metadata, parallelism);
 
-            results.addAll(toTopicPartitions(metadata, ranges, mode));
+            results.addAll(toTopicPartitions(metadata, ranges));
         }
 
         for (String partition : partitions) {
@@ -75,9 +73,8 @@ public class TopicListSubscriber extends BasePulsarSubscriber {
 
             TopicMetadata metadata = queryTopicMetadata(pulsarAdmin, name);
             List<TopicRange> ranges = rangeGenerator.range(metadata, parallelism);
-            KeySharedMode mode = rangeGenerator.keyShareMode(metadata, parallelism);
 
-            results.addAll(toTopicPartitions(name, index, ranges, mode));
+            results.add(new TopicPartition(name, index, ranges));
         }
 
         return results;
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicPatternSubscriber.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicPatternSubscriber.java
index 6173fca..146996e 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicPatternSubscriber.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicPatternSubscriber.java
@@ -22,7 +22,6 @@ import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator.KeySharedMode;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -75,9 +74,7 @@ public class TopicPatternSubscriber extends BasePulsarSubscriber {
                             metadata -> {
                                 List<TopicRange> ranges =
                                         rangeGenerator.range(metadata, parallelism);
-                                KeySharedMode mode =
-                                        rangeGenerator.keyShareMode(metadata, parallelism);
-                                return toTopicPartitions(metadata, ranges, mode).stream();
+                                return toTopicPartitions(metadata, ranges).stream();
                             })
                     .collect(toSet());
         } catch (PulsarAdminException e) {
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicPartition.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicPartition.java
index b258510..b851cb2 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicPartition.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicPartition.java
@@ -21,7 +21,6 @@ package org.apache.flink.connector.pulsar.source.enumerator.topic;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator.KeySharedMode;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
 
@@ -36,7 +35,6 @@ import static java.util.stream.Collectors.toList;
 import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName;
 import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
 import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator.KeySharedMode.SPLIT;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -68,26 +66,14 @@ public class TopicPartition implements Serializable {
      */
     private final List<TopicRange> ranges;
 
-    /**
-     * The key share mode for the {@link SubscriptionType#Key_Shared}. It will be {@link
-     * KeySharedMode#JOIN} for other subscriptions.
-     */
-    private final KeySharedMode mode;
-
     public TopicPartition(String topic, int partitionId) {
-        this(topic, partitionId, FULL_RANGES, SPLIT);
+        this(topic, partitionId, FULL_RANGES);
     }
 
     public TopicPartition(String topic, int partitionId, List<TopicRange> ranges) {
-        this(topic, partitionId, ranges, SPLIT);
-    }
-
-    public TopicPartition(
-            String topic, int partitionId, List<TopicRange> ranges, KeySharedMode mode) {
         this.topic = topicName(checkNotNull(topic));
         this.partitionId = partitionId;
         this.ranges = checkNotNull(ranges);
-        this.mode = mode;
     }
 
     public String getTopic() {
@@ -122,12 +108,6 @@ public class TopicPartition implements Serializable {
         return ranges.stream().map(TopicRange::toPulsarRange).collect(toList());
     }
 
-    /** This method is internal used for key shared mode. */
-    @Internal
-    public KeySharedMode getMode() {
-        return mode;
-    }
-
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -140,13 +120,12 @@ public class TopicPartition implements Serializable {
 
         return partitionId == partition.partitionId
                 && topic.equals(partition.topic)
-                && ranges.equals(partition.ranges)
-                && mode == partition.mode;
+                && ranges.equals(partition.ranges);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(topic, partitionId, ranges, mode);
+        return Objects.hash(topic, partitionId, ranges);
     }
 
     @Override
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedKeysRangeGenerator.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedKeysRangeGenerator.java
index e21147d..eee0811 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedKeysRangeGenerator.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedKeysRangeGenerator.java
@@ -59,11 +59,9 @@ public class FixedKeysRangeGenerator implements RangeGenerator {
     private static final long serialVersionUID = 2372969466289052100L;
 
     private final List<TopicRange> ranges;
-    private final KeySharedMode sharedMode;
 
-    private FixedKeysRangeGenerator(List<TopicRange> ranges, KeySharedMode sharedMode) {
+    public FixedKeysRangeGenerator(List<TopicRange> ranges) {
         this.ranges = ranges;
-        this.sharedMode = sharedMode;
     }
 
     @Override
@@ -71,11 +69,6 @@ public class FixedKeysRangeGenerator implements RangeGenerator {
         return ranges;
     }
 
-    @Override
-    public KeySharedMode keyShareMode(TopicMetadata metadata, int parallelism) {
-        return sharedMode;
-    }
-
     public static FixedKeysRangeGeneratorBuilder builder() {
         return new FixedKeysRangeGeneratorBuilder();
     }
@@ -85,7 +78,6 @@ public class FixedKeysRangeGenerator implements RangeGenerator {
     public static class FixedKeysRangeGeneratorBuilder {
 
         private final SortedSet<Integer> keyHashes = new TreeSet<>();
-        private KeySharedMode sharedMode = KeySharedMode.JOIN;
 
         private FixedKeysRangeGeneratorBuilder() {
             // No public for builder
@@ -138,12 +130,6 @@ public class FixedKeysRangeGenerator implements RangeGenerator {
             return this;
         }
 
-        /** Override the default {@link KeySharedMode#JOIN} to the mode your have provided. */
-        public FixedKeysRangeGeneratorBuilder keySharedMode(KeySharedMode sharedMode) {
-            this.sharedMode = sharedMode;
-            return this;
-        }
-
         /** Create the FixedKeysRangeGenerator by the given keys. */
         public FixedKeysRangeGenerator build() {
             List<TopicRange> ranges = new ArrayList<>();
@@ -178,8 +164,8 @@ public class FixedKeysRangeGenerator implements RangeGenerator {
                 ranges.add(range);
             }
 
-            validateTopicRanges(ranges, sharedMode);
-            return new FixedKeysRangeGenerator(ranges, sharedMode);
+            validateTopicRanges(ranges);
+            return new FixedKeysRangeGenerator(ranges);
         }
     }
 }
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedRangeGenerator.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedRangeGenerator.java
deleted file mode 100644
index 84c41e9..0000000
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedRangeGenerator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.enumerator.topic.range;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
-
-import java.util.List;
-
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils.validateTopicRanges;
-
-/** Always return the same range set for all topics. */
-@PublicEvolving
-public class FixedRangeGenerator implements RangeGenerator {
-    private static final long serialVersionUID = -3895203007855538734L;
-
-    private final List<TopicRange> ranges;
-    private final KeySharedMode sharedMode;
-
-    public FixedRangeGenerator(List<TopicRange> ranges) {
-        this(ranges, KeySharedMode.JOIN);
-    }
-
-    public FixedRangeGenerator(List<TopicRange> ranges, KeySharedMode sharedMode) {
-        validateTopicRanges(ranges, sharedMode);
-
-        this.ranges = ranges;
-        this.sharedMode = sharedMode;
-    }
-
-    @Override
-    public List<TopicRange> range(TopicMetadata metadata, int parallelism) {
-        return ranges;
-    }
-
-    @Override
-    public KeySharedMode keyShareMode(TopicMetadata metadata, int parallelism) {
-        return sharedMode;
-    }
-}
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FullRangeGenerator.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FullRangeGenerator.java
index 1ac69e0..72b1262 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FullRangeGenerator.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FullRangeGenerator.java
@@ -40,9 +40,4 @@ public class FullRangeGenerator implements RangeGenerator {
     public List<TopicRange> range(TopicMetadata metadata, int parallelism) {
         return singletonList(TopicRange.createFullRange());
     }
-
-    @Override
-    public KeySharedMode keyShareMode(TopicMetadata metadata, int parallelism) {
-        return KeySharedMode.SPLIT;
-    }
 }
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/RangeGenerator.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/RangeGenerator.java
index 8bfdcba..6f9621d 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/RangeGenerator.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/RangeGenerator.java
@@ -19,7 +19,6 @@
 package org.apache.flink.connector.pulsar.source.enumerator.topic.range;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
@@ -51,54 +50,8 @@ public interface RangeGenerator extends Serializable {
      */
     List<TopicRange> range(TopicMetadata metadata, int parallelism);
 
-    /**
-     * Defines the default behavior for Key_Shared subscription in Flink. See {@link KeySharedMode}
-     * for the detailed usage of the key share mode.
-     *
-     * @param metadata The metadata of the topic.
-     * @param parallelism The reader size for this topic.
-     */
-    default KeySharedMode keyShareMode(TopicMetadata metadata, int parallelism) {
-        return KeySharedMode.SPLIT;
-    }
-
     /** Initialize some extra resources when bootstrap the source. */
     default void open(SourceConfiguration sourceConfiguration) {
         // This method is used for user implementation.
-        open(sourceConfiguration, sourceConfiguration);
-    }
-
-    /** @deprecated Use {@link #open(SourceConfiguration)} instead. */
-    @Deprecated
-    default void open(Configuration configuration, SourceConfiguration sourceConfiguration) {
-        // This method is used for user implementation.
-    }
-
-    /**
-     * Different Key_Shared mode means different split assignment behaviors. If you only consume a
-     * subset of Pulsar's key hash range, remember to use the {@link KeySharedMode#JOIN} mode which
-     * will subscribe all the range in only one reader. Otherwise, when the ranges can join into a
-     * full Pulsar key hash range (0 ~ 65535) you should use {@link KeySharedMode#SPLIT} for sharing
-     * the splits among all the backend readers.
-     *
-     * <p>In the {@link KeySharedMode#SPLIT} mode. The topic will be subscribed by multiple readers.
-     * But Pulsar has one limit in this situation. That is if a Message can't find the corresponding
-     * reader by the key hash range. No messages will be delivered to the current readers, until
-     * there is a reader which can subscribe to such messages.
-     */
-    @PublicEvolving
-    enum KeySharedMode {
-
-        /**
-         * The topic ranges that the {@link RangeGenerator} generated will be split among the
-         * readers.
-         */
-        SPLIT,
-
-        /**
-         * Assign all the topic ranges to only one reader instance. This is used for partial key
-         * hash range subscription.
-         */
-        JOIN
     }
 }
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/SplitRangeGenerator.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/SplitRangeGenerator.java
deleted file mode 100644
index 1765326..0000000
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/SplitRangeGenerator.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.enumerator.topic.range;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.MAX_RANGE;
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.MIN_RANGE;
-import static org.apache.flink.util.Preconditions.checkArgument;
-
-/**
- * This range generator would divide the range by the flink source parallelism. It would be the
- * default implementation for {@link SubscriptionType#Key_Shared} subscription.
- */
-@PublicEvolving
-public class SplitRangeGenerator implements RangeGenerator {
-    private static final long serialVersionUID = -8682286436352905249L;
-
-    private final int start;
-    private final int end;
-
-    public SplitRangeGenerator() {
-        this(MIN_RANGE, MAX_RANGE);
-    }
-
-    public SplitRangeGenerator(int start, int end) {
-        checkArgument(
-                start >= MIN_RANGE,
-                "Start range should be equal to or great than the min range " + MIN_RANGE);
-        checkArgument(
-                end <= MAX_RANGE, "End range should below or less than the max range " + MAX_RANGE);
-        checkArgument(start <= end, "Start range should be equal to or less than the end range");
-
-        this.start = start;
-        this.end = end;
-    }
-
-    @Override
-    public List<TopicRange> range(TopicMetadata metadata, int parallelism) {
-        final int range = end - start + 1;
-        final int size = Math.min(range, parallelism);
-        int startRange = start;
-
-        List<TopicRange> results = new ArrayList<>(size);
-        for (int i = 1; i < size; i++) {
-            int nextStartRange = i * range / size + start;
-            results.add(new TopicRange(startRange, nextStartRange - 1));
-            startRange = nextStartRange;
-        }
-        results.add(new TopicRange(startRange, end));
-
-        return results;
-    }
-
-    @Override
-    public KeySharedMode keyShareMode(TopicMetadata metadata, int parallelism) {
-        return KeySharedMode.SPLIT;
-    }
-}
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/TopicRangeUtils.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/TopicRangeUtils.java
index 8728bde..28b9482 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/TopicRangeUtils.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/TopicRangeUtils.java
@@ -21,7 +21,6 @@ package org.apache.flink.connector.pulsar.source.enumerator.topic.range;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessageBuilder;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator.KeySharedMode;
 
 import org.apache.pulsar.client.api.KeySharedPolicy;
 import org.apache.pulsar.client.api.Message;
@@ -56,14 +55,12 @@ public final class TopicRangeUtils {
     }
 
     /** Make sure all the ranges should be valid in Pulsar Key Shared Policy. */
-    public static void validateTopicRanges(List<TopicRange> ranges, KeySharedMode sharedMode) {
+    public static void validateTopicRanges(List<TopicRange> ranges) {
         List<Range> pulsarRanges = ranges.stream().map(TopicRange::toPulsarRange).collect(toList());
         KeySharedPolicy.stickyHashRange().ranges(pulsarRanges).validate();
 
-        if (!isFullTopicRanges(ranges) && KeySharedMode.SPLIT == sharedMode) {
-            LOG.warn(
-                    "You have provided a partial key hash range with KeySharedMode.SPLIT. "
-                            + "You can't consume any message if there are any messages with keys that are out of the given ranges.");
+        if (isFullTopicRanges(ranges)) {
+            LOG.warn("You have provided a full key hash range.");
         }
     }
 
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
similarity index 74%
rename from flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
rename to flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
index fa36ee9..76888df 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java
@@ -16,8 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.pulsar.source.reader.split;
+package org.apache.flink.connector.pulsar.source.reader;
 
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.connector.base.source.reader.RecordsBySplits;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
@@ -36,26 +38,25 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.shaded.guava30.com.google.common.base.Strings;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.ConsumerStats;
 import org.apache.pulsar.client.api.KeySharedPolicy;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.MSG_NUM_IN_RECEIVER_QUEUE;
@@ -75,24 +76,30 @@ import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.TOTAL
 import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.TOTAL_MSGS_RECEIVED;
 import static org.apache.flink.connector.pulsar.common.metrics.MetricNames.TOTAL_RECEIVED_FAILED;
 import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
+import static org.apache.flink.connector.pulsar.source.config.CursorVerification.FAIL_ON_MISMATCH;
 import static org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.createConsumerBuilder;
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator.KeySharedMode.JOIN;
+import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.nextMessageId;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils.isFullTopicRanges;
 import static org.apache.pulsar.client.api.KeySharedPolicy.stickyHashRange;
 
-/** The common partition split reader. */
-abstract class PulsarPartitionSplitReaderBase
+/**
+ * The split reader for a given {@link PulsarPartitionSplit}. It would be closed once the {@link
+ * PulsarSourceReader} is closed.
+ */
+@Internal
+public class PulsarPartitionSplitReader
         implements SplitReader<Message<byte[]>, PulsarPartitionSplit> {
-    private static final Logger LOG = LoggerFactory.getLogger(PulsarPartitionSplitReaderBase.class);
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarPartitionSplitReader.class);
 
-    protected final PulsarClient pulsarClient;
-    protected final PulsarAdmin pulsarAdmin;
-    protected final SourceConfiguration sourceConfiguration;
-    protected final SourceReaderMetricGroup metricGroup;
+    private final PulsarClient pulsarClient;
+    @VisibleForTesting final PulsarAdmin pulsarAdmin;
+    @VisibleForTesting final SourceConfiguration sourceConfiguration;
+    private final SourceReaderMetricGroup metricGroup;
 
-    protected Consumer<byte[]> pulsarConsumer;
-    protected PulsarPartitionSplit registeredSplit;
+    private Consumer<byte[]> pulsarConsumer;
+    private PulsarPartitionSplit registeredSplit;
 
-    protected PulsarPartitionSplitReaderBase(
+    public PulsarPartitionSplitReader(
             PulsarClient pulsarClient,
             PulsarAdmin pulsarAdmin,
             SourceConfiguration sourceConfiguration,
@@ -132,22 +139,15 @@ abstract class PulsarPartitionSplitReaderBase
                 if (condition == StopCondition.CONTINUE || condition == StopCondition.EXACTLY) {
                     // Collect original message.
                     builder.add(splitId, message);
-                    // Acknowledge the message if you need.
-                    finishedPollMessage(message);
+                    LOG.debug("Finished polling message {}", message);
                 }
 
                 if (condition == StopCondition.EXACTLY || condition == StopCondition.TERMINATE) {
                     builder.addFinishedSplit(splitId);
                     break;
                 }
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                break;
             } catch (TimeoutException e) {
                 break;
-            } catch (ExecutionException e) {
-                LOG.error("Error in polling message from pulsar consumer.", e);
-                break;
             } catch (Exception e) {
                 throw new IOException(e);
             }
@@ -180,15 +180,55 @@ abstract class PulsarPartitionSplitReaderBase
         // Open stop cursor.
         registeredSplit.open(pulsarAdmin);
 
-        // Before creating the consumer.
-        beforeCreatingConsumer(registeredSplit);
+        // Reset the start position before creating the consumer.
+        MessageId latestConsumedId = registeredSplit.getLatestConsumedId();
+
+        if (latestConsumedId != null) {
+            LOG.info("Reset subscription position by the checkpoint {}", latestConsumedId);
+            try {
+                MessageId initialPosition;
+                if (latestConsumedId == MessageId.latest
+                        || latestConsumedId == MessageId.earliest) {
+                    // for compatibility
+                    initialPosition = latestConsumedId;
+                } else {
+                    initialPosition = nextMessageId(latestConsumedId);
+                }
+
+                // Remove Consumer.seek() here for waiting for pulsar-client-all 2.12.0
+                // See https://github.com/apache/pulsar/issues/16757 for more details.
+
+                String topicName = registeredSplit.getPartition().getFullTopicName();
+                List<String> subscriptions = pulsarAdmin.topics().getSubscriptions(topicName);
+                String subscriptionName = sourceConfiguration.getSubscriptionName();
+
+                if (!subscriptions.contains(subscriptionName)) {
+                    // If this subscription is not available, just create it.
+                    pulsarAdmin
+                            .topics()
+                            .createSubscription(topicName, subscriptionName, initialPosition);
+                } else {
+                    // Reset the subscription if it already exists.
+                    pulsarAdmin.topics().resetCursor(topicName, subscriptionName, initialPosition);
+                }
+            } catch (PulsarAdminException e) {
+                if (sourceConfiguration.getVerifyInitialOffsets() == FAIL_ON_MISMATCH) {
+                    throw new IllegalArgumentException(e);
+                } else {
+                    // WARN_ON_MISMATCH would just print this warning message and continue.
+                    // The exception is passed as the last argument, so its stacktrace is logged.
+                    LOG.warn(
+                            "Failed to reset cursor to {} on partition {}",
+                            latestConsumedId,
+                            registeredSplit.getPartition(),
+                            e);
+                }
+            }
+        }
 
         // Create pulsar consumer.
         this.pulsarConsumer = createPulsarConsumer(registeredSplit);
 
-        // After creating the consumer.
-        afterCreatingConsumer(registeredSplit, pulsarConsumer);
-
         LOG.info("Register split {} consumer for current reader.", registeredSplit);
     }
 
@@ -220,18 +260,16 @@ abstract class PulsarPartitionSplitReaderBase
         }
     }
 
-    @Nullable
-    protected abstract Message<byte[]> pollMessage(Duration timeout)
-            throws ExecutionException, InterruptedException, PulsarClientException;
-
-    protected abstract void finishedPollMessage(Message<byte[]> message);
-
-    protected void beforeCreatingConsumer(PulsarPartitionSplit split) {
-        // Nothing to do by default.
+    protected Message<byte[]> pollMessage(Duration timeout) throws PulsarClientException {
+        return pulsarConsumer.receive(Math.toIntExact(timeout.toMillis()), TimeUnit.MILLISECONDS);
     }
 
-    protected void afterCreatingConsumer(PulsarPartitionSplit split, Consumer<byte[]> consumer) {
-        // Nothing to do by default.
+    public void notifyCheckpointComplete(TopicPartition partition, MessageId offsetsToCommit) {
+        if (pulsarConsumer == null) {
+            this.pulsarConsumer = createPulsarConsumer(partition);
+        }
+
+        sneakyClient(() -> pulsarConsumer.acknowledgeCumulative(offsetsToCommit));
     }
 
     // --------------------------- Helper Methods -----------------------------
@@ -248,19 +286,13 @@ abstract class PulsarPartitionSplitReaderBase
 
         consumerBuilder.topic(partition.getFullTopicName());
 
-        // Add KeySharedPolicy for Key_Shared subscription.
-        if (sourceConfiguration.getSubscriptionType() == SubscriptionType.Key_Shared) {
+        // Add KeySharedPolicy for partial keys subscription.
+        if (!isFullTopicRanges(partition.getRanges())) {
             KeySharedPolicy policy = stickyHashRange().ranges(partition.getPulsarRanges());
             // We may enable out of order delivery for speeding up. It was turned off by default.
             policy.setAllowOutOfOrderDelivery(
                     sourceConfiguration.isAllowKeySharedOutOfOrderDelivery());
             consumerBuilder.keySharedPolicy(policy);
-
-            if (partition.getMode() == JOIN) {
-                // Override the key shared subscription into exclusive for making it behaviors like
-                // a Pulsar Reader which supports partial key hash ranges.
-                consumerBuilder.subscriptionType(SubscriptionType.Exclusive);
-            }
         }
 
         // Create the consumer configuration by using common utils.
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/emitter/PulsarRecordEmitter.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarRecordEmitter.java
similarity index 87%
rename from flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/emitter/PulsarRecordEmitter.java
rename to flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarRecordEmitter.java
index 5cac7de..b7ff3ff 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/emitter/PulsarRecordEmitter.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarRecordEmitter.java
@@ -16,22 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.pulsar.source.reader.emitter;
+package org.apache.flink.connector.pulsar.source.reader;
 
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
 import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
-import org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader;
-import org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
 import org.apache.flink.util.Collector;
 
 import org.apache.pulsar.client.api.Message;
 
 /**
- * The {@link RecordEmitter} implementation for both {@link PulsarOrderedSourceReader} and {@link
- * PulsarUnorderedSourceReader}. We would always update the last consumed message id in this
- * emitter.
+ * The {@link RecordEmitter} implementation for {@link PulsarSourceReader}. We would always update
+ * the last consumed message id in this emitter.
  */
 public class PulsarRecordEmitter<T>
         implements RecordEmitter<Message<byte[]>, T, PulsarPartitionSplitState> {
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarFetcherManagerBase.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceFetcherManager.java
similarity index 73%
rename from flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarFetcherManagerBase.java
rename to flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceFetcherManager.java
index af9bdd4..8d8e802 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarFetcherManagerBase.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceFetcherManager.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.pulsar.source.reader.fetcher;
+package org.apache.flink.connector.pulsar.source.reader;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
@@ -26,9 +26,14 @@ import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
 import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.List;
@@ -37,10 +42,15 @@ import java.util.function.Supplier;
 
 import static java.util.Collections.singletonList;
 
-/** Common fetcher manager abstraction for both ordered & unordered message. */
+/**
+ * Pulsar's FetcherManager implementation for ordered consuming. This class is needed to help
+ * acknowledge the message to Pulsar using the {@link Consumer} inside the {@link
+ * PulsarPartitionSplitReader}.
+ */
 @Internal
-public abstract class PulsarFetcherManagerBase
+public class PulsarSourceFetcherManager
         extends SplitFetcherManager<Message<byte[]>, PulsarPartitionSplit> {
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceFetcherManager.class);
 
     private final Map<String, Integer> splitFetcherMapping = new HashMap<>();
     private final Map<Integer, Boolean> fetcherStatus = new HashMap<>();
@@ -53,7 +63,7 @@ public abstract class PulsarFetcherManagerBase
      *     the same queue instance that is also passed to the {@link SourceReaderBase}.
      * @param splitReaderSupplier The factory for the split reader that connects to the source
      */
-    protected PulsarFetcherManagerBase(
+    public PulsarSourceFetcherManager(
             FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> elementsQueue,
             Supplier<SplitReader<Message<byte[]>, PulsarPartitionSplit>> splitReaderSupplier,
             Configuration configuration) {
@@ -95,8 +105,27 @@ public abstract class PulsarFetcherManagerBase
         }
     }
 
-    protected SplitFetcher<Message<byte[]>, PulsarPartitionSplit> getOrCreateFetcher(
-            String splitId) {
+    public void acknowledgeMessages(Map<TopicPartition, MessageId> cursorsToCommit) {
+        LOG.debug("Acknowledge messages {}", cursorsToCommit);
+        cursorsToCommit.forEach(
+                (partition, messageId) -> {
+                    SplitFetcher<Message<byte[]>, PulsarPartitionSplit> fetcher =
+                            getOrCreateFetcher(partition.toString());
+                    triggerAcknowledge(fetcher, partition, messageId);
+                });
+    }
+
+    private void triggerAcknowledge(
+            SplitFetcher<Message<byte[]>, PulsarPartitionSplit> splitFetcher,
+            TopicPartition partition,
+            MessageId messageId) {
+        PulsarPartitionSplitReader splitReader =
+                (PulsarPartitionSplitReader) splitFetcher.getSplitReader();
+        splitReader.notifyCheckpointComplete(partition, messageId);
+        startFetcher(splitFetcher);
+    }
+
+    private SplitFetcher<Message<byte[]>, PulsarPartitionSplit> getOrCreateFetcher(String splitId) {
         SplitFetcher<Message<byte[]>, PulsarPartitionSplit> fetcher;
         Integer fetcherId = splitFetcherMapping.get(splitId);
 
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReader.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java
similarity index 63%
rename from flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReader.java
rename to flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java
index 046575e..75d3491 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReader.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java
@@ -16,22 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.pulsar.source.reader.source;
+package org.apache.flink.connector.pulsar.source.reader;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.SourceReaderBase;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter;
-import org.apache.flink.connector.pulsar.source.reader.fetcher.PulsarOrderedFetcherManager;
-import org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
 import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Message;
@@ -40,6 +41,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -54,39 +56,52 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
+import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createAdmin;
+import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient;
+
 /**
  * The source reader for pulsar subscription Failover and Exclusive, which consumes the ordered
  * messages.
+ *
+ * @param <OUT> The output message type for flink.
  */
 @Internal
-public class PulsarOrderedSourceReader<OUT> extends PulsarSourceReaderBase<OUT> {
-    private static final Logger LOG = LoggerFactory.getLogger(PulsarOrderedSourceReader.class);
+public class PulsarSourceReader<OUT>
+        extends SourceReaderBase<
+                Message<byte[]>, OUT, PulsarPartitionSplit, PulsarPartitionSplitState> {
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceReader.class);
 
+    private final SourceConfiguration sourceConfiguration;
+    private final PulsarClient pulsarClient;
+    private final PulsarAdmin pulsarAdmin;
     @VisibleForTesting final SortedMap<Long, Map<TopicPartition, MessageId>> cursorsToCommit;
     private final ConcurrentMap<TopicPartition, MessageId> cursorsOfFinishedSplits;
-    private final AtomicReference<Throwable> cursorCommitThrowable = new AtomicReference<>();
+    private final AtomicReference<Throwable> cursorCommitThrowable;
+
     private ScheduledExecutorService cursorScheduler;
 
-    public PulsarOrderedSourceReader(
+    private PulsarSourceReader(
             FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> elementsQueue,
-            Supplier<PulsarOrderedPartitionSplitReader> splitReaderSupplier,
-            PulsarRecordEmitter<OUT> recordEmitter,
-            SourceReaderContext context,
+            PulsarSourceFetcherManager fetcherManager,
+            PulsarDeserializationSchema<OUT> deserializationSchema,
             SourceConfiguration sourceConfiguration,
             PulsarClient pulsarClient,
-            PulsarAdmin pulsarAdmin) {
+            PulsarAdmin pulsarAdmin,
+            SourceReaderContext context) {
         super(
                 elementsQueue,
-                new PulsarOrderedFetcherManager(
-                        elementsQueue, splitReaderSupplier::get, context.getConfiguration()),
-                recordEmitter,
-                context,
+                fetcherManager,
+                new PulsarRecordEmitter<>(deserializationSchema),
                 sourceConfiguration,
-                pulsarClient,
-                pulsarAdmin);
+                context);
+
+        this.sourceConfiguration = sourceConfiguration;
+        this.pulsarClient = pulsarClient;
+        this.pulsarAdmin = pulsarAdmin;
 
         this.cursorsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
         this.cursorsOfFinishedSplits = new ConcurrentHashMap<>();
+        this.cursorCommitThrowable = new AtomicReference<>();
     }
 
     @Override
@@ -106,14 +121,20 @@ public class PulsarOrderedSourceReader<OUT> extends PulsarSourceReaderBase<OUT>
 
     @Override
     public InputStatus pollNext(ReaderOutput<OUT> output) throws Exception {
-        checkErrorAndRethrow();
+        Throwable cause = cursorCommitThrowable.get();
+        if (cause != null) {
+            throw new FlinkRuntimeException("An error occurred in acknowledge message.", cause);
+        }
+
         return super.pollNext(output);
     }
 
     @Override
     protected void onSplitFinished(Map<String, PulsarPartitionSplitState> finishedSplitIds) {
         // Close all the finished splits.
-        closeFinishedSplits(finishedSplitIds.keySet());
+        for (String splitId : finishedSplitIds.keySet()) {
+            ((PulsarSourceFetcherManager) splitFetcherManager).closeFetcher(splitId);
+        }
 
         // We don't require new splits, all the splits are pre-assigned by source enumerator.
         if (LOG.isDebugEnabled()) {
@@ -129,6 +150,23 @@ public class PulsarOrderedSourceReader<OUT> extends PulsarSourceReaderBase<OUT>
         }
     }
 
+    @Override
+    protected PulsarPartitionSplitState initializedState(PulsarPartitionSplit split) {
+        return new PulsarPartitionSplitState(split);
+    }
+
+    @Override
+    protected PulsarPartitionSplit toSplitType(
+            String splitId, PulsarPartitionSplitState splitState) {
+        return splitState.toPulsarPartitionSplit();
+    }
+
+    @Override
+    public void pauseOrResumeSplits(
+            Collection<String> splitsToPause, Collection<String> splitsToResume) {
+        splitFetcherManager.pauseOrResumeSplits(splitsToPause, splitsToResume);
+    }
+
     @Override
     public List<PulsarPartitionSplit> snapshotState(long checkpointId) {
         List<PulsarPartitionSplit> splits = super.snapshotState(checkpointId);
@@ -154,7 +192,7 @@ public class PulsarOrderedSourceReader<OUT> extends PulsarSourceReaderBase<OUT>
         LOG.debug("Committing cursors for checkpoint {}", checkpointId);
         Map<TopicPartition, MessageId> cursors = cursorsToCommit.get(checkpointId);
         try {
-            ((PulsarOrderedFetcherManager) splitFetcherManager).acknowledgeMessages(cursors);
+            ((PulsarSourceFetcherManager) splitFetcherManager).acknowledgeMessages(cursors);
             LOG.debug("Successfully acknowledge cursors for checkpoint {}", checkpointId);
 
             // Clean up the cursors.
@@ -172,18 +210,16 @@ public class PulsarOrderedSourceReader<OUT> extends PulsarSourceReaderBase<OUT>
             cursorScheduler.shutdown();
         }
 
+        // Close all the consumers.
         super.close();
+
+        // Close shared pulsar resources.
+        pulsarClient.shutdown();
+        pulsarAdmin.close();
     }
 
     // ----------------- helper methods --------------
 
-    private void checkErrorAndRethrow() {
-        Throwable cause = cursorCommitThrowable.get();
-        if (cause != null) {
-            throw new RuntimeException("An error occurred in acknowledge message.", cause);
-        }
-    }
-
     /** Acknowledge the pulsar topic partition cursor by the last consumed message id. */
     private void cumulativeAcknowledgmentMessage() {
         Map<TopicPartition, MessageId> cursors = new HashMap<>(cursorsOfFinishedSplits);
@@ -199,7 +235,7 @@ public class PulsarOrderedSourceReader<OUT> extends PulsarSourceReaderBase<OUT>
         }
 
         try {
-            ((PulsarOrderedFetcherManager) splitFetcherManager).acknowledgeMessages(cursors);
+            ((PulsarSourceFetcherManager) splitFetcherManager).acknowledgeMessages(cursors);
             // Clean up the finish splits.
             cursorsOfFinishedSplits.keySet().removeAll(cursors.keySet());
         } catch (Exception e) {
@@ -207,4 +243,41 @@ public class PulsarOrderedSourceReader<OUT> extends PulsarSourceReaderBase<OUT>
             cursorCommitThrowable.compareAndSet(null, e);
         }
     }
+
+    /** Factory method for creating PulsarSourceReader. */
+    public static <OUT> PulsarSourceReader<OUT> create(
+            SourceConfiguration sourceConfiguration,
+            PulsarDeserializationSchema<OUT> deserializationSchema,
+            SourceReaderContext readerContext) {
+
+        // Create a message queue with the predefined source option.
+        int queueCapacity = sourceConfiguration.getMessageQueueCapacity();
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> elementsQueue =
+                new FutureCompletingBlockingQueue<>(queueCapacity);
+
+        PulsarClient pulsarClient = createClient(sourceConfiguration);
+        PulsarAdmin pulsarAdmin = createAdmin(sourceConfiguration);
+
+        // Create an ordered split reader supplier.
+        Supplier<SplitReader<Message<byte[]>, PulsarPartitionSplit>> splitReaderSupplier =
+                () ->
+                        new PulsarPartitionSplitReader(
+                                pulsarClient,
+                                pulsarAdmin,
+                                sourceConfiguration,
+                                readerContext.metricGroup());
+
+        PulsarSourceFetcherManager fetcherManager =
+                new PulsarSourceFetcherManager(
+                        elementsQueue, splitReaderSupplier, readerContext.getConfiguration());
+
+        return new PulsarSourceReader<>(
+                elementsQueue,
+                fetcherManager,
+                deserializationSchema,
+                sourceConfiguration,
+                pulsarClient,
+                pulsarAdmin,
+                readerContext);
+    }
 }
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java
deleted file mode 100644
index 5d37614..0000000
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderFactory.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.reader;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.source.SourceReader;
-import org.apache.flink.api.connector.source.SourceReaderContext;
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
-import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
-import org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter;
-import org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader;
-import org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader;
-import org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader;
-import org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader;
-import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
-
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-
-import java.util.function.Supplier;
-
-import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createAdmin;
-import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient;
-
-/**
- * This factory class is used for creating different types of source reader for different
- * subscription type.
- *
- * <ol>
- *   <li>Failover, Exclusive: We would create {@link PulsarOrderedSourceReader}.
- *   <li>Shared, Key_Shared: We would create {@link PulsarUnorderedSourceReader}.
- * </ol>
- */
-@Internal
-public final class PulsarSourceReaderFactory {
-
-    private PulsarSourceReaderFactory() {
-        // No public constructor.
-    }
-
-    @SuppressWarnings("java:S2095")
-    public static <OUT> SourceReader<OUT, PulsarPartitionSplit> create(
-            SourceReaderContext readerContext,
-            PulsarDeserializationSchema<OUT> deserializationSchema,
-            SourceConfiguration sourceConfiguration) {
-
-        PulsarClient pulsarClient = createClient(sourceConfiguration);
-        PulsarAdmin pulsarAdmin = createAdmin(sourceConfiguration);
-
-        // Create a message queue with the predefined source option.
-        int queueCapacity = sourceConfiguration.getMessageQueueCapacity();
-        FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> elementsQueue =
-                new FutureCompletingBlockingQueue<>(queueCapacity);
-
-        PulsarRecordEmitter<OUT> recordEmitter = new PulsarRecordEmitter<>(deserializationSchema);
-
-        // Create different pulsar source reader by subscription type.
-        SubscriptionType subscriptionType = sourceConfiguration.getSubscriptionType();
-        if (subscriptionType == SubscriptionType.Failover
-                || subscriptionType == SubscriptionType.Exclusive) {
-            // Create an ordered split reader supplier.
-            Supplier<PulsarOrderedPartitionSplitReader> splitReaderSupplier =
-                    () ->
-                            new PulsarOrderedPartitionSplitReader(
-                                    pulsarClient,
-                                    pulsarAdmin,
-                                    sourceConfiguration,
-                                    readerContext.metricGroup());
-
-            return new PulsarOrderedSourceReader<>(
-                    elementsQueue,
-                    splitReaderSupplier,
-                    recordEmitter,
-                    readerContext,
-                    sourceConfiguration,
-                    pulsarClient,
-                    pulsarAdmin);
-        } else if (subscriptionType == SubscriptionType.Shared
-                || subscriptionType == SubscriptionType.Key_Shared) {
-            TransactionCoordinatorClient coordinatorClient =
-                    ((PulsarClientImpl) pulsarClient).getTcClient();
-            if (coordinatorClient == null
-                    && !sourceConfiguration.isEnableAutoAcknowledgeMessage()) {
-                throw new IllegalStateException("Transaction is required but didn't enabled");
-            }
-
-            Supplier<PulsarUnorderedPartitionSplitReader> splitReaderSupplier =
-                    () ->
-                            new PulsarUnorderedPartitionSplitReader(
-                                    pulsarClient,
-                                    pulsarAdmin,
-                                    sourceConfiguration,
-                                    readerContext.metricGroup(),
-                                    coordinatorClient);
-
-            return new PulsarUnorderedSourceReader<>(
-                    elementsQueue,
-                    splitReaderSupplier,
-                    recordEmitter,
-                    readerContext,
-                    sourceConfiguration,
-                    pulsarClient,
-                    pulsarAdmin,
-                    coordinatorClient);
-        } else {
-            throw new UnsupportedOperationException(
-                    "This subscription type is not " + subscriptionType + " supported currently.");
-        }
-    }
-}
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchema.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchema.java
index 4a116cd..a14f1d4 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchema.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchema.java
@@ -54,15 +54,7 @@ public interface PulsarDeserializationSchema<T> extends Serializable, ResultType
      * @param configuration The Pulsar related source configuration.
      */
     default void open(InitializationContext context, SourceConfiguration configuration)
-            throws Exception {
-        open(context);
-    }
-
-    /** @deprecated Use {{@link #open(InitializationContext, SourceConfiguration)}} instead. */
-    @Deprecated
-    default void open(InitializationContext context) throws Exception {
-        // Nothing to do here for the default implementation.
-    }
+            throws Exception {}
 
     /**
      * Deserializes the pulsar message. This message could be a raw byte message or some parsed
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarOrderedFetcherManager.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarOrderedFetcherManager.java
deleted file mode 100644
index 0bc7c98..0000000
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarOrderedFetcherManager.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.reader.fetcher;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
-import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
-import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader;
-import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
-
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.function.Supplier;
-
-/**
- * Pulsar's FetcherManager implementation for ordered consuming. This class is needed to help
- * acknowledge the message to Pulsar using the {@link Consumer} inside the {@link
- * PulsarOrderedPartitionSplitReader}.
- */
-@Internal
-public class PulsarOrderedFetcherManager extends PulsarFetcherManagerBase {
-    private static final Logger LOG = LoggerFactory.getLogger(PulsarOrderedFetcherManager.class);
-
-    public PulsarOrderedFetcherManager(
-            FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> elementsQueue,
-            Supplier<SplitReader<Message<byte[]>, PulsarPartitionSplit>> splitReaderSupplier,
-            Configuration configuration) {
-        super(elementsQueue, splitReaderSupplier, configuration);
-    }
-
-    public void acknowledgeMessages(Map<TopicPartition, MessageId> cursorsToCommit) {
-        LOG.debug("Acknowledge messages {}", cursorsToCommit);
-        cursorsToCommit.forEach(
-                (partition, messageId) -> {
-                    SplitFetcher<Message<byte[]>, PulsarPartitionSplit> fetcher =
-                            getOrCreateFetcher(partition.toString());
-                    triggerAcknowledge(fetcher, partition, messageId);
-                });
-    }
-
-    private void triggerAcknowledge(
-            SplitFetcher<Message<byte[]>, PulsarPartitionSplit> splitFetcher,
-            TopicPartition partition,
-            MessageId messageId) {
-        PulsarOrderedPartitionSplitReader splitReader =
-                (PulsarOrderedPartitionSplitReader) splitFetcher.getSplitReader();
-        splitReader.notifyCheckpointComplete(partition, messageId);
-        startFetcher(splitFetcher);
-    }
-}
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java
deleted file mode 100644
index 33b811c..0000000
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.reader.fetcher;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
-import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
-import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader;
-import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
-import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
-
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.function.Supplier;
-
-import static java.util.stream.Collectors.toCollection;
-
-/**
- * Pulsar's FetcherManager implementation for unordered consuming. This class is needed to help
- * acknowledge the message to Pulsar using the {@link Consumer} inside the {@link
- * PulsarUnorderedPartitionSplitReader}.
- */
-@Internal
-public class PulsarUnorderedFetcherManager extends PulsarFetcherManagerBase {
-
-    public PulsarUnorderedFetcherManager(
-            FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> elementsQueue,
-            Supplier<SplitReader<Message<byte[]>, PulsarPartitionSplit>> splitReaderSupplier,
-            Configuration configuration) {
-        super(elementsQueue, splitReaderSupplier, configuration);
-    }
-
-    public List<PulsarPartitionSplit> snapshotState() {
-        return fetchers.values().stream()
-                .map(SplitFetcher::getSplitReader)
-                .map(this::snapshotReader)
-                .filter(Optional::isPresent)
-                .map(Optional::get)
-                .collect(toCollection(() -> new ArrayList<>(fetchers.size())));
-    }
-
-    private Optional<PulsarPartitionSplit> snapshotReader(
-            SplitReader<Message<byte[]>, PulsarPartitionSplit> splitReader) {
-        return ((PulsarUnorderedPartitionSplitReader) splitReader)
-                .snapshotState()
-                .map(PulsarPartitionSplitState::toPulsarPartitionSplit);
-    }
-}
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderBase.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderBase.java
deleted file mode 100644
index d7fe471..0000000
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderBase.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.reader.source;
-
-import org.apache.flink.api.connector.source.SourceReaderContext;
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
-import org.apache.flink.connector.base.source.reader.SourceReaderBase;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
-import org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter;
-import org.apache.flink.connector.pulsar.source.reader.fetcher.PulsarFetcherManagerBase;
-import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
-import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
-
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClient;
-
-import java.util.Collection;
-import java.util.Set;
-
-/**
- * The common pulsar source reader for both ordered & unordered message consuming.
- *
- * @param <OUT> The output message type for flink.
- */
-abstract class PulsarSourceReaderBase<OUT>
-        extends SourceReaderBase<
-                Message<byte[]>, OUT, PulsarPartitionSplit, PulsarPartitionSplitState> {
-
-    protected final SourceConfiguration sourceConfiguration;
-    protected final PulsarClient pulsarClient;
-    protected final PulsarAdmin pulsarAdmin;
-
-    protected PulsarSourceReaderBase(
-            FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> elementsQueue,
-            PulsarFetcherManagerBase splitFetcherManager,
-            PulsarRecordEmitter<OUT> recordEmitter,
-            SourceReaderContext context,
-            SourceConfiguration sourceConfiguration,
-            PulsarClient pulsarClient,
-            PulsarAdmin pulsarAdmin) {
-        super(elementsQueue, splitFetcherManager, recordEmitter, sourceConfiguration, context);
-
-        this.sourceConfiguration = sourceConfiguration;
-        this.pulsarClient = pulsarClient;
-        this.pulsarAdmin = pulsarAdmin;
-    }
-
-    @Override
-    protected PulsarPartitionSplitState initializedState(PulsarPartitionSplit split) {
-        return new PulsarPartitionSplitState(split);
-    }
-
-    @Override
-    protected PulsarPartitionSplit toSplitType(
-            String splitId, PulsarPartitionSplitState splitState) {
-        return splitState.toPulsarPartitionSplit();
-    }
-
-    @Override
-    public void pauseOrResumeSplits(
-            Collection<String> splitsToPause, Collection<String> splitsToResume) {
-        splitFetcherManager.pauseOrResumeSplits(splitsToPause, splitsToResume);
-    }
-
-    @Override
-    public void close() throws Exception {
-        // Close the all the consumers first.
-        super.close();
-
-        // Close shared pulsar resources.
-        pulsarClient.shutdown();
-        pulsarAdmin.close();
-    }
-
-    protected void closeFinishedSplits(Set<String> finishedSplitIds) {
-        for (String splitId : finishedSplitIds) {
-            ((PulsarFetcherManagerBase) splitFetcherManager).closeFetcher(splitId);
-        }
-    }
-}
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java
deleted file mode 100644
index 41a9b28..0000000
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.reader.source;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.source.SourceReaderContext;
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
-import org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter;
-import org.apache.flink.connector.pulsar.source.reader.fetcher.PulsarUnorderedFetcherManager;
-import org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader;
-import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
-import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
-
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
-import org.apache.pulsar.client.api.transaction.TxnID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.function.Supplier;
-
-import static java.util.stream.Collectors.toList;
-
-/**
- * The source reader for pulsar subscription Shared and Key_Shared, which consumes the unordered
- * messages.
- */
-@Internal
-public class PulsarUnorderedSourceReader<OUT> extends PulsarSourceReaderBase<OUT> {
-    private static final Logger LOG = LoggerFactory.getLogger(PulsarUnorderedSourceReader.class);
-
-    @Nullable private final TransactionCoordinatorClient coordinatorClient;
-    private final SortedMap<Long, List<TxnID>> transactionsToCommit;
-    private final List<TxnID> transactionsOfFinishedSplits;
-
-    private boolean started = false;
-
-    public PulsarUnorderedSourceReader(
-            FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> elementsQueue,
-            Supplier<PulsarUnorderedPartitionSplitReader> splitReaderSupplier,
-            PulsarRecordEmitter<OUT> recordEmitter,
-            SourceReaderContext context,
-            SourceConfiguration sourceConfiguration,
-            PulsarClient pulsarClient,
-            PulsarAdmin pulsarAdmin,
-            @Nullable TransactionCoordinatorClient coordinatorClient) {
-        super(
-                elementsQueue,
-                new PulsarUnorderedFetcherManager(
-                        elementsQueue, splitReaderSupplier::get, context.getConfiguration()),
-                recordEmitter,
-                context,
-                sourceConfiguration,
-                pulsarClient,
-                pulsarAdmin);
-
-        this.coordinatorClient = coordinatorClient;
-        this.transactionsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
-        this.transactionsOfFinishedSplits = Collections.synchronizedList(new ArrayList<>());
-    }
-
-    @Override
-    public void start() {
-        this.started = true;
-        super.start();
-    }
-
-    @Override
-    public void addSplits(List<PulsarPartitionSplit> splits) {
-        if (started) {
-            // We only accept splits after this reader is started and registered to the pipeline.
-            // This would ignore the splits from the state.
-            super.addSplits(splits);
-        } else {
-            // Abort the pending transaction in this split.
-            for (PulsarPartitionSplit split : splits) {
-                LOG.info("Ignore the split {} saved in checkpoint.", split);
-
-                TxnID transactionId = split.getUncommittedTransactionId();
-                if (transactionId != null && coordinatorClient != null) {
-                    try {
-                        coordinatorClient.abort(transactionId);
-                    } catch (Exception e) {
-                        LOG.debug(
-                                "Error in aborting transaction {} from the checkpoint",
-                                transactionId,
-                                e);
-                    }
-                }
-            }
-        }
-    }
-
-    @Override
-    protected void onSplitFinished(Map<String, PulsarPartitionSplitState> finishedSplitIds) {
-        // Close all the finished splits.
-        closeFinishedSplits(finishedSplitIds.keySet());
-
-        // We don't require new splits, all the splits are pre-assigned by source enumerator.
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("onSplitFinished event: {}", finishedSplitIds);
-        }
-
-        if (coordinatorClient != null) {
-            // Commit the uncommitted transaction
-            for (Map.Entry<String, PulsarPartitionSplitState> entry : finishedSplitIds.entrySet()) {
-                PulsarPartitionSplitState state = entry.getValue();
-                TxnID uncommittedTransactionId = state.getUncommittedTransactionId();
-                if (uncommittedTransactionId != null) {
-                    transactionsOfFinishedSplits.add(uncommittedTransactionId);
-                }
-            }
-        }
-    }
-
-    @Override
-    public List<PulsarPartitionSplit> snapshotState(long checkpointId) {
-        LOG.debug("Trigger the new transaction for downstream readers.");
-        List<PulsarPartitionSplit> splits =
-                ((PulsarUnorderedFetcherManager) splitFetcherManager).snapshotState();
-
-        if (coordinatorClient != null) {
-            // Snapshot the transaction status and commit it after checkpoint finishing.
-            List<TxnID> txnIDs =
-                    transactionsToCommit.computeIfAbsent(checkpointId, id -> new ArrayList<>());
-            for (PulsarPartitionSplit split : splits) {
-                TxnID uncommittedTransactionId = split.getUncommittedTransactionId();
-                if (uncommittedTransactionId != null) {
-                    txnIDs.add(uncommittedTransactionId);
-                }
-            }
-
-            // Add finished splits' transactions.
-            txnIDs.addAll(transactionsOfFinishedSplits);
-            // Purge the transactions.
-            transactionsOfFinishedSplits.clear();
-        }
-
-        return splits;
-    }
-
-    @Override
-    public void notifyCheckpointComplete(long checkpointId) throws Exception {
-        LOG.debug("Committing transactions for checkpoint {}", checkpointId);
-
-        if (coordinatorClient != null) {
-            List<Long> checkpointIds =
-                    transactionsToCommit.keySet().stream()
-                            .filter(id -> id <= checkpointId)
-                            .collect(toList());
-
-            for (Long id : checkpointIds) {
-                List<TxnID> transactions = transactionsToCommit.remove(id);
-                if (transactions != null) {
-                    for (TxnID transaction : transactions) {
-                        coordinatorClient.commit(transaction);
-                        transactionsOfFinishedSplits.remove(transaction);
-                    }
-                }
-            }
-        }
-    }
-
-    @Override
-    public void close() throws Exception {
-        // Abort all the pending transactions.
-        if (coordinatorClient != null) {
-            for (List<TxnID> transactions : transactionsToCommit.values()) {
-                for (TxnID transaction : transactions) {
-                    try {
-                        coordinatorClient.abort(transaction);
-                    } catch (Exception e) {
-                        LOG.warn("Error in aborting transaction {}", transaction, e);
-                    }
-                }
-            }
-        }
-
-        // Close the pulsar client finally.
-        super.close();
-    }
-}
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java
deleted file mode 100644
index 8eb7b29..0000000
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.reader.split;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader;
-import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
-import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
-
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.time.Duration;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
-import static org.apache.flink.connector.pulsar.source.config.CursorVerification.FAIL_ON_MISMATCH;
-import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.nextMessageId;
-
-/**
- * The split reader a given {@link PulsarPartitionSplit}, it would be closed once the {@link
- * PulsarOrderedSourceReader} is closed.
- */
-@Internal
-public class PulsarOrderedPartitionSplitReader extends PulsarPartitionSplitReaderBase {
-    private static final Logger LOG =
-            LoggerFactory.getLogger(PulsarOrderedPartitionSplitReader.class);
-
-    public PulsarOrderedPartitionSplitReader(
-            PulsarClient pulsarClient,
-            PulsarAdmin pulsarAdmin,
-            SourceConfiguration sourceConfiguration,
-            SourceReaderMetricGroup metricGroup) {
-        super(pulsarClient, pulsarAdmin, sourceConfiguration, metricGroup);
-    }
-
-    @Override
-    protected Message<byte[]> pollMessage(Duration timeout) throws PulsarClientException {
-        return pulsarConsumer.receive(Math.toIntExact(timeout.toMillis()), TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    protected void finishedPollMessage(Message<byte[]> message) {
-        // Nothing to do here.
-        LOG.debug("Finished polling message {}", message);
-    }
-
-    @Override
-    protected void beforeCreatingConsumer(PulsarPartitionSplit split) {
-        MessageId latestConsumedId = split.getLatestConsumedId();
-
-        // Reset the start position for ordered pulsar consumer.
-        if (latestConsumedId != null) {
-            LOG.info("Reset subscription position by the checkpoint {}", latestConsumedId);
-            try {
-                MessageId initialPosition;
-                if (latestConsumedId == MessageId.latest
-                        || latestConsumedId == MessageId.earliest) {
-                    // for compatibility
-                    initialPosition = latestConsumedId;
-                } else {
-                    initialPosition = nextMessageId(latestConsumedId);
-                }
-
-                // Remove Consumer.seek() here for waiting for pulsar-client-all 2.12.0
-                // See https://github.com/apache/pulsar/issues/16757 for more details.
-
-                String topicName = split.getPartition().getFullTopicName();
-                List<String> subscriptions = pulsarAdmin.topics().getSubscriptions(topicName);
-                String subscriptionName = sourceConfiguration.getSubscriptionName();
-
-                if (!subscriptions.contains(subscriptionName)) {
-                    // If this subscription is not available. Just create it.
-                    pulsarAdmin
-                            .topics()
-                            .createSubscription(topicName, subscriptionName, initialPosition);
-                } else {
-                    // Reset the subscription if this is existed.
-                    pulsarAdmin.topics().resetCursor(topicName, subscriptionName, initialPosition);
-                }
-            } catch (PulsarAdminException e) {
-                if (sourceConfiguration.getVerifyInitialOffsets() == FAIL_ON_MISMATCH) {
-                    throw new IllegalArgumentException(e);
-                } else {
-                    // WARN_ON_MISMATCH would just print this warning message.
-                    // No need to print the stacktrace.
-                    LOG.warn(
-                            "Failed to reset cursor to {} on partition {}",
-                            latestConsumedId,
-                            split.getPartition(),
-                            e);
-                }
-            }
-        }
-    }
-
-    public void notifyCheckpointComplete(TopicPartition partition, MessageId offsetsToCommit) {
-        if (pulsarConsumer == null) {
-            this.pulsarConsumer = createPulsarConsumer(partition);
-        }
-
-        sneakyClient(() -> pulsarConsumer.acknowledgeCumulative(offsetsToCommit));
-    }
-}
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
deleted file mode 100644
index 72e7c6f..0000000
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.reader.split;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
-import org.apache.flink.connector.pulsar.source.reader.source.PulsarUnorderedSourceReader;
-import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
-import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
-import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
-
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.transaction.Transaction;
-import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
-import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
-import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionNotFoundException;
-import org.apache.pulsar.client.api.transaction.TxnID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.time.Duration;
-import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
-import static org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils.createTransaction;
-import static org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils.unwrap;
-
-/**
- * The split reader a given {@link PulsarPartitionSplit}, it would be closed once the {@link
- * PulsarUnorderedSourceReader} is closed.
- */
-@Internal
-public class PulsarUnorderedPartitionSplitReader extends PulsarPartitionSplitReaderBase {
-    private static final Logger LOG =
-            LoggerFactory.getLogger(PulsarUnorderedPartitionSplitReader.class);
-
-    private final TransactionCoordinatorClient coordinatorClient;
-
-    @Nullable private Transaction uncommittedTransaction;
-
-    public PulsarUnorderedPartitionSplitReader(
-            PulsarClient pulsarClient,
-            PulsarAdmin pulsarAdmin,
-            SourceConfiguration sourceConfiguration,
-            SourceReaderMetricGroup metricGroup,
-            TransactionCoordinatorClient coordinatorClient) {
-        super(pulsarClient, pulsarAdmin, sourceConfiguration, metricGroup);
-
-        this.coordinatorClient = coordinatorClient;
-    }
-
-    @Override
-    protected Message<byte[]> pollMessage(Duration timeout)
-            throws ExecutionException, InterruptedException, PulsarClientException {
-        Message<byte[]> message =
-                pulsarConsumer.receive(Math.toIntExact(timeout.toMillis()), TimeUnit.MILLISECONDS);
-
-        // Skip the message when receive timeout
-        if (message == null) {
-            return null;
-        }
-
-        if (!sourceConfiguration.isEnableAutoAcknowledgeMessage()) {
-            if (uncommittedTransaction == null) {
-                // Create a transaction.
-                this.uncommittedTransaction = newTransaction();
-            }
-
-            try {
-                // Add this message into transaction.
-                pulsarConsumer
-                        .acknowledgeAsync(message.getMessageId(), uncommittedTransaction)
-                        .get();
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw e;
-            }
-        }
-
-        return message;
-    }
-
-    @Override
-    protected void finishedPollMessage(Message<byte[]> message) {
-        if (sourceConfiguration.isEnableAutoAcknowledgeMessage()) {
-            sneakyClient(() -> pulsarConsumer.acknowledge(message));
-        }
-    }
-
-    @Override
-    protected void afterCreatingConsumer(PulsarPartitionSplit split, Consumer<byte[]> consumer) {
-        TxnID uncommittedTransactionId = split.getUncommittedTransactionId();
-
-        // Abort the uncommitted pulsar transaction.
-        if (uncommittedTransactionId != null) {
-            if (coordinatorClient != null) {
-                try {
-                    coordinatorClient.abort(uncommittedTransactionId);
-                } catch (TransactionCoordinatorClientException e) {
-                    TransactionCoordinatorClientException exception = unwrap(e);
-                    // The aborted transaction would return a not found exception.
-                    if (!(exception instanceof TransactionNotFoundException)) {
-                        LOG.error(
-                                "Failed to abort the uncommitted transaction {} when restart the reader",
-                                uncommittedTransactionId,
-                                e);
-                    }
-                }
-            }
-
-            // Redeliver unacknowledged messages because of the message is out of order.
-            consumer.redeliverUnacknowledgedMessages();
-        }
-    }
-
-    public Optional<PulsarPartitionSplitState> snapshotState() {
-        if (registeredSplit == null) {
-            return Optional.empty();
-        }
-
-        PulsarPartitionSplitState state = new PulsarPartitionSplitState(registeredSplit);
-
-        // Avoiding NP problem when Pulsar don't get the message before Flink checkpoint.
-        if (uncommittedTransaction != null) {
-            TxnID txnID = uncommittedTransaction.getTxnID();
-            this.uncommittedTransaction = newTransaction();
-            state.setUncommittedTransactionId(txnID);
-        }
-
-        return Optional.of(state);
-    }
-
-    private Transaction newTransaction() {
-        long timeoutMillis = sourceConfiguration.getTransactionTimeoutMillis();
-        return createTransaction(pulsarClient, timeoutMillis);
-    }
-}
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplit.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplit.java
index 458be40..3189c17 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplit.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplit.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.source.reader.source.PulsarOrderedSourceReader;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.MessageId;
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplitSerializer.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplitSerializer.java
index fb88d3d..2903bf9 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplitSerializer.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplitSerializer.java
@@ -21,7 +21,6 @@ package org.apache.flink.connector.pulsar.source.split;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator.KeySharedMode;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import org.apache.pulsar.client.api.MessageId;
@@ -50,7 +49,7 @@ public class PulsarPartitionSplitSerializer
             new PulsarPartitionSplitSerializer();
 
     // This version should be bumped after modifying the PulsarPartitionSplit.
-    public static final int CURRENT_VERSION = 1;
+    public static final int CURRENT_VERSION = 2;
 
     private PulsarPartitionSplitSerializer() {
         // Singleton instance.
@@ -141,7 +140,7 @@ public class PulsarPartitionSplitSerializer
 
     public void serializeTopicPartition(DataOutputStream out, TopicPartition partition)
             throws IOException {
-        // VERSION 1 serialization
+        // VERSION 2 serialization
         out.writeUTF(partition.getTopic());
         out.writeInt(partition.getPartitionId());
         serializeList(
@@ -151,7 +150,6 @@ public class PulsarPartitionSplitSerializer
                     o.writeInt(r.getStart());
                     o.writeInt(r.getEnd());
                 });
-        out.writeInt(partition.getMode().ordinal());
     }
 
     public TopicPartition deserializeTopicPartition(int version, DataInputStream in)
@@ -159,16 +157,14 @@ public class PulsarPartitionSplitSerializer
         String topic = in.readUTF();
         int partitionId = in.readInt();
         List<TopicRange> ranges;
-        KeySharedMode keySharedMode;
         if (version == 0) {
             // VERSION 0 deserialization
             int start = in.readInt();
             int end = in.readInt();
             TopicRange range = new TopicRange(start, end);
             ranges = singletonList(range);
-            keySharedMode = KeySharedMode.SPLIT;
         } else {
-            // VERSION 1 deserialization
+            // VERSION 1/2 deserialization
             ranges =
                     deserializeList(
                             in,
@@ -177,9 +173,11 @@ public class PulsarPartitionSplitSerializer
                                 int end = i.readInt();
                                 return new TopicRange(start, end);
                             });
-            keySharedMode = KeySharedMode.values()[in.readInt()];
+            if (version == 1) {
+                in.readInt();
+            }
         }
 
-        return new TopicPartition(topic, partitionId, ranges, keySharedMode);
+        return new TopicPartition(topic, partitionId, ranges);
     }
 }
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilderTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilderTest.java
index 644523a..29d74c4 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilderTest.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilderTest.java
@@ -18,11 +18,7 @@
 
 package org.apache.flink.connector.pulsar.source;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.range.SplitRangeGenerator;
-
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
 import org.junit.jupiter.api.Test;
 
 import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
@@ -35,12 +31,8 @@ class PulsarSourceBuilderTest {
 
     @Test
     void someSetterMethodCouldOnlyBeCalledOnce() {
-        PulsarSourceBuilder<String> builder =
-                new PulsarSourceBuilder<String>()
-                        .setAdminUrl("admin-url")
-                        .setServiceUrl("service-url")
-                        .setSubscriptionName("set_subscription_name")
-                        .setSubscriptionType(SubscriptionType.Exclusive);
+        PulsarSourceBuilder<String> builder = new PulsarSourceBuilder<>();
+        fillRequiredFields(builder);
         assertAll(
                 () ->
                         assertThrows(
@@ -53,58 +45,17 @@ class PulsarSourceBuilderTest {
                 () ->
                         assertThrows(
                                 IllegalArgumentException.class,
-                                () -> builder.setSubscriptionName("set_subscription_name2")),
-                () ->
-                        assertThrows(
-                                IllegalArgumentException.class,
-                                () -> builder.setSubscriptionType(SubscriptionType.Shared)));
+                                () -> builder.setSubscriptionName("set_subscription_name2")));
     }
 
     @Test
     void topicPatternAndListCouldChooseOnlyOne() {
         PulsarSourceBuilder<String> builder = new PulsarSourceBuilder<>();
-        builder.setTopics("a", "b", "c");
-
+        fillRequiredFields(builder);
         assertThatThrownBy(() -> builder.setTopicPattern("a-a-a"))
                 .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test
-    void rangeGeneratorRequiresKeyShared() {
-        PulsarSourceBuilder<String> builder = new PulsarSourceBuilder<>();
-        builder.setSubscriptionType(SubscriptionType.Shared);
-        SplitRangeGenerator rangeGenerator = new SplitRangeGenerator();
-
-        assertThatThrownBy(() -> builder.setRangeGenerator(rangeGenerator))
-                .isInstanceOf(IllegalArgumentException.class);
-    }
-
-    @Test
-    void subscriptionTypeShouldNotBeOverriddenBySetMethod() {
-        PulsarSourceBuilder<String> builder = new PulsarSourceBuilder<>();
-        fillRequiredFields(builder);
-
-        Configuration config = new Configuration();
-        config.set(PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE, SubscriptionType.Shared);
-        builder.setConfig(config);
-
-        assertThatThrownBy(() -> builder.setSubscriptionType(SubscriptionType.Failover))
-                .isInstanceOf(IllegalArgumentException.class);
-    }
-
-    @Test
-    void subscriptionTypeShouldNotBeOverriddenByConfiguration() {
-        PulsarSourceBuilder<String> builder = new PulsarSourceBuilder<>();
-        fillRequiredFields(builder);
-
-        builder.setSubscriptionType(SubscriptionType.Failover);
-
-        Configuration config = new Configuration();
-        config.set(PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE, SubscriptionType.Shared);
-        assertThatThrownBy(() -> builder.setConfig(config))
-                .isInstanceOf(IllegalArgumentException.class);
-    }
-
     private void fillRequiredFields(PulsarSourceBuilder<String> builder) {
         builder.setAdminUrl("admin-url");
         builder.setServiceUrl("service-url");
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
index 7d819c9..8a90557 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
 import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
 import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext;
+import org.apache.flink.connector.pulsar.testutils.source.cases.PartialKeysConsumingContext;
 import org.apache.flink.connector.pulsar.testutils.source.cases.SingleTopicConsumingContext;
 import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
@@ -61,4 +62,8 @@ class PulsarSourceITCase extends SourceTestSuiteBase<String> {
     @TestContext
     PulsarTestContextFactory<String, MultipleTopicConsumingContext> multipleTopic =
             new PulsarTestContextFactory<>(pulsar, MultipleTopicConsumingContext::new);
+
+    @TestContext
+    PulsarTestContextFactory<String, PartialKeysConsumingContext> partialKeys =
+            new PulsarTestContextFactory<>(pulsar, PartialKeysConsumingContext::new);
 }
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarUnorderedSourceITCase.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarUnorderedSourceITCase.java
deleted file mode 100644
index dfb0631..0000000
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarUnorderedSourceITCase.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source;
-
-import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
-import org.apache.flink.connector.pulsar.testutils.source.UnorderedSourceTestSuiteBase;
-import org.apache.flink.connector.pulsar.testutils.source.cases.KeySharedSubscriptionContext;
-import org.apache.flink.connector.pulsar.testutils.source.cases.SharedSubscriptionContext;
-import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
-import org.apache.flink.connector.testframe.junit.annotations.TestContext;
-import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
-import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
-import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
-import org.apache.flink.streaming.api.CheckpointingMode;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.junit.jupiter.api.Tag;
-
-/**
- * Unit test class for {@link PulsarSource}. Used for {@link SubscriptionType#Shared} subscription.
- */
-@Tag("org.apache.flink.testutils.junit.FailsOnJava11")
-public class PulsarUnorderedSourceITCase extends UnorderedSourceTestSuiteBase<String> {
-
-    // Defines test environment on Flink MiniCluster
-    @TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();
-
-    // Defines pulsar running environment
-    @TestExternalSystem
-    PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.container());
-
-    @TestSemantics
-    CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};
-
-    @TestContext
-    PulsarTestContextFactory<String, SharedSubscriptionContext> sharedSubscription =
-            new PulsarTestContextFactory<>(pulsar, SharedSubscriptionContext::new);
-
-    @TestContext
-    PulsarTestContextFactory<String, KeySharedSubscriptionContext> keySharedSubscription =
-            new PulsarTestContextFactory<>(pulsar, KeySharedSubscriptionContext::new);
-}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializerTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializerTest.java
index a584d67..dff0153 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializerTest.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializerTest.java
@@ -44,7 +44,7 @@ import static org.junit.jupiter.api.Assertions.assertNotSame;
 class PulsarSourceEnumStateSerializerTest {
 
     @Test
-    void version2SerializeAndDeserialize() throws Exception {
+    void version3SerializeAndDeserialize() throws Exception {
         Set<TopicPartition> partitions =
                 Sets.newHashSet(
                         new TopicPartition(
@@ -55,12 +55,42 @@ class PulsarSourceEnumStateSerializerTest {
         PulsarSourceEnumState state = new PulsarSourceEnumState(partitions);
 
         byte[] bytes = INSTANCE.serialize(state);
-        PulsarSourceEnumState state1 = INSTANCE.deserialize(2, bytes);
+        PulsarSourceEnumState state1 = INSTANCE.deserialize(3, bytes);
 
         assertEquals(state.getAppendedPartitions(), state1.getAppendedPartitions());
         assertNotSame(state, state1);
     }
 
+    @Test
+    void version2Deserialize() throws Exception {
+        // Serialize in version 2 logic.
+        DataOutputSerializer serializer = new DataOutputSerializer(4096);
+        serializer.writeInt(2);
+        serializer.writeUTF("topic44");
+        serializer.writeInt(2);
+        serializer.writeInt(1);
+        serializer.writeInt(122);
+        serializer.writeInt(12233);
+        serializer.writeInt(0);
+        serializer.writeUTF("topic33");
+        serializer.writeInt(2);
+        serializer.writeInt(1);
+        serializer.writeInt(1222);
+        serializer.writeInt(22233);
+        serializer.writeInt(0);
+        byte[] bytes = serializer.getSharedBuffer();
+
+        PulsarSourceEnumState state = INSTANCE.deserialize(2, bytes);
+        Set<TopicPartition> partitions = state.getAppendedPartitions();
+        Set<TopicPartition> expectedPartitions =
+                Sets.newHashSet(
+                        new TopicPartition("topic44", 2, singletonList(new TopicRange(122, 12233))),
+                        new TopicPartition(
+                                "topic33", 2, singletonList(new TopicRange(1222, 22233))));
+
+        assertEquals(partitions, expectedPartitions);
+    }
+
     @Test
     void version1Deserialize() throws Exception {
         // Serialize in version 1 logic.
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java
index 702a567..f79357d 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java
@@ -34,9 +34,7 @@ import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;
 import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
 
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
 import java.util.HashSet;
@@ -50,7 +48,6 @@ import static java.util.stream.Collectors.toSet;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
-import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
 import static org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState.initialState;
 import static org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber.getTopicPatternSubscriber;
 import static org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.DEFAULT_PARTITIONS;
@@ -68,20 +65,14 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
     private static final boolean ENABLE_PERIODIC_PARTITION_DISCOVERY = true;
     private static final boolean DISABLE_PERIODIC_PARTITION_DISCOVERY = false;
 
-    @ParameterizedTest
-    @EnumSource(
-            value = SubscriptionType.class,
-            names = {"Failover", "Shared"})
-    void startWithDiscoverPartitionsOnce(SubscriptionType subscriptionType) throws Exception {
+    @Test
+    void startWithDiscoverPartitionsOnce() throws Exception {
         Set<String> preexistingTopics = setupPreexistingTopics();
         try (MockSplitEnumeratorContext<PulsarPartitionSplit> context =
                         new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
                 PulsarSourceEnumerator enumerator =
                         createEnumerator(
-                                subscriptionType,
-                                preexistingTopics,
-                                context,
-                                DISABLE_PERIODIC_PARTITION_DISCOVERY)) {
+                                preexistingTopics, context, DISABLE_PERIODIC_PARTITION_DISCOVERY)) {
 
             // Start the enumerator and it should schedule a one time task to discover and assign
             // partitions.
@@ -93,20 +84,14 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
         }
     }
 
-    @ParameterizedTest
-    @EnumSource(
-            value = SubscriptionType.class,
-            names = {"Failover", "Shared"})
-    void startWithPeriodicPartitionDiscovery(SubscriptionType subscriptionType) throws Exception {
+    @Test
+    void startWithPeriodicPartitionDiscovery() throws Exception {
         Set<String> preexistingTopics = setupPreexistingTopics();
         try (MockSplitEnumeratorContext<PulsarPartitionSplit> context =
                         new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
                 PulsarSourceEnumerator enumerator =
                         createEnumerator(
-                                subscriptionType,
-                                preexistingTopics,
-                                context,
-                                ENABLE_PERIODIC_PARTITION_DISCOVERY)) {
+                                preexistingTopics, context, ENABLE_PERIODIC_PARTITION_DISCOVERY)) {
 
             enumerator.start();
             assertThat(context.getOneTimeCallables()).isEmpty();
@@ -116,20 +101,14 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
         }
     }
 
-    @ParameterizedTest
-    @EnumSource(
-            value = SubscriptionType.class,
-            names = {"Failover", "Shared"})
-    void discoverPartitionsTriggersAssignments(SubscriptionType subscriptionType) throws Throwable {
+    @Test
+    void discoverPartitionsTriggersAssignments() throws Throwable {
         Set<String> preexistingTopics = setupPreexistingTopics();
         try (MockSplitEnumeratorContext<PulsarPartitionSplit> context =
                         new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
                 PulsarSourceEnumerator enumerator =
                         createEnumerator(
-                                subscriptionType,
-                                preexistingTopics,
-                                context,
-                                DISABLE_PERIODIC_PARTITION_DISCOVERY)) {
+                                preexistingTopics, context, DISABLE_PERIODIC_PARTITION_DISCOVERY)) {
 
             enumerator.start();
 
@@ -141,15 +120,12 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
 
             // Run the partition discover callable and check the partition assignment.
             runOneTimePartitionDiscovery(context);
-            verifyAllReaderAssignments(subscriptionType, context, preexistingTopics, 1);
+            verifyAllReaderAssignments(context, preexistingTopics, 1);
         }
     }
 
-    @ParameterizedTest
-    @EnumSource(
-            value = SubscriptionType.class,
-            names = {"Failover", "Shared"})
-    void discoverPartitionsPeriodically(SubscriptionType subscriptionType) throws Throwable {
+    @Test
+    void discoverPartitionsPeriodically() throws Throwable {
         String dynamicTopic = "topic3-" + randomAlphabetic(10);
         Set<String> preexistingTopics = setupPreexistingTopics();
         Set<String> topicsToSubscribe = new HashSet<>(preexistingTopics);
@@ -158,13 +134,9 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
                         new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
                 PulsarSourceEnumerator enumerator =
                         createEnumerator(
-                                subscriptionType,
-                                topicsToSubscribe,
-                                context,
-                                ENABLE_PERIODIC_PARTITION_DISCOVERY)) {
+                                topicsToSubscribe, context, ENABLE_PERIODIC_PARTITION_DISCOVERY)) {
 
-            testRegisterReadersForPreexistingTopics(
-                    subscriptionType, preexistingTopics, context, enumerator);
+            testRegisterReadersForPreexistingTopics(preexistingTopics, context, enumerator);
 
             // invoke partition discovery callable again and there should be no new assignments.
             runPeriodicPartitionDiscovery(context);
@@ -185,27 +157,20 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
                     break;
                 }
             }
-            verifyAllReaderAssignments(subscriptionType, context, topicsToSubscribe, 4);
+            verifyAllReaderAssignments(context, topicsToSubscribe, 4);
         }
     }
 
-    @ParameterizedTest
-    @EnumSource(
-            value = SubscriptionType.class,
-            names = {"Failover", "Shared"})
-    void addSplitsBack(SubscriptionType subscriptionType) throws Throwable {
+    @Test
+    void addSplitsBack() throws Throwable {
         Set<String> preexistingTopics = setupPreexistingTopics();
         try (MockSplitEnumeratorContext<PulsarPartitionSplit> context =
                         new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
                 PulsarSourceEnumerator enumerator =
                         createEnumerator(
-                                subscriptionType,
-                                preexistingTopics,
-                                context,
-                                ENABLE_PERIODIC_PARTITION_DISCOVERY)) {
+                                preexistingTopics, context, ENABLE_PERIODIC_PARTITION_DISCOVERY)) {
 
-            testRegisterReadersForPreexistingTopics(
-                    subscriptionType, preexistingTopics, context, enumerator);
+            testRegisterReadersForPreexistingTopics(preexistingTopics, context, enumerator);
 
             // Simulate a reader failure.
             context.unregisterReader(READER0);
@@ -218,27 +183,20 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
 
             // Simulate a reader recovery.
             registerReader(context, enumerator, READER0);
-            verifyAllReaderAssignments(subscriptionType, context, preexistingTopics, 3 + 1);
+            verifyAllReaderAssignments(context, preexistingTopics, 3 + 1);
         }
     }
 
-    @ParameterizedTest
-    @EnumSource(
-            value = SubscriptionType.class,
-            names = {"Failover", "Shared"})
-    void workWithPreexistingAssignments(SubscriptionType subscriptionType) throws Throwable {
+    @Test
+    void workWithPreexistingAssignments() throws Throwable {
         Set<String> preexistingTopics = setupPreexistingTopics();
         PulsarSourceEnumState preexistingAssignments;
         try (MockSplitEnumeratorContext<PulsarPartitionSplit> context1 =
                         new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
                 PulsarSourceEnumerator enumerator =
                         createEnumerator(
-                                subscriptionType,
-                                preexistingTopics,
-                                context1,
-                                ENABLE_PERIODIC_PARTITION_DISCOVERY)) {
-            testRegisterReadersForPreexistingTopics(
-                    subscriptionType, preexistingTopics, context1, enumerator);
+                                preexistingTopics, context1, ENABLE_PERIODIC_PARTITION_DISCOVERY)) {
+            testRegisterReadersForPreexistingTopics(preexistingTopics, context1, enumerator);
             preexistingAssignments =
                     asEnumState(context1.getSplitsAssignmentSequence().get(0).assignment());
         }
@@ -247,7 +205,6 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
                         new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
                 PulsarSourceEnumerator enumerator =
                         createEnumerator(
-                                subscriptionType,
                                 preexistingTopics,
                                 context2,
                                 ENABLE_PERIODIC_PARTITION_DISCOVERY,
@@ -256,24 +213,17 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
             runPeriodicPartitionDiscovery(context2);
 
             registerReader(context2, enumerator, READER0);
-            if (subscriptionType == SubscriptionType.Shared) {
-                verifyAllReaderAssignments(subscriptionType, context2, preexistingTopics, 1);
-            } else {
-                assertThat(context2.getSplitsAssignmentSequence()).isEmpty();
-            }
+            assertThat(context2.getSplitsAssignmentSequence()).isEmpty();
         }
     }
 
-    @ParameterizedTest
-    @EnumSource(
-            value = SubscriptionType.class,
-            names = {"Failover", "Shared"})
-    void snapshotState(SubscriptionType subscriptionType) throws Throwable {
+    @Test
+    void snapshotState() throws Throwable {
         Set<String> preexistingTopics = setupPreexistingTopics();
         try (MockSplitEnumeratorContext<PulsarPartitionSplit> context =
                         new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
                 PulsarSourceEnumerator enumerator =
-                        createEnumerator(subscriptionType, preexistingTopics, context, false)) {
+                        createEnumerator(preexistingTopics, context, false)) {
             enumerator.start();
 
             // No reader is registered, so the state should be empty
@@ -302,7 +252,6 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
     }
 
     private void testRegisterReadersForPreexistingTopics(
-            SubscriptionType subscriptionType,
             Set<String> topics,
             MockSplitEnumeratorContext<PulsarPartitionSplit> context,
             PulsarSourceEnumerator enumerator)
@@ -315,31 +264,22 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
 
         // Run the partition discover callable and check the partition assignment.
         runPeriodicPartitionDiscovery(context);
-        if (subscriptionType == SubscriptionType.Shared) {
-            verifyAllReaderAssignments(subscriptionType, context, topics, 1);
-        }
 
         registerReader(context, enumerator, READER1);
         registerReader(context, enumerator, READER2);
 
-        verifyAllReaderAssignments(subscriptionType, context, topics, 3);
+        verifyAllReaderAssignments(context, topics, 3);
     }
 
     private PulsarSourceEnumerator createEnumerator(
-            SubscriptionType subscriptionType,
             Set<String> topics,
             MockSplitEnumeratorContext<PulsarPartitionSplit> enumContext,
             boolean enablePeriodicPartitionDiscovery) {
         return createEnumerator(
-                subscriptionType,
-                topics,
-                enumContext,
-                enablePeriodicPartitionDiscovery,
-                initialState());
+                topics, enumContext, enablePeriodicPartitionDiscovery, initialState());
     }
 
     private PulsarSourceEnumerator createEnumerator(
-            SubscriptionType subscriptionType,
             Set<String> topicsToSubscribe,
             MockSplitEnumeratorContext<PulsarPartitionSplit> enumContext,
             boolean enablePeriodicPartitionDiscovery,
@@ -352,7 +292,6 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
                 getTopicPatternSubscriber(topicPattern, RegexSubscriptionMode.AllTopics);
 
         Configuration configuration = operator().config();
-        configuration.set(PULSAR_SUBSCRIPTION_TYPE, subscriptionType);
         configuration.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10));
         if (enablePeriodicPartitionDiscovery) {
             configuration.set(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 60L);
@@ -379,7 +318,6 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
     }
 
     private void verifyAllReaderAssignments(
-            SubscriptionType subscriptionType,
             MockSplitEnumeratorContext<PulsarPartitionSplit> context,
             Set<String> topics,
             int expectedAssignmentSeqSize) {
@@ -398,15 +336,8 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
 
         // Compare assigned partitions with desired partitions.
         Set<TopicPartition> expectedTopicPartitions = getExpectedTopicPartitions(topics);
-        if (subscriptionType == SubscriptionType.Failover) {
-            int actualSize = assignments.values().stream().mapToInt(Set::size).sum();
-            assertThat(actualSize).isEqualTo(expectedTopicPartitions.size());
-        } else if (subscriptionType == SubscriptionType.Shared) {
-            assignments
-                    .values()
-                    .forEach(
-                            (splits) -> assertThat(splits).hasSize(expectedTopicPartitions.size()));
-        }
+        int actualSize = assignments.values().stream().mapToInt(Set::size).sum();
+        assertThat(actualSize).isEqualTo(expectedTopicPartitions.size());
     }
 
     private Set<TopicPartition> getExpectedTopicPartitions(Set<String> topics) {
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssignerTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssignerTest.java
deleted file mode 100644
index bc2d669..0000000
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssignerTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.enumerator.assigner;
-
-import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.api.connector.source.SplitsAssignment;
-import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-
-import static java.util.Collections.singletonList;
-import static org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssignerBase.calculatePartitionOwner;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-/** Unit tests for {@link NonSharedSplitAssigner}. */
-class NonSharedSplitAssignerTest extends SplitAssignerTestBase {
-
-    @Test
-    void noMoreSplits() {
-        SplitAssigner assigner = splitAssigner(true, 4);
-        assertFalse(assigner.noMoreSplits(3));
-
-        assigner = splitAssigner(false, 4);
-        assertFalse(assigner.noMoreSplits(3));
-
-        Set<TopicPartition> partitions = createPartitions("persistent://public/default/f", 8);
-        int owner = calculatePartitionOwner("persistent://public/default/f", 8, 4);
-
-        assigner.registerTopicPartitions(partitions);
-        assertFalse(assigner.noMoreSplits(owner));
-
-        assigner.createAssignment(singletonList(owner));
-        assertTrue(assigner.noMoreSplits(owner));
-    }
-
-    @Test
-    void partitionsAssignment() {
-        SplitAssigner assigner = splitAssigner(true, 4);
-        assigner.registerTopicPartitions(createPartitions("persistent://public/default/d", 4));
-        int owner = calculatePartitionOwner("persistent://public/default/d", 4, 4);
-        List<Integer> readers = Arrays.asList(owner, owner + 1);
-
-        // Assignment with initial states.
-        Optional<SplitsAssignment<PulsarPartitionSplit>> assignment =
-                assigner.createAssignment(readers);
-        assertThat(assignment).isPresent();
-        assertThat(assignment.get().assignment()).hasSize(1);
-
-        // Reassignment with the same readers.
-        assignment = assigner.createAssignment(readers);
-        assertThat(assignment).isNotPresent();
-
-        // Register new partition and assign.
-        assigner.registerTopicPartitions(createPartitions("persistent://public/default/e", 5));
-        assigner.registerTopicPartitions(createPartitions("persistent://public/default/f", 1));
-        assigner.registerTopicPartitions(createPartitions("persistent://public/default/g", 3));
-        assigner.registerTopicPartitions(createPartitions("persistent://public/default/h", 4));
-
-        Set<Integer> owners = new HashSet<>();
-        owners.add(calculatePartitionOwner("persistent://public/default/e", 5, 4));
-        owners.add(calculatePartitionOwner("persistent://public/default/f", 1, 4));
-        owners.add(calculatePartitionOwner("persistent://public/default/g", 3, 4));
-        owners.add(calculatePartitionOwner("persistent://public/default/h", 4, 4));
-        readers = new ArrayList<>(owners);
-
-        assignment = assigner.createAssignment(readers);
-        assertThat(assignment).isPresent();
-        assertThat(assignment.get().assignment()).hasSize(readers.size());
-
-        // Assign to new readers.
-        readers = Collections.singletonList(5);
-        assignment = assigner.createAssignment(readers);
-        assertThat(assignment).isNotPresent();
-    }
-
-    @Override
-    protected SplitAssigner createAssigner(
-            StopCursor stopCursor,
-            boolean enablePartitionDiscovery,
-            SplitEnumeratorContext<PulsarPartitionSplit> context,
-            PulsarSourceEnumState enumState) {
-        return new NonSharedSplitAssigner(stopCursor, enablePartitionDiscovery, context, enumState);
-    }
-}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssignerTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssignerTest.java
deleted file mode 100644
index 68f73b5..0000000
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssignerTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.enumerator.assigner;
-
-import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.api.connector.source.SplitsAssignment;
-import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-
-import static java.util.Collections.singletonList;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-/** Unit tests for {@link SharedSplitAssigner}. */
-class SharedSplitAssignerTest extends SplitAssignerTestBase {
-
-    @Test
-    void noMoreSplits() {
-        SplitAssigner assigner = splitAssigner(true, 4);
-        assertFalse(assigner.noMoreSplits(3));
-
-        assigner = splitAssigner(false, 4);
-        assertFalse(assigner.noMoreSplits(3));
-
-        assigner.registerTopicPartitions(createPartitions("f", 8));
-        assertFalse(assigner.noMoreSplits(3));
-
-        assigner.createAssignment(singletonList(1));
-        assertTrue(assigner.noMoreSplits(1));
-        assertFalse(assigner.noMoreSplits(3));
-
-        assigner.createAssignment(singletonList(3));
-        assertTrue(assigner.noMoreSplits(3));
-    }
-
-    @Test
-    void partitionsAssignment() {
-        SplitAssigner assigner = splitAssigner(true, 8);
-        assigner.registerTopicPartitions(createPartitions("d", 4));
-        List<Integer> readers = Arrays.asList(1, 3, 5, 7);
-
-        // Assignment with initial states.
-        Optional<SplitsAssignment<PulsarPartitionSplit>> assignment =
-                assigner.createAssignment(readers);
-        assertThat(assignment).isPresent();
-        assertThat(assignment.get().assignment()).hasSize(4);
-
-        // Reassignment with same readers.
-        assignment = assigner.createAssignment(readers);
-        assertThat(assignment).isNotPresent();
-
-        // Register new partition and assign.
-        assigner.registerTopicPartitions(createPartitions("e", 5));
-        assignment = assigner.createAssignment(readers);
-        assertThat(assignment).isPresent();
-        assertThat(assignment.get().assignment()).hasSize(4);
-
-        // Assign to new readers.
-        readers = Arrays.asList(0, 2, 4, 6);
-        assignment = assigner.createAssignment(readers);
-        assertThat(assignment).isPresent();
-        assertThat(assignment.get().assignment())
-                .hasSize(4)
-                .allSatisfy((k, v) -> assertThat(v).hasSize(2));
-    }
-
-    @Test
-    void reassignSplitsAfterRestarting() {
-        SplitAssigner assigner = splitAssigner(true, 8);
-        Set<TopicPartition> partitions = createPartitions("d", 4);
-        assigner.registerTopicPartitions(partitions);
-        List<Integer> readers = Arrays.asList(0, 1, 2);
-
-        Optional<SplitsAssignment<PulsarPartitionSplit>> assignment =
-                assigner.createAssignment(readers);
-        assertThat(assignment).isPresent();
-        assertThat(assignment.get().assignment()).hasSize(3);
-
-        // Create a new split assigner with same state.
-        SplitAssigner assigner1 = splitAssigner(true, 8, partitions);
-        assigner1.registerTopicPartitions(partitions);
-        assignment = assigner1.createAssignment(readers);
-        assertThat(assignment).isPresent();
-        assertThat(assignment.get().assignment()).hasSize(3);
-    }
-
-    @Override
-    protected SplitAssigner createAssigner(
-            StopCursor stopCursor,
-            boolean enablePartitionDiscovery,
-            SplitEnumeratorContext<PulsarPartitionSplit> context,
-            PulsarSourceEnumState enumState) {
-        return new SharedSplitAssigner(stopCursor, enablePartitionDiscovery, context, enumState);
-    }
-}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerTestBase.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerImplTest.java
similarity index 59%
rename from flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerTestBase.java
rename to flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerImplTest.java
index 712ab50..b081ecb 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerTestBase.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerImplTest.java
@@ -31,6 +31,9 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -39,11 +42,14 @@ import static java.util.Collections.emptyList;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
 import static org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState.initialState;
+import static org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssignerImpl.calculatePartitionOwner;
 import static org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.defaultStopCursor;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
-/** Test utils for split assigners. */
-abstract class SplitAssignerTestBase extends TestLogger {
+/** Unit tests for {@link SplitAssignerImpl}. */
+class SplitAssignerImplTest extends TestLogger {
 
     private static final List<MockSplitEnumeratorContext<PulsarPartitionSplit>> enumeratorContexts =
             new ArrayList<>();
@@ -91,19 +97,92 @@ abstract class SplitAssignerTestBase extends TestLogger {
         assertThat(assignment).isNotPresent();
     }
 
-    protected Set<TopicPartition> createPartitions(String topic, int partitionId) {
+    @Test
+    void noMoreSplits() {
+        SplitAssigner assigner = splitAssigner(true, 4);
+        assertFalse(assigner.noMoreSplits(3));
+
+        assigner = splitAssigner(false, 4);
+        assertFalse(assigner.noMoreSplits(3));
+
+        Set<TopicPartition> partitions = createPartitions("persistent://public/default/f", 8);
+        int owner = calculatePartitionOwner("persistent://public/default/f", 8, 4);
+
+        assigner.registerTopicPartitions(partitions);
+        assertFalse(assigner.noMoreSplits(owner));
+
+        assigner.createAssignment(singletonList(owner));
+        assertTrue(assigner.noMoreSplits(owner));
+    }
+
+    @Test
+    void partitionsAssignment() {
+        SplitAssigner assigner = splitAssigner(true, 4);
+        assigner.registerTopicPartitions(createPartitions("persistent://public/default/d", 4));
+        int owner = calculatePartitionOwner("persistent://public/default/d", 4, 4);
+        List<Integer> readers = Arrays.asList(owner, owner + 1);
+
+        // Assignment with initial states.
+        Optional<SplitsAssignment<PulsarPartitionSplit>> assignment =
+                assigner.createAssignment(readers);
+        assertThat(assignment).isPresent();
+        assertThat(assignment.get().assignment()).hasSize(1);
+
+        // Reassignment with the same readers.
+        assignment = assigner.createAssignment(readers);
+        assertThat(assignment).isNotPresent();
+
+        // Register new partition and assign.
+        assigner.registerTopicPartitions(createPartitions("persistent://public/default/e", 5));
+        assigner.registerTopicPartitions(createPartitions("persistent://public/default/f", 1));
+        assigner.registerTopicPartitions(createPartitions("persistent://public/default/g", 3));
+        assigner.registerTopicPartitions(createPartitions("persistent://public/default/h", 4));
+
+        Set<Integer> owners = new HashSet<>();
+        owners.add(calculatePartitionOwner("persistent://public/default/e", 5, 4));
+        owners.add(calculatePartitionOwner("persistent://public/default/f", 1, 4));
+        owners.add(calculatePartitionOwner("persistent://public/default/g", 3, 4));
+        owners.add(calculatePartitionOwner("persistent://public/default/h", 4, 4));
+        readers = new ArrayList<>(owners);
+
+        assignment = assigner.createAssignment(readers);
+        assertThat(assignment).isPresent();
+        assertThat(assignment.get().assignment()).hasSize(readers.size());
+
+        // Assign to new readers.
+        readers = Collections.singletonList(5);
+        assignment = assigner.createAssignment(readers);
+        assertThat(assignment).isNotPresent();
+    }
+
+    @AfterAll
+    static void afterAll() throws Exception {
+        for (MockSplitEnumeratorContext<PulsarPartitionSplit> context : enumeratorContexts) {
+            context.close();
+        }
+    }
+
+    private SplitAssigner createAssigner(
+            StopCursor stopCursor,
+            boolean enablePartitionDiscovery,
+            SplitEnumeratorContext<PulsarPartitionSplit> context,
+            PulsarSourceEnumState enumState) {
+        return new SplitAssignerImpl(stopCursor, enablePartitionDiscovery, context, enumState);
+    }
+
+    private Set<TopicPartition> createPartitions(String topic, int partitionId) {
         TopicPartition p1 = new TopicPartition(topic, partitionId);
         return singleton(p1);
     }
 
-    protected SplitAssigner splitAssigner(boolean discovery, int parallelism) {
+    private SplitAssigner splitAssigner(boolean discovery, int parallelism) {
         MockSplitEnumeratorContext<PulsarPartitionSplit> context =
                 new MockSplitEnumeratorContext<>(parallelism);
         enumeratorContexts.add(context);
         return createAssigner(defaultStopCursor(), discovery, context, initialState());
     }
 
-    protected SplitAssigner splitAssigner(
+    private SplitAssigner splitAssigner(
             boolean discovery, int parallelism, Set<TopicPartition> partitions) {
         MockSplitEnumeratorContext<PulsarPartitionSplit> context =
                 new MockSplitEnumeratorContext<>(parallelism);
@@ -111,17 +190,4 @@ abstract class SplitAssignerTestBase extends TestLogger {
         return createAssigner(
                 defaultStopCursor(), discovery, context, new PulsarSourceEnumState(partitions));
     }
-
-    protected abstract SplitAssigner createAssigner(
-            StopCursor stopCursor,
-            boolean enablePartitionDiscovery,
-            SplitEnumeratorContext<PulsarPartitionSplit> context,
-            PulsarSourceEnumState enumState);
-
-    @AfterAll
-    static void afterAll() throws Exception {
-        for (MockSplitEnumeratorContext<PulsarPartitionSplit> context : enumeratorContexts) {
-            context.close();
-        }
-    }
 }
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
index be099e7..cadf019 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader;
+import org.apache.flink.connector.pulsar.source.reader.PulsarPartitionSplitReader;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;
 
@@ -53,8 +53,8 @@ class StopCursorTest extends PulsarTestSuiteBase {
         String topicName = randomAlphanumeric(5);
         operator().createTopic(topicName, 2);
 
-        PulsarOrderedPartitionSplitReader splitReader =
-                new PulsarOrderedPartitionSplitReader(
+        PulsarPartitionSplitReader splitReader =
+                new PulsarPartitionSplitReader(
                         operator().client(),
                         operator().admin(),
                         sourceConfig(),
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/SplitRangeGeneratorTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/SplitRangeGeneratorTest.java
deleted file mode 100644
index ef182bf..0000000
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/SplitRangeGeneratorTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.enumerator.topic.range;
-
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** The unit test for {@link SplitRangeGenerator}. */
-class SplitRangeGeneratorTest {
-
-    private static final TopicMetadata METADATA = new TopicMetadata("fake", 10);
-
-    @Test
-    void rangeWithParallelismOne() {
-        SplitRangeGenerator generator = new SplitRangeGenerator(6, 65534);
-        List<TopicRange> ranges = generator.range(METADATA, 1);
-        List<TopicRange> desired = Collections.singletonList(new TopicRange(6, 65534));
-
-        assertThat(ranges).hasSize(1).containsExactlyElementsOf(desired);
-    }
-
-    @Test
-    void rangeBelowTheParallelism() {
-        SplitRangeGenerator generator = new SplitRangeGenerator(3, 10);
-        List<TopicRange> ranges = generator.range(METADATA, 12);
-        List<TopicRange> desired =
-                Arrays.asList(
-                        new TopicRange(3, 3),
-                        new TopicRange(4, 4),
-                        new TopicRange(5, 5),
-                        new TopicRange(6, 6),
-                        new TopicRange(7, 7),
-                        new TopicRange(8, 8),
-                        new TopicRange(9, 9),
-                        new TopicRange(10, 10));
-
-        assertThat(ranges).hasSize(8).containsExactlyElementsOf(desired);
-    }
-
-    @Test
-    void rangeWasDivideWithLastBiggerSize() {
-        SplitRangeGenerator generator = new SplitRangeGenerator(0, 100);
-        List<TopicRange> ranges = generator.range(METADATA, 9);
-        List<TopicRange> desired =
-                Arrays.asList(
-                        new TopicRange(0, 10),
-                        new TopicRange(11, 21),
-                        new TopicRange(22, 32),
-                        new TopicRange(33, 43),
-                        new TopicRange(44, 55),
-                        new TopicRange(56, 66),
-                        new TopicRange(67, 77),
-                        new TopicRange(78, 88),
-                        new TopicRange(89, 100));
-
-        assertThat(ranges).hasSize(9).containsExactlyElementsOf(desired);
-    }
-}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/TopicRangeUtilsTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/TopicRangeUtilsTest.java
index f844aae..fd53770 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/TopicRangeUtilsTest.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/TopicRangeUtilsTest.java
@@ -26,7 +26,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator.KeySharedMode.JOIN;
 import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils.isFullTopicRanges;
 import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils.validateTopicRanges;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -40,16 +39,16 @@ class TopicRangeUtilsTest {
     @Test
     void testValidateTopicRanges() {
         List<TopicRange> ranges1 = Arrays.asList(new TopicRange(1, 2), new TopicRange(2, 3));
-        assertThrows(IllegalArgumentException.class, () -> validateTopicRanges(ranges1, JOIN));
+        assertThrows(IllegalArgumentException.class, () -> validateTopicRanges(ranges1));
 
         List<TopicRange> ranges2 = Arrays.asList(new TopicRange(1, 14), new TopicRange(2, 5));
-        assertThrows(IllegalArgumentException.class, () -> validateTopicRanges(ranges2, JOIN));
+        assertThrows(IllegalArgumentException.class, () -> validateTopicRanges(ranges2));
 
         List<TopicRange> ranges3 = Arrays.asList(new TopicRange(1, 14), new TopicRange(5, 30));
-        assertThrows(IllegalArgumentException.class, () -> validateTopicRanges(ranges3, JOIN));
+        assertThrows(IllegalArgumentException.class, () -> validateTopicRanges(ranges3));
 
         List<TopicRange> ranges4 = Arrays.asList(new TopicRange(1, 14), new TopicRange(15, 30));
-        assertDoesNotThrow(() -> validateTopicRanges(ranges4, JOIN));
+        assertDoesNotThrow(() -> validateTopicRanges(ranges4));
     }
 
     @Test
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
similarity index 62%
rename from flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java
rename to flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
index 635b0a3..cfac1ad 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderTestBase.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
@@ -16,9 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.pulsar.source.reader.split;
+package org.apache.flink.connector.pulsar.source.reader;
 
-import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
@@ -27,33 +26,19 @@ import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;
-import org.apache.flink.connector.pulsar.testutils.extension.TestOrderlinessExtension;
-import org.apache.flink.util.TestLoggerExtension;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.api.extension.Extension;
-import org.junit.jupiter.api.extension.ExtensionContext;
-import org.junit.jupiter.api.extension.ParameterContext;
-import org.junit.jupiter.api.extension.ParameterResolutionException;
-import org.junit.jupiter.api.extension.ParameterResolver;
-import org.junit.jupiter.api.extension.RegisterExtension;
-import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
-import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Stream;
 
 import static java.time.Duration.ofSeconds;
 import static java.util.Collections.singletonList;
@@ -64,150 +49,22 @@ import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSA
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
 import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
-import static org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.isAssignableFromParameterContext;
-import static org.apache.flink.connector.pulsar.testutils.extension.TestOrderlinessExtension.PULSAR_SOURCE_READER_SUBSCRIPTION_TYPE_STORE_KEY;
-import static org.apache.flink.connector.pulsar.testutils.extension.TestOrderlinessExtension.PULSAR_TEST_RESOURCE_NAMESPACE;
 import static org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.DEFAULT_PARTITIONS;
 import static org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.NUM_RECORDS_PER_PARTITION;
 import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
 import static org.apache.flink.metrics.groups.UnregisteredMetricsGroup.createSourceReaderMetricGroup;
 import static org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
-import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.pulsar.client.api.Schema.STRING;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Test utils for split readers. */
-@ExtendWith({
-    TestOrderlinessExtension.class,
-    TestLoggerExtension.class,
-})
-abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuiteBase {
-
-    @RegisterExtension
-    PulsarSplitReaderInvocationContextProvider provider =
-            new PulsarSplitReaderInvocationContextProvider();
-
-    /** Default source config: max message 1, fetch timeout 1s. */
-    private SourceConfiguration sourceConfig() {
-        Configuration config = operator().config();
-        config.set(PULSAR_MAX_FETCH_RECORDS, 1);
-        config.set(PULSAR_MAX_FETCH_TIME, 1000L);
-        config.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10));
-        config.set(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true);
-
-        return new SourceConfiguration(config);
-    }
-
-    protected void handleSplit(
-            PulsarPartitionSplitReaderBase reader, String topicName, int partitionId) {
-        handleSplit(reader, topicName, partitionId, null);
-    }
-
-    protected void handleSplit(
-            PulsarPartitionSplitReaderBase reader,
-            String topicName,
-            int partitionId,
-            MessageId startPosition) {
-        TopicPartition partition = new TopicPartition(topicName, partitionId);
-        PulsarPartitionSplit split =
-                new PulsarPartitionSplit(partition, StopCursor.never(), startPosition, null);
-        SplitsAddition<PulsarPartitionSplit> addition = new SplitsAddition<>(singletonList(split));
-        reader.handleSplitsChanges(addition);
-    }
-
-    private void seekStartPositionAndHandleSplit(
-            PulsarPartitionSplitReaderBase reader, String topicName, int partitionId) {
-        seekStartPositionAndHandleSplit(reader, topicName, partitionId, MessageId.latest);
-    }
-
-    private void seekStartPositionAndHandleSplit(
-            PulsarPartitionSplitReaderBase reader,
-            String topicName,
-            int partitionId,
-            MessageId startPosition) {
-        TopicPartition partition = new TopicPartition(topicName, partitionId);
-        PulsarPartitionSplit split =
-                new PulsarPartitionSplit(partition, StopCursor.never(), null, null);
-        SplitsAddition<PulsarPartitionSplit> addition = new SplitsAddition<>(singletonList(split));
-
-        // Create the subscription and set the start position for this reader.
-        // Remember not to use Consumer.seek(startPosition)
-        SourceConfiguration sourceConfiguration = reader.sourceConfiguration;
-        PulsarAdmin pulsarAdmin = reader.pulsarAdmin;
-        String subscriptionName = sourceConfiguration.getSubscriptionName();
-        List<String> subscriptions =
-                sneakyAdmin(() -> pulsarAdmin.topics().getSubscriptions(topicName));
-        if (!subscriptions.contains(subscriptionName)) {
-            // If this subscription is not available. Just create it.
-            sneakyAdmin(
-                    () ->
-                            pulsarAdmin
-                                    .topics()
-                                    .createSubscription(
-                                            topicName, subscriptionName, startPosition));
-        } else {
-            // Reset the subscription if this is existed.
-            sneakyAdmin(
-                    () ->
-                            pulsarAdmin
-                                    .topics()
-                                    .resetCursor(topicName, subscriptionName, startPosition));
-        }
-
-        // Accept the split and start consuming.
-        reader.handleSplitsChanges(addition);
-    }
-
-    private <T> Message<byte[]> fetchedMessage(PulsarPartitionSplitReaderBase splitReader) {
-        return fetchedMessages(splitReader, 1, false).stream().findFirst().orElse(null);
-    }
-
-    protected <T> List<Message<byte[]>> fetchedMessages(
-            PulsarPartitionSplitReaderBase splitReader, int expectedCount, boolean verify) {
-        return fetchedMessages(
-                splitReader, expectedCount, verify, Boundedness.CONTINUOUS_UNBOUNDED);
-    }
-
-    private <T> List<Message<byte[]>> fetchedMessages(
-            PulsarPartitionSplitReaderBase splitReader,
-            int expectedCount,
-            boolean verify,
-            Boundedness boundedness) {
-        List<Message<byte[]>> messages = new ArrayList<>(expectedCount);
-        List<String> finishedSplits = new ArrayList<>();
-        for (int i = 0; i < 3; ) {
-            try {
-                RecordsWithSplitIds<Message<byte[]>> recordsBySplitIds = splitReader.fetch();
-                if (recordsBySplitIds.nextSplit() != null) {
-                    // Collect the records in this split.
-                    Message<byte[]> record;
-                    while ((record = recordsBySplitIds.nextRecordFromSplit()) != null) {
-                        messages.add(record);
-                    }
-                    finishedSplits.addAll(recordsBySplitIds.finishedSplits());
-                } else {
-                    i++;
-                }
-            } catch (IOException e) {
-                i++;
-            }
-            sleepUninterruptibly(1, TimeUnit.SECONDS);
-        }
-        if (verify) {
-            assertThat(messages).as("We should fetch the expected size").hasSize(expectedCount);
-            if (boundedness == Boundedness.CONTINUOUS_UNBOUNDED) {
-                assertThat(finishedSplits).as("Split should not be marked as finished").isEmpty();
-            } else {
-                assertThat(finishedSplits).as("Split should be marked as finished").hasSize(1);
-            }
-        }
-
-        return messages;
-    }
+/**
+ * Unit test for {@link org.apache.flink.connector.pulsar.source.reader.PulsarPartitionSplitReader}.
+ */
+class PulsarPartitionSplitReaderTest extends PulsarTestSuiteBase {
 
-    @TestTemplate
-    void pollMessageAfterTimeout(PulsarPartitionSplitReaderBase splitReader)
-            throws InterruptedException, TimeoutException {
+    @Test
+    void pollMessageAfterTimeout() throws InterruptedException, TimeoutException {
+        PulsarPartitionSplitReader splitReader = splitReader();
         String topicName = randomAlphabetic(10);
 
         // Add a split
@@ -231,47 +88,51 @@ abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuiteBase {
                 "Couldn't poll message from Pulsar.");
     }
 
-    @TestTemplate
-    void consumeMessageCreatedAfterHandleSplitChangesAndFetch(
-            PulsarPartitionSplitReaderBase splitReader) {
+    @Test
+    void consumeMessageCreatedAfterHandleSplitChangesAndFetch() {
+        PulsarPartitionSplitReader splitReader = splitReader();
         String topicName = randomAlphabetic(10);
+
         handleSplit(splitReader, topicName, 0, MessageId.latest);
         operator().sendMessage(topicNameWithPartition(topicName, 0), STRING, randomAlphabetic(10));
         fetchedMessages(splitReader, 1, true);
     }
 
-    @TestTemplate
-    void consumeMessageCreatedBeforeHandleSplitsChanges(
-            PulsarPartitionSplitReaderBase splitReader) {
+    @Test
+    void consumeMessageCreatedBeforeHandleSplitsChanges() {
+        PulsarPartitionSplitReader splitReader = splitReader();
         String topicName = randomAlphabetic(10);
+
         operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
         seekStartPositionAndHandleSplit(splitReader, topicName, 0);
         fetchedMessages(splitReader, 0, true);
     }
 
-    @TestTemplate
-    void consumeMessageCreatedBeforeHandleSplitsChangesAndResetToEarliestPosition(
-            PulsarPartitionSplitReaderBase splitReader) {
+    @Test
+    void consumeMessageCreatedBeforeHandleSplitsChangesAndResetToEarliestPosition() {
+        PulsarPartitionSplitReader splitReader = splitReader();
         String topicName = randomAlphabetic(10);
+
         operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
         seekStartPositionAndHandleSplit(splitReader, topicName, 0, MessageId.earliest);
         fetchedMessages(splitReader, NUM_RECORDS_PER_PARTITION, true);
     }
 
-    @TestTemplate
-    void consumeMessageCreatedBeforeHandleSplitsChangesAndResetToLatestPosition(
-            PulsarPartitionSplitReaderBase splitReader) {
+    @Test
+    void consumeMessageCreatedBeforeHandleSplitsChangesAndResetToLatestPosition() {
+        PulsarPartitionSplitReader splitReader = splitReader();
         String topicName = randomAlphabetic(10);
+
         operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
         seekStartPositionAndHandleSplit(splitReader, topicName, 0, MessageId.latest);
         fetchedMessages(splitReader, 0, true);
     }
 
-    @TestTemplate
-    void consumeMessageCreatedBeforeHandleSplitsChangesAndUseSecondLastMessageIdCursor(
-            PulsarPartitionSplitReaderBase splitReader) {
-
+    @Test
+    void consumeMessageCreatedBeforeHandleSplitsChangesAndUseSecondLastMessageIdCursor() {
+        PulsarPartitionSplitReader splitReader = splitReader();
         String topicName = randomAlphabetic(10);
+
         operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
         MessageIdImpl lastMessageId =
                 (MessageIdImpl)
@@ -294,25 +155,31 @@ abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuiteBase {
         fetchedMessages(splitReader, 2, true);
     }
 
-    @TestTemplate
-    void emptyTopic(PulsarPartitionSplitReaderBase splitReader) {
+    @Test
+    void emptyTopic() {
+        PulsarPartitionSplitReader splitReader = splitReader();
         String topicName = randomAlphabetic(10);
+
         operator().createTopic(topicName, DEFAULT_PARTITIONS);
         seekStartPositionAndHandleSplit(splitReader, topicName, 0);
         fetchedMessages(splitReader, 0, true);
     }
 
-    @TestTemplate
-    void emptyTopicWithoutSeek(PulsarPartitionSplitReaderBase splitReader) {
+    @Test
+    void emptyTopicWithoutSeek() {
+        PulsarPartitionSplitReader splitReader = splitReader();
         String topicName = randomAlphabetic(10);
+
         operator().createTopic(topicName, DEFAULT_PARTITIONS);
         handleSplit(splitReader, topicName, 0);
         fetchedMessages(splitReader, 0, true);
     }
 
-    @TestTemplate
-    void wakeupSplitReaderShouldNotCauseException(PulsarPartitionSplitReaderBase splitReader) {
+    @Test
+    void wakeupSplitReaderShouldNotCauseException() {
+        PulsarPartitionSplitReader splitReader = splitReader();
         handleSplit(splitReader, "non-exist", 0);
+
         AtomicReference<Throwable> error = new AtomicReference<>();
         Thread t =
                 new Thread(
@@ -333,85 +200,179 @@ abstract class PulsarPartitionSplitReaderTestBase extends PulsarTestSuiteBase {
         assertThat(error.get()).isNull();
     }
 
-    @TestTemplate
-    void assignNoSplits(PulsarPartitionSplitReaderBase splitReader) {
+    @Test
+    void assignNoSplits() {
+        PulsarPartitionSplitReader splitReader = splitReader();
         assertThat(fetchedMessage(splitReader)).isNull();
     }
 
+    @Test
+    void consumeMessageCreatedBeforeHandleSplitsChangesWithoutSeek() {
+        PulsarPartitionSplitReader splitReader = splitReader();
+        String topicName = randomAlphabetic(10);
+
+        operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+        handleSplit(splitReader, topicName, 0);
+        fetchedMessages(splitReader, 0, true);
+    }
+
+    @Test
+    void consumeMessageCreatedBeforeHandleSplitsChangesAndUseLatestStartCursorWithoutSeek() {
+        PulsarPartitionSplitReader splitReader = splitReader();
+        String topicName = randomAlphabetic(10);
+
+        operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+        handleSplit(splitReader, topicName, 0, MessageId.latest);
+        fetchedMessages(splitReader, 0, true);
+    }
+
+    @Test
+    void consumeMessageCreatedBeforeHandleSplitsChangesAndUseEarliestStartCursorWithoutSeek() {
+        PulsarPartitionSplitReader splitReader = splitReader();
+        String topicName = randomAlphabetic(10);
+
+        operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+        handleSplit(splitReader, topicName, 0, MessageId.earliest);
+        fetchedMessages(splitReader, NUM_RECORDS_PER_PARTITION, true);
+    }
+
+    @Test
+    void consumeMessageCreatedBeforeHandleSplitsChangesAndUseSecondLastMessageWithoutSeek() {
+        PulsarPartitionSplitReader splitReader = splitReader();
+        String topicName = randomAlphabetic(10);
+
+        operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+        MessageIdImpl lastMessageId =
+                (MessageIdImpl)
+                        sneakyAdmin(
+                                () ->
+                                        operator()
+                                                .admin()
+                                                .topics()
+                                                .getLastMessageId(
+                                                        topicNameWithPartition(topicName, 0)));
+        // On recovery the split's start position is exclusive, so only the last message is read.
+        handleSplit(
+                splitReader,
+                topicName,
+                0,
+                new MessageIdImpl(
+                        lastMessageId.getLedgerId(),
+                        lastMessageId.getEntryId() - 1,
+                        lastMessageId.getPartitionIndex()));
+        fetchedMessages(splitReader, 1, true);
+    }
+
     /** Create a split reader with max message 1, fetch timeout 1s. */
-    private PulsarPartitionSplitReaderBase splitReader(SubscriptionType subscriptionType) {
-        if (subscriptionType == SubscriptionType.Failover) {
-            return new PulsarOrderedPartitionSplitReader(
-                    operator().client(),
-                    operator().admin(),
-                    sourceConfig(),
-                    createSourceReaderMetricGroup());
-        } else {
-            return new PulsarUnorderedPartitionSplitReader(
-                    operator().client(),
-                    operator().admin(),
-                    sourceConfig(),
-                    createSourceReaderMetricGroup(),
-                    null);
-        }
+    private PulsarPartitionSplitReader splitReader() {
+        return new PulsarPartitionSplitReader(
+                operator().client(),
+                operator().admin(),
+                sourceConfig(),
+                createSourceReaderMetricGroup());
     }
 
-    /** Context Provider for PulsarSplitReaderTestBase. */
-    public class PulsarSplitReaderInvocationContextProvider
-            implements TestTemplateInvocationContextProvider {
+    /** Default source config: max message 1, fetch timeout 1s. */
+    private SourceConfiguration sourceConfig() {
+        Configuration config = operator().config();
+        config.set(PULSAR_MAX_FETCH_RECORDS, 1);
+        config.set(PULSAR_MAX_FETCH_TIME, 1000L);
+        config.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10));
+        config.set(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true);
 
-        @Override
-        public boolean supportsTestTemplate(ExtensionContext context) {
-            return true;
-        }
+        return new SourceConfiguration(config);
+    }
 
-        @Override
-        public Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContexts(
-                ExtensionContext context) {
-            SubscriptionType subscriptionType =
-                    (SubscriptionType)
-                            context.getStore(PULSAR_TEST_RESOURCE_NAMESPACE)
-                                    .get(PULSAR_SOURCE_READER_SUBSCRIPTION_TYPE_STORE_KEY);
-            return Stream.of(new PulsarSplitReaderInvocationContext(splitReader(subscriptionType)));
-        }
+    private void handleSplit(PulsarPartitionSplitReader reader, String topicName, int partitionId) {
+        handleSplit(reader, topicName, partitionId, null);
     }
 
-    /** Parameter resolver for Split Reader. */
-    public static class PulsarSplitReaderInvocationContext
-            implements TestTemplateInvocationContext {
+    private void handleSplit(
+            PulsarPartitionSplitReader reader,
+            String topicName,
+            int partitionId,
+            MessageId startPosition) {
+        TopicPartition partition = new TopicPartition(topicName, partitionId);
+        PulsarPartitionSplit split =
+                new PulsarPartitionSplit(partition, StopCursor.never(), startPosition, null);
+        SplitsAddition<PulsarPartitionSplit> addition = new SplitsAddition<>(singletonList(split));
+        reader.handleSplitsChanges(addition);
+    }
 
-        private final PulsarPartitionSplitReaderBase splitReader;
+    private void seekStartPositionAndHandleSplit(
+            PulsarPartitionSplitReader reader, String topicName, int partitionId) {
+        seekStartPositionAndHandleSplit(reader, topicName, partitionId, MessageId.latest);
+    }
 
-        public PulsarSplitReaderInvocationContext(PulsarPartitionSplitReaderBase splitReader) {
-            this.splitReader = checkNotNull(splitReader);
-        }
+    private void seekStartPositionAndHandleSplit(
+            PulsarPartitionSplitReader reader,
+            String topicName,
+            int partitionId,
+            MessageId startPosition) {
+        TopicPartition partition = new TopicPartition(topicName, partitionId);
+        PulsarPartitionSplit split =
+                new PulsarPartitionSplit(partition, StopCursor.never(), null, null);
+        SplitsAddition<PulsarPartitionSplit> addition = new SplitsAddition<>(singletonList(split));
 
-        @Override
-        public String getDisplayName(int invocationIndex) {
-            return splitReader.getClass().getSimpleName();
+        // Create the subscription and set the start position for this reader.
+        // Remember not to use Consumer.seek(startPosition)
+        SourceConfiguration sourceConfiguration = reader.sourceConfiguration;
+        PulsarAdmin pulsarAdmin = reader.pulsarAdmin;
+        String subscriptionName = sourceConfiguration.getSubscriptionName();
+        List<String> subscriptions =
+                sneakyAdmin(() -> pulsarAdmin.topics().getSubscriptions(topicName));
+        if (!subscriptions.contains(subscriptionName)) {
+            // The subscription does not exist yet, so create it at the start position.
+            sneakyAdmin(
+                    () ->
+                            pulsarAdmin
+                                    .topics()
+                                    .createSubscription(
+                                            topicName, subscriptionName, startPosition));
+        } else {
+            // The subscription already exists, so reset its cursor to the start position.
+            sneakyAdmin(
+                    () ->
+                            pulsarAdmin
+                                    .topics()
+                                    .resetCursor(topicName, subscriptionName, startPosition));
         }
 
-        @Override
-        public List<Extension> getAdditionalExtensions() {
-            return Collections.singletonList(
-                    new ParameterResolver() {
-                        @Override
-                        public boolean supportsParameter(
-                                ParameterContext parameterContext,
-                                ExtensionContext extensionContext)
-                                throws ParameterResolutionException {
-                            return isAssignableFromParameterContext(
-                                    PulsarPartitionSplitReaderBase.class, parameterContext);
-                        }
-
-                        @Override
-                        public Object resolveParameter(
-                                ParameterContext parameterContext,
-                                ExtensionContext extensionContext)
-                                throws ParameterResolutionException {
-                            return splitReader;
-                        }
-                    });
+        // Accept the split and start consuming.
+        reader.handleSplitsChanges(addition);
+    }
+
+    private Message<byte[]> fetchedMessage(PulsarPartitionSplitReader splitReader) {
+        return fetchedMessages(splitReader, 1, false).stream().findFirst().orElse(null);
+    }
+
+    private List<Message<byte[]>> fetchedMessages(
+            PulsarPartitionSplitReader splitReader, int expectedCount, boolean verify) {
+        List<Message<byte[]>> messages = new ArrayList<>(expectedCount);
+        List<String> finishedSplits = new ArrayList<>();
+        for (int i = 0; i < 3; ) {
+            try {
+                RecordsWithSplitIds<Message<byte[]>> recordsBySplitIds = splitReader.fetch();
+                if (recordsBySplitIds.nextSplit() != null) {
+                    // Collect the records in this split.
+                    Message<byte[]> record;
+                    while ((record = recordsBySplitIds.nextRecordFromSplit()) != null) {
+                        messages.add(record);
+                    }
+                    finishedSplits.addAll(recordsBySplitIds.finishedSplits());
+                } else {
+                    i++;
+                }
+            } catch (IOException e) {
+                i++;
+            }
+            sleepUninterruptibly(1, TimeUnit.SECONDS);
         }
+        if (verify) {
+            assertThat(messages).as("We should fetch the expected size").hasSize(expectedCount);
+            assertThat(finishedSplits).as("Split should not be marked as finished").isEmpty();
+        }
+
+        return messages;
     }
 }
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java
similarity index 56%
rename from flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java
rename to flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java
index 4ec1912..b9fd6b2 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarOrderedSourceReaderTest.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java
@@ -16,13 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.pulsar.source.reader.source;
+package org.apache.flink.connector.pulsar.source.reader;
 
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchemaInitializationContext;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
-import org.apache.flink.connector.pulsar.testutils.extension.SubType;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;
+import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
 import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.core.testutils.CommonTestUtils;
@@ -31,18 +38,31 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.junit.jupiter.api.TestTemplate;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.time.Duration;
 import java.util.Collections;
+import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
+import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
 import static org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.createPartitionSplit;
 import static org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.createPartitionSplits;
 import static org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.DEFAULT_PARTITIONS;
@@ -50,19 +70,52 @@ import static org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeO
 import static org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.fail;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
 
-class PulsarOrderedSourceReaderTest extends PulsarSourceReaderTestBase {
+/** Unit test for {@link PulsarSourceReader}. */
+class PulsarSourceReaderTest extends PulsarTestSuiteBase {
 
     private static final int MAX_EMPTY_POLLING_TIMES = 10;
 
-    @SubType SubscriptionType subscriptionType = SubscriptionType.Failover;
+    @Test
+    void assignZeroSplitsCreatesZeroSubscription() throws Exception {
+        String topicName = topicName();
+        PulsarSourceReader<Integer> reader = sourceReader();
+        // No split is assigned before checkpoint 100 is taken and completed.
+        reader.snapshotState(100L);
+        reader.notifyCheckpointComplete(100L);
+        // Close the reader, then check that no partition of the topic has a subscription.
+        reader.close();
+        for (int i = 0; i < PulsarRuntimeOperator.DEFAULT_PARTITIONS; i++) {
+            verifyNoSubscriptionCreated(TopicNameUtils.topicNameWithPartition(topicName, i));
+        }
+    }
+
+    @Test
+    void assigningEmptySplits() throws Exception {
+        String topicName = topicName();
+        PulsarSourceReader<Integer> reader = sourceReader();
+
+        final PulsarPartitionSplit emptySplit =
+                createPartitionSplit(
+                        topicName, 0, Boundedness.CONTINUOUS_UNBOUNDED, MessageId.latest);
+
+        reader.addSplits(singletonList(emptySplit));
+
+        TestingReaderOutput<Integer> output = new TestingReaderOutput<>();
+        InputStatus status = reader.pollNext(output);
+        assertThat(status).isEqualTo(InputStatus.NOTHING_AVAILABLE);
+        reader.close();
+    }
+
+    @Test
+    void consumeMessagesAndCommitOffsets() throws Exception {
+        String topicName = topicName();
+        PulsarSourceReader<Integer> reader = sourceReader();
 
-    @TestTemplate
-    void consumeMessagesAndCommitOffsets(
-            PulsarSourceReaderBase<Integer> baseReader, Boundedness boundedness, String topicName)
-            throws Exception {
         // set up the partition
-        PulsarOrderedSourceReader<Integer> reader = (PulsarOrderedSourceReader<Integer>) baseReader;
         setupSourceReader(reader, topicName, 0, Boundedness.CONTINUOUS_UNBOUNDED);
 
         // waiting for results
@@ -86,11 +139,11 @@ class PulsarOrderedSourceReaderTest extends PulsarSourceReaderTestBase {
                 NUM_RECORDS_PER_PARTITION, TopicNameUtils.topicNameWithPartition(topicName, 0));
     }
 
-    @TestTemplate
-    void offsetCommitOnCheckpointComplete(
-            PulsarSourceReaderBase<Integer> baseReader, Boundedness boundedness, String topicName)
-            throws Exception {
-        PulsarOrderedSourceReader<Integer> reader = (PulsarOrderedSourceReader<Integer>) baseReader;
+    @Test
+    void offsetCommitOnCheckpointComplete() throws Exception {
+        String topicName = topicName();
+        PulsarSourceReader<Integer> reader = sourceReader();
+
         // consume more than 1 partition
         reader.addSplits(
                 createPartitionSplits(
@@ -132,48 +185,79 @@ class PulsarOrderedSourceReaderTest extends PulsarSourceReaderTestBase {
         }
     }
 
-    @TestTemplate
+    @ParameterizedTest
+    @EnumSource(Boundedness.class)
     @Timeout(600)
-    void supportsPausingOrResumingSplits(
-            PulsarSourceReaderBase<Integer> reader, Boundedness boundedness, String topicName)
-            throws Exception {
-        final PulsarPartitionSplit split =
+    void supportsPausingOrResumingSplits(Boundedness boundedness) throws Exception {
+        String topicName = topicName();
+        PulsarSourceReader<Integer> reader = sourceReader();
+
+        PulsarPartitionSplit split =
                 createPartitionSplit(topicName, 0, boundedness, MessageId.earliest);
 
-        reader.addSplits(Collections.singletonList(split));
+        reader.addSplits(singletonList(split));
 
         TestingReaderOutput<Integer> output = new TestingReaderOutput<>();
 
-        reader.pauseOrResumeSplits(
-                Collections.singletonList(split.splitId()), Collections.emptyList());
+        reader.pauseOrResumeSplits(singletonList(split.splitId()), emptyList());
 
         InputStatus status = reader.pollNext(output);
-        assertThat(status).isEqualTo(InputStatus.NOTHING_AVAILABLE);
+        assertEquals(InputStatus.NOTHING_AVAILABLE, status);
 
-        reader.pauseOrResumeSplits(Collections.emptyList(), Collections.singleton(split.splitId()));
+        reader.pauseOrResumeSplits(emptyList(), Collections.singleton(split.splitId()));
 
         do {
             status = reader.pollNext(output);
             Thread.sleep(5);
         } while (status != InputStatus.MORE_AVAILABLE);
 
-        assertThat(status).isEqualTo(InputStatus.MORE_AVAILABLE);
+        assertEquals(InputStatus.MORE_AVAILABLE, status);
 
         reader.close();
     }
 
+    private String topicName() {
+        String topicName = randomAlphabetic(20);
+        Random random = new Random(System.currentTimeMillis());
+        operator().setupTopic(topicName, Schema.INT32, () -> random.nextInt(20));
+
+        return topicName;
+    }
+
+    private PulsarSourceReader<Integer> sourceReader() {
+        Configuration configuration = operator().config();
+        // Max fetch records is 1, max fetch time 1000; the subscription name is random.
+        configuration.set(PULSAR_MAX_FETCH_RECORDS, 1);
+        configuration.set(PULSAR_MAX_FETCH_TIME, 1000L);
+        configuration.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10));
+        // Open the schema with a mocked SourceConfiguration; failures keep their cause.
+        PulsarDeserializationSchema<Integer> deserializationSchema = pulsarSchema(Schema.INT32);
+        SourceReaderContext context = new TestingReaderContext();
+        try {
+            deserializationSchema.open(
+                    new PulsarDeserializationSchemaInitializationContext(context),
+                    mock(SourceConfiguration.class));
+        } catch (Exception e) {
+            fail("Error while opening deserializationSchema", e);
+        }
+
+        SourceConfiguration sourceConfiguration = new SourceConfiguration(configuration);
+
+        return PulsarSourceReader.create(sourceConfiguration, deserializationSchema, context);
+    }
+
     private void setupSourceReader(
-            PulsarSourceReaderBase<Integer> reader,
+            PulsarSourceReader<Integer> reader,
             String topicName,
             int partitionId,
             Boundedness boundedness) {
         PulsarPartitionSplit split = createPartitionSplit(topicName, partitionId, boundedness);
-        reader.addSplits(Collections.singletonList(split));
+        reader.addSplits(singletonList(split));
         reader.notifyNoMoreSplits();
     }
 
     private void pollUntil(
-            PulsarSourceReaderBase<Integer> reader,
+            PulsarSourceReader<Integer> reader,
             ReaderOutput<Integer> output,
             Supplier<Boolean> condition,
             String errorMessage)
@@ -194,8 +278,7 @@ class PulsarOrderedSourceReaderTest extends PulsarSourceReaderTestBase {
     }
 
     private void verifyAllMessageAcknowledged(int expectedMessages, String partitionName)
-            throws PulsarAdminException, PulsarClientException {
-
+            throws PulsarClientException {
         Consumer<byte[]> consumer =
                 operator()
                         .client()
@@ -206,7 +289,13 @@ class PulsarOrderedSourceReaderTest extends PulsarSourceReaderTestBase {
                         .topic(partitionName)
                         .subscribe();
 
-        assertThat(((MessageIdImpl) consumer.getLastMessageId()).getEntryId())
-                .isEqualTo(expectedMessages - 1);
+        assertEquals(
+                expectedMessages - 1, ((MessageIdImpl) consumer.getLastMessageId()).getEntryId());
+    }
+
+    private void verifyNoSubscriptionCreated(String partitionName) throws PulsarAdminException {
+        Map<String, ? extends SubscriptionStats> subscriptionStats =
+                operator().admin().topics().getStats(partitionName, true, true).getSubscriptions();
+        assertThat(subscriptionStats).isEmpty();
     }
 }
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderTestBase.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderTestBase.java
deleted file mode 100644
index 3819ff6..0000000
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderTestBase.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.reader.source;
-
-import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.SourceReaderContext;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
-import org.apache.flink.connector.pulsar.source.reader.PulsarSourceReaderFactory;
-import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
-import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchemaInitializationContext;
-import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;
-import org.apache.flink.connector.pulsar.testutils.extension.TestOrderlinessExtension;
-import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
-import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
-import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
-import org.apache.flink.core.io.InputStatus;
-import org.apache.flink.util.TestLoggerExtension;
-
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.policies.data.SubscriptionStats;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.api.extension.Extension;
-import org.junit.jupiter.api.extension.ExtensionContext;
-import org.junit.jupiter.api.extension.ParameterContext;
-import org.junit.jupiter.api.extension.ParameterResolutionException;
-import org.junit.jupiter.api.extension.ParameterResolver;
-import org.junit.jupiter.api.extension.RegisterExtension;
-import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
-import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.stream.Stream;
-
-import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
-import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE;
-import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
-import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
-import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
-import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
-import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
-import static org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.createPartitionSplit;
-import static org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.isAssignableFromParameterContext;
-import static org.apache.flink.connector.pulsar.testutils.extension.TestOrderlinessExtension.PULSAR_SOURCE_READER_SUBSCRIPTION_TYPE_STORE_KEY;
-import static org.apache.flink.connector.pulsar.testutils.extension.TestOrderlinessExtension.PULSAR_TEST_RESOURCE_NAMESPACE;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.fail;
-import static org.mockito.Mockito.mock;
-
-@ExtendWith({
-    TestOrderlinessExtension.class,
-    TestLoggerExtension.class,
-})
-abstract class PulsarSourceReaderTestBase extends PulsarTestSuiteBase {
-
-    @RegisterExtension
-    PulsarSourceReaderInvocationContextProvider provider =
-            new PulsarSourceReaderInvocationContextProvider();
-
-    @BeforeEach
-    void beforeEach(String topicName) {
-        Random random = new Random(System.currentTimeMillis());
-        operator().setupTopic(topicName, Schema.INT32, () -> random.nextInt(20));
-    }
-
-    @TestTemplate
-    void assignZeroSplitsCreatesZeroSubscription(
-            PulsarSourceReaderBase<Integer> reader, Boundedness boundedness, String topicName)
-            throws Exception {
-        reader.snapshotState(100L);
-        reader.notifyCheckpointComplete(100L);
-        // Verify the committed offsets.
-        reader.close();
-        for (int i = 0; i < PulsarRuntimeOperator.DEFAULT_PARTITIONS; i++) {
-            verifyNoSubscriptionCreated(TopicNameUtils.topicNameWithPartition(topicName, i));
-        }
-    }
-
-    @TestTemplate
-    void assigningEmptySplits(
-            PulsarSourceReaderBase<Integer> reader, Boundedness boundedness, String topicName)
-            throws Exception {
-        final PulsarPartitionSplit emptySplit =
-                createPartitionSplit(
-                        topicName, 0, Boundedness.CONTINUOUS_UNBOUNDED, MessageId.latest);
-
-        reader.addSplits(Collections.singletonList(emptySplit));
-
-        TestingReaderOutput<Integer> output = new TestingReaderOutput<>();
-        InputStatus status = reader.pollNext(output);
-        assertThat(status).isEqualTo(InputStatus.NOTHING_AVAILABLE);
-        reader.close();
-    }
-
-    private void verifyNoSubscriptionCreated(String partitionName) throws PulsarAdminException {
-        Map<String, ? extends SubscriptionStats> subscriptionStats =
-                operator().admin().topics().getStats(partitionName, true, true).getSubscriptions();
-        assertThat(subscriptionStats).isEmpty();
-    }
-
-    private PulsarSourceReaderBase<Integer> sourceReader(
-            boolean autoAcknowledgementEnabled, SubscriptionType subscriptionType) {
-        Configuration configuration = operator().config();
-        configuration.set(PULSAR_MAX_FETCH_RECORDS, 1);
-        configuration.set(PULSAR_MAX_FETCH_TIME, 1000L);
-        configuration.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10));
-        configuration.set(PULSAR_SUBSCRIPTION_TYPE, subscriptionType);
-        if (autoAcknowledgementEnabled
-                || configuration.get(PULSAR_SUBSCRIPTION_TYPE) == SubscriptionType.Shared) {
-            configuration.set(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true);
-        }
-        PulsarDeserializationSchema<Integer> deserializationSchema = pulsarSchema(Schema.INT32);
-        SourceReaderContext context = new TestingReaderContext();
-        try {
-            deserializationSchema.open(
-                    new PulsarDeserializationSchemaInitializationContext(context),
-                    mock(SourceConfiguration.class));
-        } catch (Exception e) {
-            fail("Error while opening deserializationSchema");
-        }
-
-        SourceConfiguration sourceConfiguration = new SourceConfiguration(configuration);
-        return (PulsarSourceReaderBase<Integer>)
-                PulsarSourceReaderFactory.create(
-                        context, deserializationSchema, sourceConfiguration);
-    }
-
-    public class PulsarSourceReaderInvocationContextProvider
-            implements TestTemplateInvocationContextProvider {
-
-        @Override
-        public boolean supportsTestTemplate(ExtensionContext context) {
-            return true;
-        }
-
-        @Override
-        public Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContexts(
-                ExtensionContext context) {
-            SubscriptionType subscriptionType =
-                    (SubscriptionType)
-                            context.getStore(PULSAR_TEST_RESOURCE_NAMESPACE)
-                                    .get(PULSAR_SOURCE_READER_SUBSCRIPTION_TYPE_STORE_KEY);
-            return Stream.of(
-                    new PulsarSourceReaderInvocationContext(
-                            sourceReader(true, subscriptionType), Boundedness.CONTINUOUS_UNBOUNDED),
-                    new PulsarSourceReaderInvocationContext(
-                            sourceReader(false, subscriptionType),
-                            Boundedness.CONTINUOUS_UNBOUNDED));
-        }
-    }
-
-    public static class PulsarSourceReaderInvocationContext
-            implements TestTemplateInvocationContext {
-
-        private final PulsarSourceReaderBase<?> sourceReader;
-        private final Boundedness boundedness;
-        private final String randomTopicName;
-
-        public PulsarSourceReaderInvocationContext(
-                PulsarSourceReaderBase<?> splitReader, Boundedness boundedness) {
-            this.sourceReader = checkNotNull(splitReader);
-            this.boundedness = checkNotNull(boundedness);
-            this.randomTopicName = randomAlphabetic(5);
-        }
-
-        @Override
-        public String getDisplayName(int invocationIndex) {
-            return "AutoAckEnabled: "
-                    + sourceReader.sourceConfiguration.isEnableAutoAcknowledgeMessage()
-                    + "  Boundedness: "
-                    + boundedness.toString();
-        }
-
-        @Override
-        public List<Extension> getAdditionalExtensions() {
-            return Arrays.asList(
-                    new ParameterResolver() {
-                        @Override
-                        public boolean supportsParameter(
-                                ParameterContext parameterContext,
-                                ExtensionContext extensionContext)
-                                throws ParameterResolutionException {
-                            return isAssignableFromParameterContext(
-                                            PulsarSourceReaderBase.class, parameterContext)
-                                    || isAssignableFromParameterContext(
-                                            Boundedness.class, parameterContext)
-                                    || isAssignableFromParameterContext(
-                                            String.class, parameterContext);
-                        }
-
-                        @Override
-                        public Object resolveParameter(
-                                ParameterContext parameterContext,
-                                ExtensionContext extensionContext)
-                                throws ParameterResolutionException {
-                            if (parameterContext
-                                    .getParameter()
-                                    .getType()
-                                    .equals(PulsarSourceReaderBase.class)) {
-                                return sourceReader;
-                            } else if (parameterContext
-                                    .getParameter()
-                                    .getType()
-                                    .equals(Boundedness.class)) {
-                                return boundedness;
-                            } else {
-                                return randomTopicName;
-                            }
-                        }
-                    });
-        }
-    }
-}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReaderTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReaderTest.java
deleted file mode 100644
index 4f7fdd3..0000000
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReaderTest.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.reader.source;
-
-import org.apache.flink.connector.pulsar.testutils.extension.SubType;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-
-class PulsarUnorderedSourceReaderTest extends PulsarSourceReaderTestBase {
-    @SubType SubscriptionType subscriptionType = SubscriptionType.Shared;
-}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReaderTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReaderTest.java
deleted file mode 100644
index 4d93ce9..0000000
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReaderTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.reader.split;
-
-import org.apache.flink.connector.pulsar.testutils.extension.SubType;
-
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.junit.jupiter.api.TestTemplate;
-
-import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
-import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin;
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
-import static org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.NUM_RECORDS_PER_PARTITION;
-import static org.apache.pulsar.client.api.Schema.STRING;
-
-/** Unit tests for {@link PulsarOrderedPartitionSplitReaderTest}. */
-class PulsarOrderedPartitionSplitReaderTest extends PulsarPartitionSplitReaderTestBase {
-
-    @SubType SubscriptionType subscriptionType = SubscriptionType.Failover;
-
-    @TestTemplate
-    void consumeMessageCreatedBeforeHandleSplitsChangesWithoutSeek(
-            PulsarPartitionSplitReaderBase splitReader) {
-        String topicName = randomAlphabetic(10);
-        operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
-        handleSplit(splitReader, topicName, 0);
-        fetchedMessages(splitReader, 0, true);
-    }
-
-    @TestTemplate
-    void consumeMessageCreatedBeforeHandleSplitsChangesAndUseLatestStartCursorWithoutSeek(
-            PulsarPartitionSplitReaderBase splitReader) {
-        String topicName = randomAlphabetic(10);
-        operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
-        handleSplit(splitReader, topicName, 0, MessageId.latest);
-        fetchedMessages(splitReader, 0, true);
-    }
-
-    @TestTemplate
-    void consumeMessageCreatedBeforeHandleSplitsChangesAndUseEarliestStartCursorWithoutSeek(
-            PulsarPartitionSplitReaderBase splitReader) {
-        String topicName = randomAlphabetic(10);
-        operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
-        handleSplit(splitReader, topicName, 0, MessageId.earliest);
-        fetchedMessages(splitReader, NUM_RECORDS_PER_PARTITION, true);
-    }
-
-    @TestTemplate
-    void consumeMessageCreatedBeforeHandleSplitsChangesAndUseSecondLastMessageWithoutSeek(
-            PulsarPartitionSplitReaderBase splitReader) {
-        String topicName = randomAlphabetic(10);
-        operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
-        MessageIdImpl lastMessageId =
-                (MessageIdImpl)
-                        sneakyAdmin(
-                                () ->
-                                        operator()
-                                                .admin()
-                                                .topics()
-                                                .getLastMessageId(
-                                                        topicNameWithPartition(topicName, 0)));
-        // when recover, use exclusive startCursor
-        handleSplit(
-                splitReader,
-                topicName,
-                0,
-                new MessageIdImpl(
-                        lastMessageId.getLedgerId(),
-                        lastMessageId.getEntryId() - 1,
-                        lastMessageId.getPartitionIndex()));
-        fetchedMessages(splitReader, 1, true);
-    }
-}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReaderTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReaderTest.java
deleted file mode 100644
index 2cb3cb9..0000000
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReaderTest.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.reader.split;
-
-import org.apache.flink.connector.pulsar.testutils.extension.SubType;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-
-/** Unit tests for {@link PulsarUnorderedPartitionSplitReaderTest}. */
-class PulsarUnorderedPartitionSplitReaderTest extends PulsarPartitionSplitReaderTestBase {
-    @SubType SubscriptionType subscriptionType = SubscriptionType.Shared;
-}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplitSerializerTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplitSerializerTest.java
index 3d4ee88..acf7462 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplitSerializerTest.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/split/PulsarPartitionSplitSerializerTest.java
@@ -30,7 +30,6 @@ import org.junit.jupiter.api.Test;
 
 import static java.util.Collections.singletonList;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator.KeySharedMode.JOIN;
 import static org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitSerializer.INSTANCE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotSame;
@@ -39,23 +38,58 @@ import static org.junit.jupiter.api.Assertions.assertNotSame;
 class PulsarPartitionSplitSerializerTest {
 
     @Test
-    void version1SerializeAndDeserialize() throws Exception {
+    void version2SerializeAndDeserialize() throws Exception {
         PulsarPartitionSplit split =
                 new PulsarPartitionSplit(
                         new TopicPartition(
-                                randomAlphabetic(10),
-                                10,
-                                singletonList(new TopicRange(400, 5000)),
-                                JOIN),
+                                randomAlphabetic(10), 10, singletonList(new TopicRange(400, 5000))),
                         StopCursor.defaultStopCursor());
 
         byte[] bytes = INSTANCE.serialize(split);
-        PulsarPartitionSplit split1 = INSTANCE.deserialize(1, bytes);
+        PulsarPartitionSplit split1 = INSTANCE.deserialize(2, bytes);
 
         assertEquals(split, split1);
         assertNotSame(split, split1);
     }
 
+    @Test
+    void version1Deserialize() throws Exception { // hand-writes a version 1 payload, then reads it
+        DataOutputSerializer serializer = new DataOutputSerializer(4096);
+        serializer.writeUTF("topic44"); // topic name
+        serializer.writeInt(2); // partition id
+        serializer.writeInt(1); // presumably the number of topic ranges that follow
+        serializer.writeInt(122); // range start
+        serializer.writeInt(12233); // range end
+        serializer.writeInt(0); // v1-only field; presumably the removed KeySharedMode (TODO confirm)
+
+        byte[] stopCursorBytes = InstantiationUtil.serializeObject(StopCursor.latest());
+        serializer.writeInt(stopCursorBytes.length); // length-prefixed stop cursor
+        serializer.write(stopCursorBytes);
+
+        serializer.writeBoolean(true); // presence flag for the message id written next
+        byte[] messageIdBytes = MessageId.latest.toByteArray();
+        serializer.writeInt(messageIdBytes.length);
+        serializer.write(messageIdBytes);
+
+        serializer.writeBoolean(true); // presence flag for the transaction id written next
+        serializer.writeLong(1000); // first TxnID component
+        serializer.writeLong(2000); // second TxnID component
+
+        byte[] bytes = serializer.getSharedBuffer();
+
+        PulsarPartitionSplit split = INSTANCE.deserialize(1, bytes);
+
+        PulsarPartitionSplit expectedSplit =
+                new PulsarPartitionSplit(
+                        new TopicPartition("topic44", 2, singletonList(new TopicRange(122, 12233))),
+                        StopCursor.latest(),
+                        MessageId.latest,
+                        new TxnID(1000, 2000));
+
+        assertEquals(expectedSplit, split); // JUnit order is (expected, actual)
+        assertNotSame(expectedSplit, split);
+    }
+
     @Test
     void version0Deserialize() throws Exception {
         DataOutputSerializer serializer = new DataOutputSerializer(4096);
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/extension/SubType.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/extension/SubType.java
deleted file mode 100644
index b1a8362..0000000
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/extension/SubType.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.testutils.extension;
-
-import org.apache.flink.annotation.Experimental;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/** Marks the field in test class defining {@link org.apache.pulsar.client.api.SubscriptionType}. */
-@Target(ElementType.FIELD)
-@Retention(RetentionPolicy.RUNTIME)
-@Experimental
-public @interface SubType {}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/extension/TestOrderlinessExtension.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/extension/TestOrderlinessExtension.java
deleted file mode 100644
index 07c9287..0000000
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/extension/TestOrderlinessExtension.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.testutils.extension;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.junit.jupiter.api.extension.BeforeAllCallback;
-import org.junit.jupiter.api.extension.ExtensionContext;
-import org.junit.platform.commons.support.AnnotationSupport;
-
-import java.lang.annotation.Annotation;
-import java.util.Collection;
-import java.util.List;
-
-/** An extension for subclasses to specify {@link org.apache.pulsar.client.api.SubscriptionType}. */
-public class TestOrderlinessExtension implements BeforeAllCallback {
-
-    public static final ExtensionContext.Namespace PULSAR_TEST_RESOURCE_NAMESPACE =
-            ExtensionContext.Namespace.create("pulsarTestResourceNamespace");
-    public static final String PULSAR_SOURCE_READER_SUBSCRIPTION_TYPE_STORE_KEY =
-            "pulsarSourceReaderSubscriptionTypeStoreKey";
-
-    private SubscriptionType subscriptionType;
-
-    @Override
-    public void beforeAll(ExtensionContext context) throws Exception {
-        final List<SubscriptionType> subscriptionTypes =
-                AnnotationSupport.findAnnotatedFieldValues(
-                        context.getRequiredTestInstance(), SubType.class, SubscriptionType.class);
-        checkExactlyOneAnnotatedField(subscriptionTypes, SubType.class);
-        subscriptionType = subscriptionTypes.get(0);
-        context.getStore(PULSAR_TEST_RESOURCE_NAMESPACE)
-                .put(PULSAR_SOURCE_READER_SUBSCRIPTION_TYPE_STORE_KEY, subscriptionType);
-    }
-
-    private void checkExactlyOneAnnotatedField(
-            Collection<?> fields, Class<? extends Annotation> annotation) {
-        if (fields.size() > 1) {
-            throw new IllegalStateException(
-                    String.format(
-                            "Multiple fields are annotated with '@%s'",
-                            annotation.getSimpleName()));
-        }
-        if (fields.isEmpty()) {
-            throw new IllegalStateException(
-                    String.format(
-                            "No fields are annotated with '@%s'", annotation.getSimpleName()));
-        }
-    }
-}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java
index 8089f8c..cb2fc91 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java
@@ -32,7 +32,6 @@ import org.apache.flink.connector.testframe.external.source.DataStreamSourceExte
 import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
 
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
 
 import java.util.List;
 import java.util.Random;
@@ -66,7 +65,6 @@ public abstract class PulsarSourceTestContext extends PulsarTestContext<String>
                         .setServiceUrl(operator.serviceUrl())
                         .setAdminUrl(operator.adminUrl())
                         .setTopicPattern(topicPattern(), AllTopics)
-                        .setSubscriptionType(subscriptionType())
                         .setSubscriptionName(subscriptionName())
                         .setConfig(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, DISCOVERY_INTERVAL);
 
@@ -127,9 +125,6 @@ public abstract class PulsarSourceTestContext extends PulsarTestContext<String>
     /** The subscription name used in Pulsar consumer. */
     protected abstract String subscriptionName();
 
-    /** The subscription type used in Pulsar consumer. */
-    protected abstract SubscriptionType subscriptionType();
-
     /**
      * Dynamic generate a partition related topic in Pulsar. This topic should be pre-created in
      * Pulsar. Everytime we call this method, we may get a new partition name.
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/UnorderedSourceTestSuiteBase.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/UnorderedSourceTestSuiteBase.java
deleted file mode 100644
index af41d8c..0000000
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/UnorderedSourceTestSuiteBase.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.testutils.source;
-
-import org.apache.flink.connector.testframe.environment.TestEnvironment;
-import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
-import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.util.CloseableIterator;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.junit.jupiter.api.Disabled;
-
-import java.util.List;
-
-import static java.util.concurrent.CompletableFuture.runAsync;
-import static org.apache.flink.connector.testframe.utils.CollectIteratorAssertions.assertUnordered;
-import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
-import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
-
-/**
- * Pulsar with {@link SubscriptionType#Key_Shared} and {@link SubscriptionType#Shared} consumes the
- * message out of order. So we have to override the default connector test tool.
- */
-public abstract class UnorderedSourceTestSuiteBase<T> extends SourceTestSuiteBase<T> {
-
-    @Override
-    protected void checkResultWithSemantic(
-            CloseableIterator<T> resultIterator,
-            List<List<T>> testData,
-            CheckpointingMode semantic,
-            Integer limit) {
-        Runnable runnable =
-                () ->
-                        assertUnordered(resultIterator)
-                                .withNumRecordsLimit(getExpectedSize(testData, limit))
-                                .matchesRecordsFromSource(testData, semantic);
-
-        assertThat(runAsync(runnable)).succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
-    }
-
-    /**
-     * Shared subscription will have multiple readers on same partition, this would make hard to
-     * automatically stop like a bounded source.
-     */
-    private static <T> int getExpectedSize(List<List<T>> testData, Integer limit) {
-        if (limit == null) {
-            return testData.stream().mapToInt(List::size).sum();
-        } else {
-            return limit;
-        }
-    }
-
-    @Override
-    @Disabled("We don't have any idle readers in Pulsar's shared subscription.")
-    public void testIdleReader(
-            TestEnvironment testEnv,
-            DataStreamSourceExternalContext<T> externalContext,
-            CheckpointingMode semantic) {}
-}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/MultipleTopicConsumingContext.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/MultipleTopicConsumingContext.java
index 422c954..d5dc3fb 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/MultipleTopicConsumingContext.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/MultipleTopicConsumingContext.java
@@ -21,8 +21,6 @@ package org.apache.flink.connector.pulsar.testutils.source.cases;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
 import org.apache.flink.connector.pulsar.testutils.source.PulsarSourceTestContext;
 
-import org.apache.pulsar.client.api.SubscriptionType;
-
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
 import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
 
@@ -55,11 +53,6 @@ public class MultipleTopicConsumingContext extends PulsarSourceTestContext {
         return "flink-multiple-topic-test";
     }
 
-    @Override
-    protected SubscriptionType subscriptionType() {
-        return SubscriptionType.Exclusive;
-    }
-
     @Override
     protected String generatePartitionName() {
         String topic = topicPrefix + index;
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/KeySharedSubscriptionContext.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/PartialKeysConsumingContext.java
similarity index 84%
rename from flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/KeySharedSubscriptionContext.java
rename to flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/PartialKeysConsumingContext.java
index c348853..c5dc8b6 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/KeySharedSubscriptionContext.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/PartialKeysConsumingContext.java
@@ -25,20 +25,17 @@ import org.apache.flink.connector.pulsar.testutils.source.KeyedPulsarPartitionDa
 import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
 import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
 
-import org.apache.pulsar.client.api.SubscriptionType;
-
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ALLOW_KEY_SHARED_OUT_OF_ORDER_DELIVERY;
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator.KeySharedMode.JOIN;
 import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils.keyHash;
 
-/** We would consume from test splits by using {@link SubscriptionType#Key_Shared} subscription. */
-public class KeySharedSubscriptionContext extends MultipleTopicConsumingContext {
+/** Consumes from the test splits, reading only the messages that carry the selected key. */
+public class PartialKeysConsumingContext extends MultipleTopicConsumingContext {
 
     private final String keyToRead;
     private final String keyToExclude;
 
-    public KeySharedSubscriptionContext(PulsarTestEnvironment environment) {
+    public PartialKeysConsumingContext(PulsarTestEnvironment environment) {
         super(environment);
 
         this.keyToRead = randomAlphabetic(8);
@@ -68,7 +65,7 @@ public class KeySharedSubscriptionContext extends MultipleTopicConsumingContext
     protected void setSourceBuilder(PulsarSourceBuilder<String> builder) {
         // Make sure we only consume the messages with keyToRead.
         FixedKeysRangeGenerator generator =
-                FixedKeysRangeGenerator.builder().key(keyToRead).keySharedMode(JOIN).build();
+                FixedKeysRangeGenerator.builder().key(keyToRead).build();
         builder.setRangeGenerator(generator);
         builder.setConfig(PULSAR_ALLOW_KEY_SHARED_OUT_OF_ORDER_DELIVERY, true);
     }
@@ -77,9 +74,4 @@ public class KeySharedSubscriptionContext extends MultipleTopicConsumingContext
     protected String subscriptionName() {
         return "pulsar-key-shared-subscription";
     }
-
-    @Override
-    protected SubscriptionType subscriptionType() {
-        return SubscriptionType.Key_Shared;
-    }
 }
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/SharedSubscriptionContext.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/SharedSubscriptionContext.java
deleted file mode 100644
index cd649fe..0000000
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/SharedSubscriptionContext.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.testutils.source.cases;
-
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-
-/** We would consume from test splits by using {@link SubscriptionType#Shared} subscription. */
-public class SharedSubscriptionContext extends MultipleTopicConsumingContext {
-
-    public SharedSubscriptionContext(PulsarTestEnvironment environment) {
-        super(environment);
-    }
-
-    @Override
-    protected String displayName() {
-        return "consume message by Shared";
-    }
-
-    @Override
-    protected String subscriptionName() {
-        return "pulsar-shared-subscription";
-    }
-
-    @Override
-    protected SubscriptionType subscriptionType() {
-        return SubscriptionType.Shared;
-    }
-}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/SingleTopicConsumingContext.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/SingleTopicConsumingContext.java
index df51669..469eeeb 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/SingleTopicConsumingContext.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/SingleTopicConsumingContext.java
@@ -21,8 +21,6 @@ package org.apache.flink.connector.pulsar.testutils.source.cases;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
 import org.apache.flink.connector.pulsar.testutils.source.PulsarSourceTestContext;
 
-import org.apache.pulsar.client.api.SubscriptionType;
-
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
 import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
 
@@ -55,11 +53,6 @@ public class SingleTopicConsumingContext extends PulsarSourceTestContext {
         return "pulsar-single-topic-test";
     }
 
-    @Override
-    protected SubscriptionType subscriptionType() {
-        return SubscriptionType.Exclusive;
-    }
-
     @Override
     protected String generatePartitionName() {
         if (index == 0) {