You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ti...@apache.org on 2022/09/30 04:29:20 UTC

[flink] 03/06: [FLINK-26182][Connector/pulsar] Extract common logic from Pulsar source testing tools.

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 908405fc8354856dc741350eadb71e2a892830a9
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Tue Sep 27 18:04:24 2022 +0800

    [FLINK-26182][Connector/pulsar] Extract common logic from Pulsar source testing tools.
---
 .../pulsar/source/enumerator/topic/TopicRange.java |   2 +-
 .../pulsar/source/PulsarSourceITCase.java          |   6 +-
 .../pulsar/source/PulsarUnorderedSourceITCase.java |  60 ++-------
 .../pulsar/testutils/PulsarTestCommonUtils.java    |  17 ++-
 .../pulsar/testutils/PulsarTestContext.java        |  45 +++----
 .../pulsar/testutils/PulsarTestContextFactory.java |   5 +-
 .../pulsar/testutils/PulsarTestEnvironment.java    |   2 +-
 .../pulsar/testutils/PulsarTestSuiteBase.java      |   5 +-
 .../cases/MultipleTopicTemplateContext.java        | 134 -------------------
 .../cases/SharedSubscriptionConsumingContext.java  |  58 ---------
 .../cases/SingleTopicConsumingContext.java         | 132 -------------------
 .../{ => source}/PulsarPartitionDataWriter.java    |  18 ++-
 .../testutils/source/PulsarSourceTestContext.java  | 138 ++++++++++++++++++++
 .../source/UnorderedSourceTestSuiteBase.java}      |  40 ++----
 .../cases/MultipleTopicConsumingContext.java       |  42 ++++--
 .../source/cases/SingleTopicConsumingContext.java  |  73 +++++++++++
 .../flink-end-to-end-tests-pulsar/pom.xml          |  24 +++-
 .../util/pulsar/PulsarSourceOrderedE2ECase.java    |   5 +-
 .../util/pulsar/PulsarSourceUnorderedE2ECase.java  |  13 +-
 .../pulsar/cases/KeySharedSubscriptionContext.java | 144 ---------------------
 .../pulsar/cases/SharedSubscriptionContext.java    | 116 -----------------
 .../FlinkContainerWithPulsarEnvironment.java       |   3 +-
 .../common/KeyedPulsarPartitionDataWriter.java     |  19 +--
 .../common/UnorderedSourceTestSuiteBase.java       |  86 ------------
 .../ExclusiveSubscriptionContext.java              |  19 +--
 .../FailoverSubscriptionContext.java               |  19 +--
 .../source/KeySharedSubscriptionContext.java       |  87 +++++++++++++
 .../pulsar/source/SharedSubscriptionContext.java   |  32 ++---
 .../source/enumerator/NoOpEnumStateSerializer.java |   2 +-
 29 files changed, 466 insertions(+), 880 deletions(-)

diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicRange.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicRange.java
index 5b779224b87..1508b8732a3 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicRange.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicRange.java
@@ -68,7 +68,7 @@ public class TopicRange implements Serializable {
         return new Range(start, end);
     }
 
-    /** Create a topic range which contains the fully hash range. */
+    /** Create a topic range which contains the full hash range. */
     public static TopicRange createFullRange() {
         return new TopicRange(MIN_RANGE, MAX_RANGE);
     }
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
index 54fc4d71dd5..c2c95f255b5 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
@@ -20,9 +20,9 @@ package org.apache.flink.connector.pulsar.source;
 
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.pulsar.testutils.cases.MultipleTopicConsumingContext;
-import org.apache.flink.connector.pulsar.testutils.cases.SingleTopicConsumingContext;
 import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
+import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext;
+import org.apache.flink.connector.pulsar.testutils.source.cases.SingleTopicConsumingContext;
 import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
@@ -41,6 +41,7 @@ import org.junit.experimental.categories.Category;
  */
 @Category(value = {FailsOnJava11.class})
 class PulsarSourceITCase extends SourceTestSuiteBase<String> {
+
     // Defines test environment on Flink MiniCluster
     @TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();
 
@@ -48,6 +49,7 @@ class PulsarSourceITCase extends SourceTestSuiteBase<String> {
     @TestExternalSystem
     PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.mock());
 
+    // This field is preserved; we don't currently support the semantics in the source.
     @TestSemantics
     CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};
 
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarUnorderedSourceITCase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarUnorderedSourceITCase.java
index 6bad7220886..d18c6c2633b 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarUnorderedSourceITCase.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarUnorderedSourceITCase.java
@@ -20,36 +20,28 @@ package org.apache.flink.connector.pulsar.source;
 
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.pulsar.testutils.cases.SharedSubscriptionConsumingContext;
 import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
+import org.apache.flink.connector.pulsar.testutils.source.UnorderedSourceTestSuiteBase;
+import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext;
 import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
-import org.apache.flink.connector.testframe.environment.TestEnvironment;
-import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
 import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
 import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
-import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.testutils.junit.FailsOnJava11;
-import org.apache.flink.util.CloseableIterator;
 
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.junit.experimental.categories.Category;
-import org.junit.jupiter.api.Disabled;
 
-import java.util.List;
-
-import static java.util.concurrent.CompletableFuture.runAsync;
-import static org.apache.flink.connector.testframe.utils.CollectIteratorAssertions.assertUnordered;
-import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
-import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.apache.pulsar.client.api.SubscriptionType.Shared;
 
 /**
  * Unit test class for {@link PulsarSource}. Used for {@link SubscriptionType#Shared} subscription.
  */
 @Category(value = {FailsOnJava11.class})
-public class PulsarUnorderedSourceITCase extends SourceTestSuiteBase<String> {
+public class PulsarUnorderedSourceITCase extends UnorderedSourceTestSuiteBase<String> {
+
     // Defines test environment on Flink MiniCluster
     @TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();
 
@@ -61,43 +53,7 @@ public class PulsarUnorderedSourceITCase extends SourceTestSuiteBase<String> {
     CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};
 
     @TestContext
-    PulsarTestContextFactory<String, SharedSubscriptionConsumingContext> singleTopic =
-            new PulsarTestContextFactory<>(pulsar, SharedSubscriptionConsumingContext::new);
-
-    @Override
-    protected void checkResultWithSemantic(
-            CloseableIterator<String> resultIterator,
-            List<List<String>> testData,
-            CheckpointingMode semantic,
-            Integer limit) {
-        Runnable runnable =
-                () ->
-                        assertUnordered(resultIterator)
-                                .withNumRecordsLimit(getExpectedSize(testData, limit))
-                                .matchesRecordsFromSource(testData, semantic);
-
-        assertThat(runAsync(runnable)).succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
-    }
-
-    /**
-     * Shared subscription will have multiple readers on same partition, this would make hard to
-     * automatically stop like a bounded source.
-     */
-    private static int getExpectedSize(List<List<String>> testData, Integer limit) {
-        if (limit == null) {
-            return testData.stream().mapToInt(List::size).sum();
-        } else {
-            return limit;
-        }
-    }
-
-    @Override
-    @Disabled("We don't have any idle readers in Pulsar's shared subscription.")
-    public void testIdleReader(
-            TestEnvironment testEnv,
-            DataStreamSourceExternalContext<String> externalContext,
-            CheckpointingMode semantic)
-            throws Exception {
-        super.testIdleReader(testEnv, externalContext, semantic);
-    }
+    PulsarTestContextFactory<String, MultipleTopicConsumingContext> multipleTopic =
+            new PulsarTestContextFactory<>(
+                    pulsar, env -> new MultipleTopicConsumingContext(env, Shared));
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestCommonUtils.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestCommonUtils.java
index 87f3976c6a7..96b1ca62f2a 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestCommonUtils.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestCommonUtils.java
@@ -19,10 +19,12 @@
 package org.apache.flink.connector.pulsar.testutils;
 
 import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.flink.streaming.api.CheckpointingMode;
 
 import org.apache.pulsar.client.api.MessageId;
 import org.junit.jupiter.api.extension.ParameterContext;
@@ -33,7 +35,18 @@ import java.util.List;
 /** Put static methods that can be used by multiple test classes. */
 public class PulsarTestCommonUtils {
 
-    // ------- CreateSplits
+    /** Convert the CheckpointingMode to a connector related DeliveryGuarantee. */
+    public static DeliveryGuarantee toDeliveryGuarantee(CheckpointingMode checkpointingMode) {
+        if (checkpointingMode == CheckpointingMode.AT_LEAST_ONCE) {
+            return DeliveryGuarantee.AT_LEAST_ONCE;
+        } else if (checkpointingMode == CheckpointingMode.EXACTLY_ONCE) {
+            return DeliveryGuarantee.EXACTLY_ONCE;
+        } else {
+            throw new IllegalArgumentException(
+                    "Only exactly-once and at-least-once checkpointing modes are supported.");
+        }
+    }
+
     /** creates a fullRange() partitionSplit. */
     public static PulsarPartitionSplit createPartitionSplit(String topic, int partitionId) {
         return createPartitionSplit(topic, partitionId, Boundedness.CONTINUOUS_UNBOUNDED);
@@ -63,8 +76,6 @@ public class PulsarTestCommonUtils {
         return splits;
     }
 
-    // -------- InvocationContext Utils
-
     public static boolean isAssignableFromParameterContext(
             Class<?> requiredType, ParameterContext context) {
         return requiredType.isAssignableFrom(context.getParameter().getType());
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
index 9eaa24041c7..a4e44441bfd 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
@@ -19,41 +19,30 @@
 package org.apache.flink.connector.pulsar.testutils;
 
 import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
-import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
+import org.apache.flink.connector.testframe.external.ExternalContext;
+
+import org.apache.pulsar.client.api.Schema;
 
 import java.net.URL;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.Random;
-
-import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
 
-/** Common test context for pulsar based test. */
-public abstract class PulsarTestContext<T> implements DataStreamSourceExternalContext<T> {
+/**
+ * The implementation for Flink connector test tools. It provides the common constraints for
+ * writing test cases for the source, sink and table APIs.
+ */
+public abstract class PulsarTestContext<T> implements ExternalContext {
 
     protected final PulsarRuntimeOperator operator;
-    protected final List<URL> connectorJarPaths;
+    // The schema used for consuming and producing messages between Pulsar and tests.
+    protected final Schema<T> schema;
 
-    protected PulsarTestContext(PulsarTestEnvironment environment, List<URL> connectorJarPaths) {
+    protected PulsarTestContext(PulsarTestEnvironment environment, Schema<T> schema) {
         this.operator = environment.operator();
-        this.connectorJarPaths = connectorJarPaths;
-    }
-
-    // Helper methods for generating data.
-
-    protected List<String> generateStringTestData(int splitIndex, long seed) {
-        Random random = new Random(seed);
-        int recordNum = 300 + random.nextInt(200);
-        List<String> records = new ArrayList<>(recordNum);
-
-        for (int i = 0; i < recordNum; i++) {
-            int length = random.nextInt(40) + 10;
-            records.add(splitIndex + "-" + i + "-" + randomAlphanumeric(length));
-        }
-
-        return records;
+        this.schema = schema;
     }
 
+    /** Implement this method for providing a more friendly test name in IDE. */
     protected abstract String displayName();
 
     @Override
@@ -63,6 +52,10 @@ public abstract class PulsarTestContext<T> implements DataStreamSourceExternalCo
 
     @Override
     public List<URL> getConnectorJarPaths() {
-        return connectorJarPaths;
+        // We don't need any test jar definitions. They are provided in docker-related environments.
+        return Collections.emptyList();
     }
+
+    @Override
+    public void close() throws Exception {}
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContextFactory.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContextFactory.java
index c634efffc44..100c2a67d43 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContextFactory.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContextFactory.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.connector.pulsar.testutils;
 
+import org.apache.flink.connector.pulsar.testutils.source.PulsarSourceTestContext;
 import org.apache.flink.connector.testframe.external.ExternalContextFactory;
 
 import java.util.function.Function;
 
 /**
- * Factory for creating all the test context that extends {@link PulsarTestContext}. Test context
- * class should have a constructor with {@link PulsarTestEnvironment} arg.
+ * Factory for creating all the test contexts that extend {@link PulsarSourceTestContext}. A test
+ * context class should have a constructor with a single {@link PulsarTestEnvironment} arg.
  */
 public class PulsarTestContextFactory<F, T extends PulsarTestContext<F>>
         implements ExternalContextFactory<T> {
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestEnvironment.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestEnvironment.java
index 0f3fb9e15f9..f921e4b92f1 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestEnvironment.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestEnvironment.java
@@ -48,7 +48,7 @@ import java.util.List;
  * }</pre>
  *
  * <p>If you want to use this class in JUnit 5, just simply extends {@link PulsarTestSuiteBase}, all
- * the helper methods in {@code PulsarContainerOperator} is also exposed there.
+ * the helper methods in {@link PulsarRuntimeOperator} are also exposed there.
  */
 public class PulsarTestEnvironment
         implements BeforeAllCallback, AfterAllCallback, TestResource, TestRule {
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java
index b55fdc553b0..6f64ec008b3 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java
@@ -30,9 +30,8 @@ import org.junit.jupiter.api.extension.RegisterExtension;
  * The base class for the all Pulsar related test sites. It brings up:
  *
  * <ul>
- *   <li>A Zookeeper cluster.
- *   <li>Pulsar Broker.
- *   <li>A Bookkeeper cluster.
+ *   <li>A Pulsar Broker with memory based local metastore.
+ *   <li>A standalone Bookkeeper with memory based local metastore.
  * </ul>
  *
  * <p>You just need to write a JUnit 5 test class and extends this suite class. All the helper
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java
deleted file mode 100644
index 3eca9e7f926..00000000000
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.testutils.cases;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.Source;
-import org.apache.flink.connector.pulsar.source.PulsarSource;
-import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
-import org.apache.flink.connector.pulsar.testutils.PulsarPartitionDataWriter;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
-import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
-
-import org.apache.pulsar.client.api.RegexSubscriptionMode;
-import org.apache.pulsar.client.api.SubscriptionType;
-
-import java.net.URL;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
-import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
-import static org.apache.pulsar.client.api.Schema.STRING;
-
-/**
- * Pulsar external context template that will create multiple topics with only one partitions as
- * source splits.
- */
-public abstract class MultipleTopicTemplateContext extends PulsarTestContext<String> {
-
-    private int numTopics = 0;
-
-    private final String topicPattern = "pulsar-multiple-topic-[0-9]+-" + randomAlphabetic(8);
-
-    private final Map<String, ExternalSystemSplitDataWriter<String>> topicNameToSplitWriters =
-            new HashMap<>();
-
-    public MultipleTopicTemplateContext(PulsarTestEnvironment environment) {
-        this(environment, Collections.emptyList());
-    }
-
-    public MultipleTopicTemplateContext(
-            PulsarTestEnvironment environment, List<URL> connectorJarPaths) {
-        super(environment, connectorJarPaths);
-    }
-
-    @Override
-    public Source<String, ?, ?> createSource(TestingSourceSettings sourceSettings) {
-        PulsarSourceBuilder<String> builder =
-                PulsarSource.builder()
-                        .setDeserializationSchema(pulsarSchema(STRING))
-                        .setServiceUrl(serviceUrl())
-                        .setAdminUrl(adminUrl())
-                        .setTopicPattern(topicPattern, RegexSubscriptionMode.AllTopics)
-                        .setSubscriptionType(subscriptionType())
-                        .setSubscriptionName(subscriptionName());
-        if (sourceSettings.getBoundedness() == Boundedness.BOUNDED) {
-            // Using latest stop cursor for making sure the source could be stopped.
-            // This is required for SourceTestSuiteBase.
-            builder.setBoundedStopCursor(StopCursor.latest());
-        }
-
-        return builder.build();
-    }
-
-    @Override
-    public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter(
-            TestingSourceSettings sourceSettings) {
-        String topicName = topicPattern.replace("[0-9]+", String.valueOf(numTopics));
-        operator.createTopic(topicName, 1);
-
-        String partitionName = TopicNameUtils.topicNameWithPartition(topicName, 0);
-        PulsarPartitionDataWriter writer = new PulsarPartitionDataWriter(operator, partitionName);
-
-        topicNameToSplitWriters.put(partitionName, writer);
-        numTopics++;
-
-        return writer;
-    }
-
-    @Override
-    public List<String> generateTestData(
-            TestingSourceSettings sourceSettings, int splitIndex, long seed) {
-        return generateStringTestData(splitIndex, seed);
-    }
-
-    @Override
-    public TypeInformation<String> getProducedType() {
-        return TypeInformation.of(String.class);
-    }
-
-    @Override
-    public void close() throws Exception {
-        for (ExternalSystemSplitDataWriter<String> writer : topicNameToSplitWriters.values()) {
-            writer.close();
-        }
-
-        topicNameToSplitWriters.clear();
-    }
-
-    protected abstract String subscriptionName();
-
-    protected abstract SubscriptionType subscriptionType();
-
-    protected String serviceUrl() {
-        return operator.serviceUrl();
-    }
-
-    protected String adminUrl() {
-        return operator.adminUrl();
-    }
-}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SharedSubscriptionConsumingContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SharedSubscriptionConsumingContext.java
deleted file mode 100644
index 8001b6e7300..00000000000
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SharedSubscriptionConsumingContext.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.testutils.cases;
-
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-
-import java.net.URL;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * A consuming context with {@link SubscriptionType#Shared}, it's almost the same as {@link
- * MultipleTopicConsumingContext}.
- */
-public class SharedSubscriptionConsumingContext extends MultipleTopicTemplateContext {
-
-    public SharedSubscriptionConsumingContext(PulsarTestEnvironment environment) {
-        this(environment, Collections.emptyList());
-    }
-
-    public SharedSubscriptionConsumingContext(
-            PulsarTestEnvironment environment, List<URL> connectorJarPaths) {
-        super(environment, connectorJarPaths);
-    }
-
-    @Override
-    protected String displayName() {
-        return "consuming message with shared subscription";
-    }
-
-    @Override
-    protected String subscriptionName() {
-        return "flink-shared-subscription-test";
-    }
-
-    @Override
-    protected SubscriptionType subscriptionType() {
-        return SubscriptionType.Shared;
-    }
-}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java
deleted file mode 100644
index f5bfa45f32b..00000000000
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.testutils.cases;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.Source;
-import org.apache.flink.connector.pulsar.source.PulsarSource;
-import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
-import org.apache.flink.connector.pulsar.testutils.PulsarPartitionDataWriter;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
-import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
-
-import java.net.URL;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
-import static org.apache.pulsar.client.api.Schema.STRING;
-import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
-
-/**
- * A Pulsar external context that will create only one topic and use partitions in that topic as
- * source splits.
- */
-public class SingleTopicConsumingContext extends PulsarTestContext<String> {
-
-    private static final String TOPIC_NAME_PREFIX = "pulsar-single-topic";
-    private final String topicName;
-    private final Map<Integer, ExternalSystemSplitDataWriter<String>> partitionToSplitWriter =
-            new HashMap<>();
-
-    private int numSplits = 0;
-
-    public SingleTopicConsumingContext(PulsarTestEnvironment environment) {
-        this(environment, Collections.emptyList());
-    }
-
-    public SingleTopicConsumingContext(
-            PulsarTestEnvironment environment, List<URL> connectorJarPaths) {
-        super(environment, connectorJarPaths);
-        this.topicName =
-                TOPIC_NAME_PREFIX + "-" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
-    }
-
-    @Override
-    protected String displayName() {
-        return "consuming message on single topic";
-    }
-
-    @Override
-    public Source<String, ?, ?> createSource(TestingSourceSettings sourceSettings) {
-        PulsarSourceBuilder<String> builder =
-                PulsarSource.builder()
-                        .setDeserializationSchema(pulsarSchema(STRING))
-                        .setServiceUrl(operator.serviceUrl())
-                        .setAdminUrl(operator.adminUrl())
-                        .setTopics(topicName)
-                        .setSubscriptionType(Exclusive)
-                        .setSubscriptionName("pulsar-single-topic");
-        if (sourceSettings.getBoundedness() == Boundedness.BOUNDED) {
-            // Using latest stop cursor for making sure the source could be stopped.
-            // This is required for SourceTestSuiteBase.
-            builder.setBoundedStopCursor(StopCursor.latest());
-        }
-
-        return builder.build();
-    }
-
-    @Override
-    public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter(
-            TestingSourceSettings sourceSettings) {
-        if (numSplits == 0) {
-            // Create the topic first.
-            operator.createTopic(topicName, 1);
-            numSplits++;
-        } else {
-            numSplits++;
-            operator.increaseTopicPartitions(topicName, numSplits);
-        }
-
-        String partitionName = TopicNameUtils.topicNameWithPartition(topicName, numSplits - 1);
-        PulsarPartitionDataWriter writer = new PulsarPartitionDataWriter(operator, partitionName);
-        partitionToSplitWriter.put(numSplits - 1, writer);
-
-        return writer;
-    }
-
-    @Override
-    public List<String> generateTestData(
-            TestingSourceSettings sourceSettings, int splitIndex, long seed) {
-        return generateStringTestData(splitIndex, seed);
-    }
-
-    @Override
-    public TypeInformation<String> getProducedType() {
-        return TypeInformation.of(String.class);
-    }
-
-    @Override
-    public void close() throws Exception {
-        // Close writer.
-        for (ExternalSystemSplitDataWriter<String> writer : partitionToSplitWriter.values()) {
-            writer.close();
-        }
-
-        partitionToSplitWriter.clear();
-    }
-}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarPartitionDataWriter.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarPartitionDataWriter.java
similarity index 69%
rename from flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarPartitionDataWriter.java
rename to flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarPartitionDataWriter.java
index da48e26cd25..1ceb2922bc3 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarPartitionDataWriter.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarPartitionDataWriter.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.pulsar.testutils;
+package org.apache.flink.connector.pulsar.testutils.source;
 
 import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
 import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
@@ -25,20 +25,26 @@ import org.apache.pulsar.client.api.Schema;
 
 import java.util.List;
 
-/** Source split data writer for writing test data into a Pulsar topic partition. */
-public class PulsarPartitionDataWriter implements ExternalSystemSplitDataWriter<String> {
+/**
+ * Source split data writer for writing test data into a Pulsar topic partition. This writer doesn't
+ * need to be closed.
+ */
+public class PulsarPartitionDataWriter<T> implements ExternalSystemSplitDataWriter<T> {
 
     private final PulsarRuntimeOperator operator;
     private final String fullTopicName;
+    private final Schema<T> schema;
 
-    public PulsarPartitionDataWriter(PulsarRuntimeOperator operator, String fullTopicName) {
+    public PulsarPartitionDataWriter(
+            PulsarRuntimeOperator operator, String fullTopicName, Schema<T> schema) {
         this.operator = operator;
         this.fullTopicName = fullTopicName;
+        this.schema = schema;
     }
 
     @Override
-    public void writeRecords(List<String> records) {
-        operator.sendMessages(fullTopicName, Schema.STRING, records);
+    public void writeRecords(List<T> records) {
+        operator.sendMessages(fullTopicName, schema, records);
     }
 
     @Override
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java
new file mode 100644
index 00000000000..8089f8c58e7
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.source;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.connector.pulsar.source.PulsarSource;
+import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
+import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
+import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.util.List;
+import java.util.Random;
+import java.util.stream.IntStream;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
+import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
+import static org.apache.pulsar.client.api.RegexSubscriptionMode.AllTopics;
+
+/**
+ * Common source test context for Pulsar based tests. We use plain string text as the basic
+ * message content.
+ */
+public abstract class PulsarSourceTestContext extends PulsarTestContext<String>
+        implements DataStreamSourceExternalContext<String> {
+
+    private static final long DISCOVERY_INTERVAL = 1000L;
+    private static final int BATCH_DATA_SIZE = 300;
+
+    protected PulsarSourceTestContext(PulsarTestEnvironment environment) {
+        super(environment, Schema.STRING);
+    }
+
+    @Override
+    public Source<String, ?, ?> createSource(TestingSourceSettings sourceSettings) {
+        PulsarSourceBuilder<String> builder =
+                PulsarSource.builder()
+                        .setDeserializationSchema(pulsarSchema(schema))
+                        .setServiceUrl(operator.serviceUrl())
+                        .setAdminUrl(operator.adminUrl())
+                        .setTopicPattern(topicPattern(), AllTopics)
+                        .setSubscriptionType(subscriptionType())
+                        .setSubscriptionName(subscriptionName())
+                        .setConfig(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, DISCOVERY_INTERVAL);
+
+        // Set extra configuration for source builder.
+        setSourceBuilder(builder);
+
+        if (sourceSettings.getBoundedness() == Boundedness.BOUNDED) {
+            // Using the latest stop cursor for making sure the source could be stopped.
+            // This is required for SourceTestSuiteBase.
+            builder.setBoundedStopCursor(StopCursor.latest());
+        }
+
+        return builder.build();
+    }
+
+    @Override
+    public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter(
+            TestingSourceSettings sourceSettings) {
+        String partitionName = generatePartitionName();
+        return new PulsarPartitionDataWriter<>(operator, partitionName, schema);
+    }
+
+    @Override
+    public List<String> generateTestData(
+            TestingSourceSettings sourceSettings, int splitIndex, long seed) {
+        Random random = new Random(seed);
+        return IntStream.range(0, BATCH_DATA_SIZE)
+                .boxed()
+                .map(
+                        index -> {
+                            int length = random.nextInt(20) + 1;
+                            return "split:"
+                                    + splitIndex
+                                    + "-index:"
+                                    + index
+                                    + "-content:"
+                                    + randomAlphanumeric(length);
+                        })
+                .collect(toList());
+    }
+
+    @Override
+    public TypeInformation<String> getProducedType() {
+        return Types.STRING;
+    }
+
+    /** Override this method to apply extra configuration to the source builder. */
+    protected void setSourceBuilder(PulsarSourceBuilder<String> builder) {
+        // Nothing to do by default.
+    }
+
+    /**
+     * The topic pattern which is used in Pulsar topic auto discovery. Discovery runs every
+     * {@link #DISCOVERY_INTERVAL} ms.
+     */
+    protected abstract String topicPattern();
+
+    /** The subscription name used in Pulsar consumer. */
+    protected abstract String subscriptionName();
+
+    /** The subscription type used in Pulsar consumer. */
+    protected abstract SubscriptionType subscriptionType();
+
+    /**
+     * Dynamically generate a partition related topic in Pulsar. This topic should be pre-created
+     * in Pulsar. Every time we call this method, we may get a new partition name.
+     */
+    protected abstract String generatePartitionName();
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarUnorderedSourceITCase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/UnorderedSourceTestSuiteBase.java
similarity index 58%
copy from flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarUnorderedSourceITCase.java
copy to flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/UnorderedSourceTestSuiteBase.java
index 6bad7220886..35496ccab21 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarUnorderedSourceITCase.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/UnorderedSourceTestSuiteBase.java
@@ -16,26 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.pulsar.source;
+package org.apache.flink.connector.pulsar.testutils.source;
 
-import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.pulsar.testutils.cases.SharedSubscriptionConsumingContext;
-import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
-import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
 import org.apache.flink.connector.testframe.environment.TestEnvironment;
 import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
-import org.apache.flink.connector.testframe.junit.annotations.TestContext;
-import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
-import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
-import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
 import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
 import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.testutils.junit.FailsOnJava11;
 import org.apache.flink.util.CloseableIterator;
 
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.junit.experimental.categories.Category;
 import org.junit.jupiter.api.Disabled;
 
 import java.util.List;
@@ -46,28 +35,15 @@ import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 
 /**
- * Unit test class for {@link PulsarSource}. Used for {@link SubscriptionType#Shared} subscription.
+ * Pulsar with {@link SubscriptionType#Key_Shared} and {@link SubscriptionType#Shared} consumes
+ * messages out of order, so we have to override the default connector test tool.
  */
-@Category(value = {FailsOnJava11.class})
-public class PulsarUnorderedSourceITCase extends SourceTestSuiteBase<String> {
-    // Defines test environment on Flink MiniCluster
-    @TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();
-
-    // Defines pulsar running environment
-    @TestExternalSystem
-    PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.mock());
-
-    @TestSemantics
-    CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};
-
-    @TestContext
-    PulsarTestContextFactory<String, SharedSubscriptionConsumingContext> singleTopic =
-            new PulsarTestContextFactory<>(pulsar, SharedSubscriptionConsumingContext::new);
+public abstract class UnorderedSourceTestSuiteBase<T> extends SourceTestSuiteBase<T> {
 
     @Override
     protected void checkResultWithSemantic(
-            CloseableIterator<String> resultIterator,
-            List<List<String>> testData,
+            CloseableIterator<T> resultIterator,
+            List<List<T>> testData,
             CheckpointingMode semantic,
             Integer limit) {
         Runnable runnable =
@@ -83,7 +59,7 @@ public class PulsarUnorderedSourceITCase extends SourceTestSuiteBase<String> {
      * Shared subscription will have multiple readers on same partition, this would make hard to
      * automatically stop like a bounded source.
      */
-    private static int getExpectedSize(List<List<String>> testData, Integer limit) {
+    private static <T> int getExpectedSize(List<List<T>> testData, Integer limit) {
         if (limit == null) {
             return testData.stream().mapToInt(List::size).sum();
         } else {
@@ -95,7 +71,7 @@ public class PulsarUnorderedSourceITCase extends SourceTestSuiteBase<String> {
     @Disabled("We don't have any idle readers in Pulsar's shared subscription.")
     public void testIdleReader(
             TestEnvironment testEnv,
-            DataStreamSourceExternalContext<String> externalContext,
+            DataStreamSourceExternalContext<T> externalContext,
             CheckpointingMode semantic)
             throws Exception {
         super.testIdleReader(testEnv, externalContext, semantic);
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/MultipleTopicConsumingContext.java
similarity index 52%
copy from flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
copy to flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/MultipleTopicConsumingContext.java
index 57027f33e1b..ca6410fbc43 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/MultipleTopicConsumingContext.java
@@ -16,43 +16,63 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.pulsar.testutils.cases;
+package org.apache.flink.connector.pulsar.testutils.source.cases;
 
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connector.pulsar.testutils.source.PulsarSourceTestContext;
 
 import org.apache.pulsar.client.api.SubscriptionType;
 
-import java.net.URL;
-import java.util.Collections;
-import java.util.List;
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
 
 /**
  * Pulsar external context that will create multiple topics with only one partitions as source
  * splits.
  */
-public class MultipleTopicConsumingContext extends MultipleTopicTemplateContext {
+public class MultipleTopicConsumingContext extends PulsarSourceTestContext {
+
+    private final String topicPrefix = "flink-multiple-topic-" + randomAlphabetic(8) + "-";
+    private final SubscriptionType subscriptionType;
+
+    private int index = 0;
 
     public MultipleTopicConsumingContext(PulsarTestEnvironment environment) {
-        this(environment, Collections.emptyList());
+        this(environment, SubscriptionType.Exclusive);
     }
 
     public MultipleTopicConsumingContext(
-            PulsarTestEnvironment environment, List<URL> connectorJarPaths) {
-        super(environment, connectorJarPaths);
+            PulsarTestEnvironment environment, SubscriptionType subscriptionType) {
+        super(environment);
+        this.subscriptionType = subscriptionType;
     }
 
     @Override
     protected String displayName() {
-        return "consuming message on multiple topic";
+        return "consume message on multiple topic";
+    }
+
+    @Override
+    protected String topicPattern() {
+        return topicPrefix + ".+";
     }
 
     @Override
     protected String subscriptionName() {
-        return "flink-pulsar-multiple-topic-test";
+        return "flink-multiple-topic-test";
     }
 
     @Override
     protected SubscriptionType subscriptionType() {
-        return SubscriptionType.Exclusive;
+        return subscriptionType;
+    }
+
+    @Override
+    protected String generatePartitionName() {
+        String topic = topicPrefix + index;
+        operator.createTopic(topic, 1);
+        index++;
+
+        return topicNameWithPartition(topic, 0);
     }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/SingleTopicConsumingContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/SingleTopicConsumingContext.java
new file mode 100644
index 00000000000..df5166926e0
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/SingleTopicConsumingContext.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.source.cases;
+
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connector.pulsar.testutils.source.PulsarSourceTestContext;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
+
+/**
+ * A Pulsar external context that will create only one topic and use partitions in that topic as
+ * source splits.
+ */
+public class SingleTopicConsumingContext extends PulsarSourceTestContext {
+
+    private final String topicName = "pulsar-single-topic-" + randomAlphanumeric(8);
+
+    private int index = 0;
+
+    public SingleTopicConsumingContext(PulsarTestEnvironment environment) {
+        super(environment);
+    }
+
+    @Override
+    protected String displayName() {
+        return "consume message on single topic";
+    }
+
+    @Override
+    protected String topicPattern() {
+        return topicName + ".+";
+    }
+
+    @Override
+    protected String subscriptionName() {
+        return "pulsar-single-topic-test";
+    }
+
+    @Override
+    protected SubscriptionType subscriptionType() {
+        return SubscriptionType.Exclusive;
+    }
+
+    @Override
+    protected String generatePartitionName() {
+        if (index == 0) {
+            operator.createTopic(topicName, index + 1);
+        } else {
+            operator.increaseTopicPartitions(topicName, index + 1);
+        }
+
+        return topicNameWithPartition(topicName, index++);
+    }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml
index b9bc5d4c2b1..7f7ab46ddda 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml
@@ -33,6 +33,7 @@ under the License.
 	<properties>
 		<pulsar.version>2.10.0</pulsar.version>
 		<bouncycastle.version>1.69</bouncycastle.version>
+		<jaxb-api.version>2.3.1</jaxb-api.version>
 	</properties>
 
 	<dependencies>
@@ -41,6 +42,17 @@ under the License.
 			<artifactId>flink-end-to-end-tests-common</artifactId>
 			<version>${project.version}</version>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-test-utils</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>com.google.guava</groupId>
+					<artifactId>guava</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-connector-pulsar</artifactId>
@@ -70,9 +82,6 @@ under the License.
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-surefire-plugin</artifactId>
-				<configuration>
-					<skip>true</skip>
-				</configuration>
 			</plugin>
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
@@ -88,6 +97,15 @@ under the License.
 				</executions>
 				<configuration>
 					<artifactItems>
+						<artifactItem>
+							<groupId>org.apache.flink</groupId>
+							<artifactId>flink-connector-test-utils</artifactId>
+							<version>${project.version}</version>
+							<destFileName>flink-connector-testing.jar</destFileName>
+							<type>jar</type>
+							<outputDirectory>${project.build.directory}/dependencies
+							</outputDirectory>
+						</artifactItem>
 						<artifactItem>
 							<groupId>org.apache.flink</groupId>
 							<artifactId>flink-connector-pulsar</artifactId>
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
index 234c1a01cd6..ea6a982044b 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
@@ -25,10 +25,10 @@ import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem
 import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
 import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
 import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.tests.util.pulsar.cases.ExclusiveSubscriptionContext;
-import org.apache.flink.tests.util.pulsar.cases.FailoverSubscriptionContext;
 import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
 import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment;
+import org.apache.flink.tests.util.pulsar.source.ExclusiveSubscriptionContext;
+import org.apache.flink.tests.util.pulsar.source.FailoverSubscriptionContext;
 import org.apache.flink.testutils.junit.FailsOnJava11;
 
 import org.junit.experimental.categories.Category;
@@ -37,6 +37,7 @@ import org.junit.experimental.categories.Category;
  * Pulsar E2E test based on connector testing framework. It's used for Failover & Exclusive
  * subscription.
  */
+@SuppressWarnings("unused")
 @Category(value = {FailsOnJava11.class})
 public class PulsarSourceOrderedE2ECase extends SourceTestSuiteBase<String> {
 
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
index 50390486dd6..15333a1d538 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
@@ -19,21 +19,26 @@
 package org.apache.flink.tests.util.pulsar;
 
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
+import org.apache.flink.connector.pulsar.testutils.source.UnorderedSourceTestSuiteBase;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
 import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
 import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
 import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.tests.util.pulsar.cases.KeySharedSubscriptionContext;
-import org.apache.flink.tests.util.pulsar.cases.SharedSubscriptionContext;
 import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
 import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment;
-import org.apache.flink.tests.util.pulsar.common.UnorderedSourceTestSuiteBase;
+import org.apache.flink.tests.util.pulsar.source.KeySharedSubscriptionContext;
+import org.apache.flink.tests.util.pulsar.source.SharedSubscriptionContext;
+import org.apache.flink.testutils.junit.FailsOnJava11;
+
+import org.junit.experimental.categories.Category;
 
 /**
  * Pulsar E2E test based on connector testing framework. It's used for Shared & Key_Shared
  * subscription.
  */
+@SuppressWarnings("unused")
+@Category(value = {FailsOnJava11.class})
 public class PulsarSourceUnorderedE2ECase extends UnorderedSourceTestSuiteBase<String> {
 
     // Defines the Semantic.
@@ -49,12 +54,10 @@ public class PulsarSourceUnorderedE2ECase extends UnorderedSourceTestSuiteBase<S
     PulsarContainerTestEnvironment pulsar = new PulsarContainerTestEnvironment(flink);
 
     // Defines a set of external context Factories for different test cases.
-    @SuppressWarnings("unused")
     @TestContext
     PulsarTestContextFactory<String, SharedSubscriptionContext> shared =
             new PulsarTestContextFactory<>(pulsar, SharedSubscriptionContext::new);
 
-    @SuppressWarnings("unused")
     @TestContext
     PulsarTestContextFactory<String, KeySharedSubscriptionContext> keyShared =
             new PulsarTestContextFactory<>(pulsar, KeySharedSubscriptionContext::new);
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
deleted file mode 100644
index 5ad369bcf03..00000000000
--- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tests.util.pulsar.cases;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.Source;
-import org.apache.flink.connector.pulsar.source.PulsarSource;
-import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FixedRangeGenerator;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
-import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
-import org.apache.flink.tests.util.pulsar.common.KeyedPulsarPartitionDataWriter;
-
-import org.apache.pulsar.client.api.RegexSubscriptionMode;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.util.Murmur3_32Hash;
-
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static java.util.Collections.singletonList;
-import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.RANGE_SIZE;
-import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
-import static org.apache.pulsar.client.api.Schema.STRING;
-
-/** We would consume from test splits by using {@link SubscriptionType#Key_Shared} subscription. */
-public class KeySharedSubscriptionContext extends PulsarTestContext<String> {
-
-    private int index = 0;
-
-    private final List<KeyedPulsarPartitionDataWriter> writers = new ArrayList<>();
-
-    // Message keys.
-    private final String key1;
-    private final String key2;
-
-    public KeySharedSubscriptionContext(PulsarTestEnvironment environment) {
-        this(environment, Collections.emptyList());
-    }
-
-    public KeySharedSubscriptionContext(
-            PulsarTestEnvironment environment, List<URL> connectorJarPaths) {
-        super(environment, connectorJarPaths);
-
-        // Init message keys.
-        this.key1 = randomAlphabetic(8);
-        String newKey2;
-        do {
-            newKey2 = randomAlphabetic(8);
-        } while (keyHash(key1) == keyHash(newKey2));
-        this.key2 = newKey2;
-    }
-
-    @Override
-    protected String displayName() {
-        return "consuming message by Key_Shared";
-    }
-
-    @Override
-    public Source<String, ?, ?> createSource(TestingSourceSettings sourceSettings) {
-        int keyHash = keyHash(key1);
-        TopicRange range = new TopicRange(keyHash, keyHash);
-
-        PulsarSourceBuilder<String> builder =
-                PulsarSource.builder()
-                        .setDeserializationSchema(pulsarSchema(STRING))
-                        .setServiceUrl(operator.serviceUrl())
-                        .setAdminUrl(operator.adminUrl())
-                        .setTopicPattern(
-                                "pulsar-[0-9]+-key-shared", RegexSubscriptionMode.AllTopics)
-                        .setSubscriptionType(SubscriptionType.Key_Shared)
-                        .setSubscriptionName("pulsar-key-shared")
-                        .setRangeGenerator(new FixedRangeGenerator(singletonList(range)));
-        if (sourceSettings.getBoundedness() == Boundedness.BOUNDED) {
-            // Using latest stop cursor for making sure the source could be stopped.
-            builder.setBoundedStopCursor(StopCursor.latest());
-        }
-
-        return builder.build();
-    }
-
-    @Override
-    public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter(
-            TestingSourceSettings sourceSettings) {
-        String topicName = "pulsar-" + index + "-key-shared";
-        operator.createTopic(topicName, 1);
-        index++;
-
-        String partitionName = TopicNameUtils.topicNameWithPartition(topicName, 0);
-        KeyedPulsarPartitionDataWriter writer =
-                new KeyedPulsarPartitionDataWriter(operator, partitionName, key1, key2);
-        writers.add(writer);
-
-        return writer;
-    }
-
-    @Override
-    public List<String> generateTestData(
-            TestingSourceSettings sourceSettings, int splitIndex, long seed) {
-        return generateStringTestData(splitIndex, seed);
-    }
-
-    @Override
-    public TypeInformation<String> getProducedType() {
-        return TypeInformation.of(String.class);
-    }
-
-    @Override
-    public void close() {
-        for (KeyedPulsarPartitionDataWriter writer : writers) {
-            writer.close();
-        }
-        writers.clear();
-    }
-
-    private int keyHash(String key) {
-        return Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % RANGE_SIZE;
-    }
-}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
deleted file mode 100644
index 1a2db6694d1..00000000000
--- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tests.util.pulsar.cases;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.Source;
-import org.apache.flink.connector.pulsar.source.PulsarSource;
-import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
-import org.apache.flink.connector.pulsar.testutils.PulsarPartitionDataWriter;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
-import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
-
-import org.apache.pulsar.client.api.RegexSubscriptionMode;
-import org.apache.pulsar.client.api.SubscriptionType;
-
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
-import static org.apache.pulsar.client.api.Schema.STRING;
-
-/** We would consuming from test splits by using {@link SubscriptionType#Shared} subscription. */
-public class SharedSubscriptionContext extends PulsarTestContext<String> {
-
-    private int index = 0;
-
-    private final List<PulsarPartitionDataWriter> writers = new ArrayList<>();
-
-    public SharedSubscriptionContext(PulsarTestEnvironment environment) {
-        this(environment, Collections.emptyList());
-    }
-
-    public SharedSubscriptionContext(
-            PulsarTestEnvironment environment, List<URL> connectorJarPaths) {
-        super(environment, connectorJarPaths);
-    }
-
-    @Override
-    protected String displayName() {
-        return "consuming message by Shared";
-    }
-
-    @Override
-    public Source<String, ?, ?> createSource(TestingSourceSettings sourceSettings) {
-        PulsarSourceBuilder<String> builder =
-                PulsarSource.builder()
-                        .setDeserializationSchema(pulsarSchema(STRING))
-                        .setServiceUrl(operator.serviceUrl())
-                        .setAdminUrl(operator.adminUrl())
-                        .setTopicPattern("pulsar-[0-9]+-shared", RegexSubscriptionMode.AllTopics)
-                        .setSubscriptionType(SubscriptionType.Shared)
-                        .setSubscriptionName("pulsar-shared");
-        if (sourceSettings.getBoundedness() == Boundedness.BOUNDED) {
-            // Using latest stop cursor for making sure the source could be stopped.
-            builder.setBoundedStopCursor(StopCursor.latest());
-        }
-
-        return builder.build();
-    }
-
-    @Override
-    public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter(
-            TestingSourceSettings sourceSettings) {
-        String topicName = "pulsar-" + index + "-shared";
-        operator.createTopic(topicName, 1);
-        index++;
-
-        String partitionName = TopicNameUtils.topicNameWithPartition(topicName, 0);
-        PulsarPartitionDataWriter writer = new PulsarPartitionDataWriter(operator, partitionName);
-        writers.add(writer);
-
-        return writer;
-    }
-
-    @Override
-    public List<String> generateTestData(
-            TestingSourceSettings sourceSettings, int splitIndex, long seed) {
-        return generateStringTestData(splitIndex, seed);
-    }
-
-    @Override
-    public TypeInformation<String> getProducedType() {
-        return TypeInformation.of(String.class);
-    }
-
-    @Override
-    public void close() {
-        for (PulsarPartitionDataWriter writer : writers) {
-            writer.close();
-        }
-        writers.clear();
-    }
-}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
index 9f34554607e..f5e862f17c2 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
@@ -43,7 +43,8 @@ public class FlinkContainerWithPulsarEnvironment extends FlinkContainerTestEnvir
                 resourcePath("bcutil-jdk15on.jar"),
                 resourcePath("bcprov-ext-jdk15on.jar"),
                 resourcePath("jaxb-api.jar"),
-                resourcePath("jul-to-slf4j.jar"));
+                resourcePath("jul-to-slf4j.jar"),
+                resourcePath("flink-connector-testing.jar"));
     }
 
     private static String resourcePath(String jarName) {
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java
index e431e4c89d0..d5f6e11c00a 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java
@@ -35,23 +35,26 @@ public class KeyedPulsarPartitionDataWriter implements ExternalSystemSplitDataWr
 
     private final PulsarRuntimeOperator operator;
     private final String fullTopicName;
-    private final String key1;
-    private final String key2;
+    private final String keyToRead;
+    private final String keyToExclude;
 
     public KeyedPulsarPartitionDataWriter(
-            PulsarRuntimeOperator operator, String fullTopicName, String key1, String key2) {
+            PulsarRuntimeOperator operator,
+            String fullTopicName,
+            String keyToRead,
+            String keyToExclude) {
         this.operator = operator;
         this.fullTopicName = fullTopicName;
-        this.key1 = key1;
-        this.key2 = key2;
+        this.keyToRead = keyToRead;
+        this.keyToExclude = keyToExclude;
     }
 
     @Override
     public void writeRecords(List<String> records) {
-        operator.sendMessages(fullTopicName, Schema.STRING, key1, records);
+        List<String> newRecords = records.stream().map(a -> a + keyToRead).collect(toList());
 
-        List<String> newRecords = records.stream().map(a -> a + key1).collect(toList());
-        operator.sendMessages(fullTopicName, Schema.STRING, key2, newRecords);
+        operator.sendMessages(fullTopicName, Schema.STRING, keyToExclude, newRecords);
+        operator.sendMessages(fullTopicName, Schema.STRING, keyToRead, records);
     }
 
     @Override
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java
deleted file mode 100644
index 01527ea5482..00000000000
--- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tests.util.pulsar.common;
-
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.Source;
-import org.apache.flink.connector.testframe.environment.TestEnvironment;
-import org.apache.flink.connector.testframe.environment.TestEnvironmentSettings;
-import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
-import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
-import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
-import org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension;
-import org.apache.flink.connector.testframe.junit.extensions.TestCaseInvocationContextProvider;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.util.TestLoggerExtension;
-
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.TestInstance;
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-
-/** A source test template for testing the messages which could be consumed in a unordered way. */
-@ExtendWith({
-    ConnectorTestingExtension.class,
-    TestLoggerExtension.class,
-    TestCaseInvocationContextProvider.class
-})
-@TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public abstract class UnorderedSourceTestSuiteBase<T> {
-
-    @TestTemplate
-    @DisplayName("Test source with one split and four consumers")
-    public void testOneSplitWithMultipleConsumers(
-            TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext)
-            throws Exception {
-        TestingSourceSettings sourceSettings =
-                TestingSourceSettings.builder()
-                        .setBoundedness(Boundedness.BOUNDED)
-                        .setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
-                        .build();
-        TestEnvironmentSettings envOptions =
-                TestEnvironmentSettings.builder()
-                        .setConnectorJarPaths(externalContext.getConnectorJarPaths())
-                        .build();
-        List<T> testData =
-                externalContext.generateTestData(
-                        sourceSettings, 0, ThreadLocalRandom.current().nextLong());
-        ExternalSystemSplitDataWriter<T> writer =
-                externalContext.createSourceSplitDataWriter(sourceSettings);
-        writer.writeRecords(testData);
-
-        Source<T, ?, ?> source = externalContext.createSource(sourceSettings);
-        StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment(envOptions);
-        List<T> results =
-                execEnv.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar source")
-                        .setParallelism(4)
-                        .executeAndCollect(
-                                "Source single split with four readers.", testData.size());
-
-        assertThat(results, containsInAnyOrder(testData.toArray()));
-    }
-}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/source/ExclusiveSubscriptionContext.java
similarity index 71%
rename from flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java
rename to flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/source/ExclusiveSubscriptionContext.java
index 6fea0c9c9e5..4906ad6cc7e 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/source/ExclusiveSubscriptionContext.java
@@ -16,32 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.flink.tests.util.pulsar.cases;
+package org.apache.flink.tests.util.pulsar.source;
 
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.pulsar.testutils.cases.MultipleTopicTemplateContext;
+import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext;
 
 import org.apache.pulsar.client.api.SubscriptionType;
 
-import java.net.URL;
-import java.util.Collections;
-import java.util.List;
-
 /** We would consume from test splits by using {@link SubscriptionType#Exclusive} subscription. */
-public class ExclusiveSubscriptionContext extends MultipleTopicTemplateContext {
+public class ExclusiveSubscriptionContext extends MultipleTopicConsumingContext {
 
     public ExclusiveSubscriptionContext(PulsarTestEnvironment environment) {
-        this(environment, Collections.emptyList());
-    }
-
-    public ExclusiveSubscriptionContext(
-            PulsarTestEnvironment environment, List<URL> connectorJarPaths) {
-        super(environment, connectorJarPaths);
+        super(environment);
     }
 
     @Override
     protected String displayName() {
-        return "consuming message by Exclusive";
+        return "consume message by Exclusive";
     }
 
     @Override
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/source/FailoverSubscriptionContext.java
similarity index 71%
rename from flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
rename to flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/source/FailoverSubscriptionContext.java
index c47348861c1..3134db4bdda 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/source/FailoverSubscriptionContext.java
@@ -16,32 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.flink.tests.util.pulsar.cases;
+package org.apache.flink.tests.util.pulsar.source;
 
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.pulsar.testutils.cases.MultipleTopicTemplateContext;
+import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext;
 
 import org.apache.pulsar.client.api.SubscriptionType;
 
-import java.net.URL;
-import java.util.Collections;
-import java.util.List;
-
 /** We would consume from test splits by using {@link SubscriptionType#Failover} subscription. */
-public class FailoverSubscriptionContext extends MultipleTopicTemplateContext {
+public class FailoverSubscriptionContext extends MultipleTopicConsumingContext {
 
     public FailoverSubscriptionContext(PulsarTestEnvironment environment) {
-        this(environment, Collections.emptyList());
-    }
-
-    public FailoverSubscriptionContext(
-            PulsarTestEnvironment environment, List<URL> connectorJarPaths) {
-        super(environment, connectorJarPaths);
+        super(environment);
     }
 
     @Override
     protected String displayName() {
-        return "consuming message by Failover";
+        return "consume message by Failover";
     }
 
     @Override
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/source/KeySharedSubscriptionContext.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/source/KeySharedSubscriptionContext.java
new file mode 100644
index 00000000000..0cae6e58405
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/source/KeySharedSubscriptionContext.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.pulsar.source;
+
+import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FixedRangeGenerator;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext;
+import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
+import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
+import org.apache.flink.tests.util.pulsar.common.KeyedPulsarPartitionDataWriter;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
+
+import static java.util.Collections.singletonList;
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.RANGE_SIZE;
+import static org.apache.pulsar.client.api.SubscriptionType.Key_Shared;
+
+/** We would consume from test splits by using {@link SubscriptionType#Key_Shared} subscription. */
+public class KeySharedSubscriptionContext extends MultipleTopicConsumingContext {
+
+    private final String keyToRead;
+    private final String keyToExclude;
+
+    public KeySharedSubscriptionContext(PulsarTestEnvironment environment) {
+        super(environment, Key_Shared);
+
+        this.keyToRead = randomAlphabetic(8);
+
+        // Pick an excluded key whose hash differs from that of the key to read.
+        int readHash = keyHash(keyToRead);
+        String randomKey;
+        do {
+            randomKey = randomAlphabetic(8);
+        } while (keyHash(randomKey) == readHash);
+        this.keyToExclude = randomKey;
+    }
+
+    @Override
+    public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter(
+            TestingSourceSettings sourceSettings) {
+        String partitionName = generatePartitionName();
+        return new KeyedPulsarPartitionDataWriter(operator, partitionName, keyToRead, keyToExclude);
+    }
+
+    @Override
+    protected String displayName() {
+        return "consume message by Key_Shared";
+    }
+
+    @Override
+    protected void setSourceBuilder(PulsarSourceBuilder<String> builder) {
+        int keyHash = keyHash(keyToRead);
+        TopicRange range = new TopicRange(keyHash, keyHash);
+
+        builder.setRangeGenerator(new FixedRangeGenerator(singletonList(range)));
+    }
+
+    @Override
+    protected String subscriptionName() {
+        return "pulsar-key-shared-subscription";
+    }
+
+    // This method is copied from Pulsar; it calculates the hash of a message key.
+    private int keyHash(String key) {
+        return Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % RANGE_SIZE;
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/source/SharedSubscriptionContext.java
similarity index 55%
rename from flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
rename to flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/source/SharedSubscriptionContext.java
index 57027f33e1b..fe9f07832af 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/source/SharedSubscriptionContext.java
@@ -16,43 +16,29 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.pulsar.testutils.cases;
+package org.apache.flink.tests.util.pulsar.source;
 
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext;
 
 import org.apache.pulsar.client.api.SubscriptionType;
 
-import java.net.URL;
-import java.util.Collections;
-import java.util.List;
+import static org.apache.pulsar.client.api.SubscriptionType.Shared;
 
-/**
- * Pulsar external context that will create multiple topics with only one partitions as source
- * splits.
- */
-public class MultipleTopicConsumingContext extends MultipleTopicTemplateContext {
-
-    public MultipleTopicConsumingContext(PulsarTestEnvironment environment) {
-        this(environment, Collections.emptyList());
-    }
+/** We would consume from test splits by using {@link SubscriptionType#Shared} subscription. */
+public class SharedSubscriptionContext extends MultipleTopicConsumingContext {
 
-    public MultipleTopicConsumingContext(
-            PulsarTestEnvironment environment, List<URL> connectorJarPaths) {
-        super(environment, connectorJarPaths);
+    public SharedSubscriptionContext(PulsarTestEnvironment environment) {
+        super(environment, Shared);
     }
 
     @Override
     protected String displayName() {
-        return "consuming message on multiple topic";
+        return "consume message by Shared";
     }
 
     @Override
     protected String subscriptionName() {
-        return "flink-pulsar-multiple-topic-test";
-    }
-
-    @Override
-    protected SubscriptionType subscriptionType() {
-        return SubscriptionType.Exclusive;
+        return "pulsar-shared-subscription";
     }
 }
diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumStateSerializer.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumStateSerializer.java
index 7be0e8d2d88..0628e632f19 100644
--- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumStateSerializer.java
+++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/source/enumerator/NoOpEnumStateSerializer.java
@@ -22,7 +22,7 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.IOException;
 
-/** Mock enumerator state seializer. */
+/** Mock enumerator state serializer. */
 public class NoOpEnumStateSerializer implements SimpleVersionedSerializer<NoOpEnumState> {
     @Override
     public int getVersion() {