You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ti...@apache.org on 2022/09/30 03:46:42 UTC

[flink] 04/05: [FLINK-26182][Connector/pulsar] Create e2e tests for the Pulsar source and sink based on the connector testing framework.

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6075746cb3bf4462cdd7d474c96b65e0bdc57d99
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Wed Sep 28 15:28:14 2022 +0800

    [FLINK-26182][Connector/pulsar] Create e2e tests for the Pulsar source and sink based on the connector testing framework.
---
 .../f4d91193-72ba-4ce4-ad83-98f780dce581           |   6 +
 flink-connectors/flink-connector-pulsar/pom.xml    |   2 -
 .../source/enumerator/topic/TopicPartition.java    |   5 +
 .../connector/pulsar/sink/PulsarSinkITCase.java    | 155 +++++++++++++--------
 .../pulsar/source/PulsarUnorderedSourceITCase.java |  14 +-
 .../pulsar/testutils/PulsarTestCommonUtils.java    |   5 +-
 .../pulsar/testutils/PulsarTestContext.java        |   6 +-
 .../testutils/runtime/PulsarRuntimeOperator.java   |  15 +-
 .../runtime/container/PulsarContainerRuntime.java  |  25 ++--
 .../testutils/runtime/mock/PulsarMockRuntime.java  |   5 +-
 .../testutils/sink/PulsarPartitionDataReader.java  |  93 +++++++++++++
 .../testutils/sink/PulsarSinkTestContext.java      | 124 +++++++++++++++++
 .../testutils/sink/PulsarSinkTestSuiteBase.java    |  37 +++++
 .../source}/KeyedPulsarPartitionDataWriter.java    |  15 +-
 .../source/UnorderedSourceTestSuiteBase.java       |   5 +-
 .../cases}/KeySharedSubscriptionContext.java       |  24 ++--
 .../cases/MultipleTopicConsumingContext.java       |   9 +-
 .../source/cases}/SharedSubscriptionContext.java   |  12 +-
 .../src/test/resources/docker/bootstrap.sh         |  28 ----
 ...norderedE2ECase.java => PulsarSinkE2ECase.java} |  31 ++---
 .../util/pulsar/PulsarSourceUnorderedE2ECase.java  |   4 +-
 .../FlinkContainerWithPulsarEnvironment.java       |   5 +-
 22 files changed, 435 insertions(+), 190 deletions(-)

diff --git a/flink-connectors/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581 b/flink-connectors/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
index 03d0ae583af..40e7dc9686c 100644
--- a/flink-connectors/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
+++ b/flink-connectors/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
@@ -1,3 +1,9 @@
+org.apache.flink.connector.pulsar.sink.PulsarSinkITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
 org.apache.flink.connector.pulsar.source.PulsarSourceITCase does not satisfy: only one of the following predicates match:\
 * reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
 * reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
diff --git a/flink-connectors/flink-connector-pulsar/pom.xml b/flink-connectors/flink-connector-pulsar/pom.xml
index 49b06b26806..ef0a2ffc12a 100644
--- a/flink-connectors/flink-connector-pulsar/pom.xml
+++ b/flink-connectors/flink-connector-pulsar/pom.xml
@@ -320,7 +320,6 @@ under the License.
 								<include>**/testutils/**</include>
 								<include>META-INF/LICENSE</include>
 								<include>META-INF/NOTICE</include>
-								<include>containers/txnStandalone.conf</include>
 							</includes>
 						</configuration>
 					</execution>
@@ -344,7 +343,6 @@ under the License.
 								<include>**/testutils/**</include>
 								<include>META-INF/LICENSE</include>
 								<include>META-INF/NOTICE</include>
-								<include>containers/txnStandalone.conf</include>
 							</includes>
 						</configuration>
 					</execution>
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicPartition.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicPartition.java
index b3035cde848..1113771b76d 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicPartition.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicPartition.java
@@ -29,6 +29,7 @@ import java.util.Objects;
 
 import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName;
 import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -58,6 +59,10 @@ public class TopicPartition implements Serializable {
      */
     private final TopicRange range;
 
+    public TopicPartition(String topic, int partitionId) {
+        this(topic, partitionId, createFullRange());
+    }
+
     public TopicPartition(String topic, int partitionId, TopicRange range) {
         this.topic = topicName(checkNotNull(topic));
         this.partitionId = partitionId;
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
index 4679a994f1c..b0ef390b728 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
@@ -20,14 +20,28 @@ package org.apache.flink.connector.pulsar.sink;
 
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;
 import org.apache.flink.connector.pulsar.testutils.function.ControlSource;
+import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
+import org.apache.flink.connector.pulsar.testutils.sink.PulsarSinkTestContext;
+import org.apache.flink.connector.pulsar.testutils.sink.PulsarSinkTestSuiteBase;
+import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.testutils.junit.FailsOnJava11;
 import org.apache.flink.testutils.junit.SharedObjectsExtension;
 
+import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
@@ -41,64 +55,91 @@ import static org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSer
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for using PulsarSink writing to a Pulsar cluster. */
-class PulsarSinkITCase extends PulsarTestSuiteBase {
-
-    private static final int PARALLELISM = 1;
-
-    @RegisterExtension
-    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
-            new MiniClusterExtension(
-                    new MiniClusterResourceConfiguration.Builder()
-                            .setNumberTaskManagers(1)
-                            .setNumberSlotsPerTaskManager(PARALLELISM)
-                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
-                            .withHaLeadershipControl()
-                            .build());
-
-    // Using this extension for creating shared reference which would be used in source function.
-    @RegisterExtension final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
-
-    @ParameterizedTest
-    @EnumSource(DeliveryGuarantee.class)
-    void writeRecordsToPulsar(DeliveryGuarantee guarantee) throws Exception {
-        // A random topic with partition 4.
-        String topic = randomAlphabetic(8);
-        operator().createTopic(topic, 4);
-        int counts = ThreadLocalRandom.current().nextInt(100, 200);
-
-        ControlSource source =
-                new ControlSource(
-                        sharedObjects,
-                        operator(),
-                        topic,
-                        guarantee,
-                        counts,
-                        Duration.ofMillis(50),
-                        Duration.ofMinutes(5));
-        PulsarSink<String> sink =
-                PulsarSink.builder()
-                        .setServiceUrl(operator().serviceUrl())
-                        .setAdminUrl(operator().adminUrl())
-                        .setDeliveryGuarantee(guarantee)
-                        .setTopics(topic)
-                        .setProducerName("pulsar-sink-it-case")
-                        .setSerializationSchema(flinkSchema(new SimpleStringSchema()))
-                        .build();
-
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-        env.setParallelism(PARALLELISM);
-        if (guarantee != DeliveryGuarantee.NONE) {
-            env.enableCheckpointing(500L);
-        }
-        env.addSource(source).sinkTo(sink);
-        env.execute();
+@Category(value = {FailsOnJava11.class})
+class PulsarSinkITCase {
+
+    /** Integration test based on connector testing framework. */
+    @Nested
+    class IntegrationTest extends PulsarSinkTestSuiteBase {
+
+        @TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();
+
+        @TestExternalSystem
+        PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.mock());
+
+        @TestSemantics
+        CheckpointingMode[] semantics =
+                new CheckpointingMode[] {
+                    CheckpointingMode.EXACTLY_ONCE, CheckpointingMode.AT_LEAST_ONCE
+                };
+
+        @TestContext
+        PulsarTestContextFactory<String, PulsarSinkTestContext> sinkContext =
+                new PulsarTestContextFactory<>(pulsar, PulsarSinkTestContext::new);
+    }
+
+    /** Tests for using PulsarSink writing to a Pulsar cluster. */
+    @Nested
+    class DeliveryGuaranteeTest extends PulsarTestSuiteBase {
 
-        List<String> expectedRecords = source.getExpectedRecords();
-        List<String> consumedRecords = source.getConsumedRecords();
+        private static final int PARALLELISM = 1;
 
-        assertThat(consumedRecords)
-                .hasSameSizeAs(expectedRecords)
-                .containsExactlyInAnyOrderElementsOf(expectedRecords);
+        @RegisterExtension
+        private final MiniClusterExtension clusterExtension =
+                new MiniClusterExtension(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setNumberTaskManagers(1)
+                                .setNumberSlotsPerTaskManager(PARALLELISM)
+                                .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                                .withHaLeadershipControl()
+                                .build());
+
+        // Using this extension for creating shared reference which would be used in source
+        // function.
+        @RegisterExtension
+        final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
+
+        @ParameterizedTest
+        @EnumSource(DeliveryGuarantee.class)
+        void writeRecordsToPulsar(DeliveryGuarantee guarantee) throws Exception {
+            // A random topic with partition 4.
+            String topic = randomAlphabetic(8);
+            operator().createTopic(topic, 4);
+            int counts = ThreadLocalRandom.current().nextInt(100, 200);
+
+            ControlSource source =
+                    new ControlSource(
+                            sharedObjects,
+                            operator(),
+                            topic,
+                            guarantee,
+                            counts,
+                            Duration.ofMillis(50),
+                            Duration.ofMinutes(5));
+            PulsarSink<String> sink =
+                    PulsarSink.builder()
+                            .setServiceUrl(operator().serviceUrl())
+                            .setAdminUrl(operator().adminUrl())
+                            .setDeliveryGuarantee(guarantee)
+                            .setTopics(topic)
+                            .setSerializationSchema(flinkSchema(new SimpleStringSchema()))
+                            .build();
+
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+            env.setParallelism(PARALLELISM);
+            if (guarantee != DeliveryGuarantee.NONE) {
+                env.enableCheckpointing(500L);
+            }
+            env.addSource(source).sinkTo(sink);
+            env.execute();
+
+            List<String> expectedRecords = source.getExpectedRecords();
+            List<String> consumedRecords = source.getConsumedRecords();
+
+            assertThat(consumedRecords)
+                    .hasSameSizeAs(expectedRecords)
+                    .containsExactlyInAnyOrderElementsOf(expectedRecords);
+        }
     }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarUnorderedSourceITCase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarUnorderedSourceITCase.java
index d18c6c2633b..070a16126a7 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarUnorderedSourceITCase.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarUnorderedSourceITCase.java
@@ -22,7 +22,8 @@ import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
 import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
 import org.apache.flink.connector.pulsar.testutils.source.UnorderedSourceTestSuiteBase;
-import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext;
+import org.apache.flink.connector.pulsar.testutils.source.cases.KeySharedSubscriptionContext;
+import org.apache.flink.connector.pulsar.testutils.source.cases.SharedSubscriptionContext;
 import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
@@ -34,8 +35,6 @@ import org.apache.flink.testutils.junit.FailsOnJava11;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.junit.experimental.categories.Category;
 
-import static org.apache.pulsar.client.api.SubscriptionType.Shared;
-
 /**
  * Unit test class for {@link PulsarSource}. Used for {@link SubscriptionType#Shared} subscription.
  */
@@ -53,7 +52,10 @@ public class PulsarUnorderedSourceITCase extends UnorderedSourceTestSuiteBase<St
     CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};
 
     @TestContext
-    PulsarTestContextFactory<String, MultipleTopicConsumingContext> multipleTopic =
-            new PulsarTestContextFactory<>(
-                    pulsar, env -> new MultipleTopicConsumingContext(env, Shared));
+    PulsarTestContextFactory<String, SharedSubscriptionContext> sharedSubscription =
+            new PulsarTestContextFactory<>(pulsar, SharedSubscriptionContext::new);
+
+    @TestContext
+    PulsarTestContextFactory<String, KeySharedSubscriptionContext> keySharedSubscription =
+            new PulsarTestContextFactory<>(pulsar, KeySharedSubscriptionContext::new);
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestCommonUtils.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestCommonUtils.java
index 96b1ca62f2a..994601329d6 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestCommonUtils.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestCommonUtils.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 import org.apache.flink.streaming.api.CheckpointingMode;
 
@@ -59,9 +58,7 @@ public class PulsarTestCommonUtils {
 
     public static PulsarPartitionSplit createPartitionSplit(
             String topic, int partitionId, Boundedness boundedness, MessageId latestConsumedId) {
-        TopicPartition topicPartition =
-                new TopicPartition(topic, partitionId, TopicRange.createFullRange());
-
+        TopicPartition topicPartition = new TopicPartition(topic, partitionId);
         StopCursor stopCursor =
                 boundedness == Boundedness.BOUNDED ? StopCursor.latest() : StopCursor.never();
         return new PulsarPartitionSplit(topicPartition, stopCursor, latestConsumedId, null);
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
index a4e44441bfd..1408656f986 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
@@ -52,10 +52,12 @@ public abstract class PulsarTestContext<T> implements ExternalContext {
 
     @Override
     public List<URL> getConnectorJarPaths() {
-        // We don't need any tests jar definition. They are provided in docker-related environments.
+        // We don't need any test jars definition. They are provided in docker-related environments.
         return Collections.emptyList();
     }
 
     @Override
-    public void close() throws Exception {}
+    public void close() throws Exception {
+        // All the topics would be deleted in the PulsarRuntime. No need to manually close them.
+    }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
index 9787a8ceeb9..5d14d7ee081 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
@@ -22,7 +22,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.pulsar.common.config.PulsarConfiguration;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
 import org.apache.flink.connector.testframe.external.ExternalContext;
 
 import org.apache.flink.shaded.guava30.com.google.common.base.Strings;
@@ -48,7 +47,6 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 
 import java.io.Closeable;
-import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -228,12 +226,7 @@ public class PulsarRuntimeOperator implements Closeable {
     public List<TopicPartition> topicInfo(String topic) {
         try {
             return client().getPartitionsForTopic(topic).get().stream()
-                    .map(
-                            p ->
-                                    new TopicPartition(
-                                            topic,
-                                            TopicName.getPartitionIndex(p),
-                                            TopicRange.createFullRange()))
+                    .map(p -> new TopicPartition(topic, TopicName.getPartitionIndex(p)))
                     .collect(toList());
         } catch (InterruptedException | ExecutionException e) {
             throw new IllegalStateException(e);
@@ -452,7 +445,7 @@ public class PulsarRuntimeOperator implements Closeable {
 
     /** This method is used for test framework. You can't close this operator manually. */
     @Override
-    public void close() throws IOException {
+    public void close() throws PulsarClientException {
         if (admin != null) {
             admin.close();
         }
@@ -485,7 +478,7 @@ public class PulsarRuntimeOperator implements Closeable {
         }
     }
 
-    private synchronized <T> Producer<T> createProducer(String topic, Schema<T> schema) {
+    private <T> Producer<T> createProducer(String topic, Schema<T> schema) {
         ProducerBuilder<T> builder =
                 client().newProducer(schema)
                         .topic(topic)
@@ -496,7 +489,7 @@ public class PulsarRuntimeOperator implements Closeable {
         return sneakyClient(builder::create);
     }
 
-    private synchronized <T> Consumer<T> createConsumer(String topic, Schema<T> schema) {
+    private <T> Consumer<T> createConsumer(String topic, Schema<T> schema) {
         // Create the earliest subscription if it's not existed.
         List<String> subscriptions = sneakyAdmin(() -> admin().topics().getSubscriptions(topic));
         if (!subscriptions.contains(SUBSCRIPTION_NAME)) {
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
index 910985338b9..eac42f063bc 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
@@ -24,7 +24,6 @@ import org.apache.flink.util.DockerImageVersions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.BindMode;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.PulsarContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -88,22 +87,30 @@ public class PulsarContainerRuntime implements PulsarRuntime {
             return;
         }
 
-        // Override the default configuration in container for enabling the Pulsar transaction.
-        container.withClasspathResourceMapping(
-                "docker/bootstrap.sh", "/pulsar/bin/bootstrap.sh", BindMode.READ_ONLY);
-        // Waiting for the Pulsar border is ready.
+        // Override the default standalone configuration by system environments.
+        container.withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "true");
+        container.withEnv("PULSAR_PREFIX_acknowledgmentAtBatchIndexLevelEnabled", "true");
+        container.withEnv("PULSAR_PREFIX_systemTopicEnabled", "true");
+        container.withEnv("PULSAR_PREFIX_brokerDeduplicationEnabled", "true");
+        container.withEnv("PULSAR_PREFIX_defaultNumberOfNamespaceBundles", "1");
+        // Change the default bootstrap script, it will override the default configuration
+        // and start a standalone Pulsar without streaming storage and function worker.
+        container.withCommand(
+                "sh",
+                "-c",
+                "/pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf && /pulsar/bin/pulsar standalone --no-functions-worker -nss");
+        // Waiting for the Pulsar broker and the transaction is ready after the container started.
         container.waitingFor(
-                forHttp("/admin/v2/namespaces/public/default")
+                forHttp(
+                                "/admin/v2/persistent/pulsar/system/transaction_coordinator_assign/partitions")
                         .forPort(BROKER_HTTP_PORT)
                         .forStatusCode(200)
                         .withStartupTimeout(Duration.ofMinutes(5)));
-        // Set custom startup script.
-        container.withCommand("sh /pulsar/bin/bootstrap.sh");
 
         // Start the Pulsar Container.
         container.start();
         // Append the output to this runtime logger. Used for local debug purpose.
-        container.followOutput(new Slf4jLogConsumer(LOG).withSeparateOutputStreams());
+        container.followOutput(new Slf4jLogConsumer(LOG, true));
 
         // Create the operator.
         if (boundFlink) {
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
index 326c3ff6a89..b0df4c5d1ff 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
@@ -64,10 +64,10 @@ public class PulsarMockRuntime implements PulsarRuntime {
     @Override
     public void tearDown() {
         try {
-            pulsarService.close();
             if (operator != null) {
                 operator.close();
             }
+            pulsarService.close();
         } catch (Exception e) {
             throw new IllegalStateException(e);
         }
@@ -95,7 +95,7 @@ public class PulsarMockRuntime implements PulsarRuntime {
         configuration.setAuthorizationEnabled(false);
         configuration.setAllowAutoTopicCreation(true);
         configuration.setBrokerDeleteInactiveTopicsEnabled(false);
-
+        configuration.setTopicLevelPoliciesEnabled(true);
         configuration.setWebSocketServiceEnabled(false);
         // Use runtime dynamic ports
         configuration.setBrokerServicePort(Optional.of(0));
@@ -107,6 +107,7 @@ public class PulsarMockRuntime implements PulsarRuntime {
         configuration.setTransactionCoordinatorEnabled(true);
         configuration.setTransactionMetadataStoreProviderClassName(
                 "org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider");
+        configuration.setDefaultNumberOfNamespaceBundles(1);
 
         return configuration;
     }
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarPartitionDataReader.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarPartitionDataReader.java
new file mode 100644
index 00000000000..6ef9510d116
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarPartitionDataReader.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.sink;
+
+import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
+
+/** The data reader for a specified topic partition from Pulsar. */
+public class PulsarPartitionDataReader<T> implements ExternalSystemDataReader<T>, Closeable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarPartitionDataReader.class);
+
+    private final Consumer<T> consumer;
+
+    public PulsarPartitionDataReader(
+            PulsarRuntimeOperator operator, String fullTopicName, Schema<T> schema)
+            throws PulsarClientException {
+        // Create client for supporting the use in E2E test.
+        String subscriptionName = randomAlphanumeric(12);
+        this.consumer =
+                operator.client()
+                        .newConsumer(schema)
+                        .topic(fullTopicName)
+                        .subscriptionName(subscriptionName)
+                        .subscriptionType(SubscriptionType.Exclusive)
+                        .subscriptionMode(SubscriptionMode.Durable)
+                        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                        .subscribe();
+    }
+
+    @Override
+    public List<T> poll(Duration timeout) {
+        List<T> results = new ArrayList<>();
+
+        while (true) {
+            try {
+                int millis = Math.toIntExact(timeout.toMillis());
+                Message<T> message = consumer.receive(millis, MILLISECONDS);
+
+                if (message != null) {
+                    consumer.acknowledgeCumulative(message);
+                    results.add(message.getValue());
+                } else {
+                    break;
+                }
+            } catch (Exception e) {
+                LOG.error("", e);
+                break;
+            }
+        }
+
+        return results;
+    }
+
+    @Override
+    public void close() throws PulsarClientException {
+        consumer.close();
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java
new file mode 100644
index 00000000000..6b9cf2d1041
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.sink;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.PulsarSink;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
+
+import org.apache.pulsar.client.api.Schema;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
+import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES;
+import static org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema.pulsarSchema;
+import static org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.toDeliveryGuarantee;
+
+/** Common sink test context for pulsar based test. */
+public class PulsarSinkTestContext extends PulsarTestContext<String>
+        implements DataStreamSinkV2ExternalContext<String> {
+
+    private static final String TOPIC_NAME_PREFIX = "flink-sink-topic-";
+    private static final int RECORD_SIZE_UPPER_BOUND = 300;
+    private static final int RECORD_SIZE_LOWER_BOUND = 100;
+    private static final int RECORD_STRING_SIZE = 20;
+
+    private String topicName = topicName();
+    private final Closer closer = Closer.create();
+
+    public PulsarSinkTestContext(PulsarTestEnvironment environment) {
+        super(environment, Schema.STRING);
+    }
+
+    @Override
+    protected String displayName() {
+        return "write messages into one topic in Pulsar";
+    }
+
+    @Override
+    public Sink<String> createSink(TestingSinkSettings sinkSettings) {
+        operator.createTopic(topicName, 4);
+        DeliveryGuarantee guarantee = toDeliveryGuarantee(sinkSettings.getCheckpointingMode());
+
+        return PulsarSink.builder()
+                .setServiceUrl(operator.serviceUrl())
+                .setAdminUrl(operator.adminUrl())
+                .setTopics(topicName)
+                .setDeliveryGuarantee(guarantee)
+                .setSerializationSchema(pulsarSchema(schema))
+                .enableSchemaEvolution()
+                .setConfig(PULSAR_BATCHING_MAX_MESSAGES, 4)
+                .build();
+    }
+
+    @Override
+    public ExternalSystemDataReader<String> createSinkDataReader(TestingSinkSettings sinkSettings) {
+        PulsarPartitionDataReader<String> reader =
+                sneakyClient(
+                        () -> new PulsarPartitionDataReader<>(operator, topicName, Schema.STRING));
+        closer.register(reader);
+
+        return reader;
+    }
+
+    @Override
+    public List<String> generateTestData(TestingSinkSettings sinkSettings, long seed) {
+        Random random = new Random(seed);
+        int recordSize =
+                random.nextInt(RECORD_SIZE_UPPER_BOUND - RECORD_SIZE_LOWER_BOUND)
+                        + RECORD_SIZE_LOWER_BOUND;
+        List<String> records = new ArrayList<>(recordSize);
+        for (int i = 0; i < recordSize; i++) {
+            int size = random.nextInt(RECORD_STRING_SIZE) + RECORD_STRING_SIZE;
+            String record = "index:" + i + "-data:" + randomAlphanumeric(size);
+            records.add(record);
+        }
+
+        return records;
+    }
+
+    @Override
+    public TypeInformation<String> getProducedType() {
+        return Types.STRING;
+    }
+
+    @Override
+    public void close() throws Exception {
+        // Change the topic name after finishing a test case.
+        closer.register(() -> topicName = topicName());
+        closer.close();
+    }
+
+    private String topicName() {
+        return TOPIC_NAME_PREFIX + randomAlphanumeric(8);
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestSuiteBase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestSuiteBase.java
new file mode 100644
index 00000000000..0695a43569b
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestSuiteBase.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.sink;
+
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext;
+import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
+import org.apache.flink.streaming.api.CheckpointingMode;
+
+import org.junit.jupiter.api.Disabled;
+
+/** Pulsar sink don't expose the monitor metrics now. We have to disable this test. */
+public abstract class PulsarSinkTestSuiteBase extends SinkTestSuiteBase<String> {
+
+    @Override
+    @Disabled("Enable this test after FLINK-26027 being merged.")
+    public void testMetrics(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<String> externalContext,
+            CheckpointingMode semantic) {}
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/KeyedPulsarPartitionDataWriter.java
similarity index 77%
rename from flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java
rename to flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/KeyedPulsarPartitionDataWriter.java
index d5f6e11c00a..fa1247b73d8 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/KeyedPulsarPartitionDataWriter.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.tests.util.pulsar.common;
+package org.apache.flink.connector.pulsar.testutils.source;
 
 import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
 import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
@@ -25,8 +25,6 @@ import org.apache.pulsar.client.api.Schema;
 
 import java.util.List;
 
-import static java.util.stream.Collectors.toList;
-
 /**
  * Source split data writer for writing test data into a Pulsar topic partition. It will write the
  * message with two keys.
@@ -36,24 +34,17 @@ public class KeyedPulsarPartitionDataWriter implements ExternalSystemSplitDataWr
     private final PulsarRuntimeOperator operator;
     private final String fullTopicName;
     private final String keyToRead;
-    private final String keyToExclude;
 
     public KeyedPulsarPartitionDataWriter(
-            PulsarRuntimeOperator operator,
-            String fullTopicName,
-            String keyToRead,
-            String keyToExclude) {
+            PulsarRuntimeOperator operator, String fullTopicName, String keyToRead) {
         this.operator = operator;
         this.fullTopicName = fullTopicName;
         this.keyToRead = keyToRead;
-        this.keyToExclude = keyToExclude;
     }
 
     @Override
     public void writeRecords(List<String> records) {
-        List<String> newRecords = records.stream().map(a -> a + keyToRead).collect(toList());
-
-        operator.sendMessages(fullTopicName, Schema.STRING, keyToExclude, newRecords);
+        // Send messages with the given key.
         operator.sendMessages(fullTopicName, Schema.STRING, keyToRead, records);
     }
 
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/UnorderedSourceTestSuiteBase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/UnorderedSourceTestSuiteBase.java
index 35496ccab21..af41d8c3093 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/UnorderedSourceTestSuiteBase.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/UnorderedSourceTestSuiteBase.java
@@ -72,8 +72,5 @@ public abstract class UnorderedSourceTestSuiteBase<T> extends SourceTestSuiteBas
     public void testIdleReader(
             TestEnvironment testEnv,
             DataStreamSourceExternalContext<T> externalContext,
-            CheckpointingMode semantic)
-            throws Exception {
-        super.testIdleReader(testEnv, externalContext, semantic);
-    }
+            CheckpointingMode semantic) {}
 }
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/source/KeySharedSubscriptionContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/KeySharedSubscriptionContext.java
similarity index 81%
rename from flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/source/KeySharedSubscriptionContext.java
rename to flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/KeySharedSubscriptionContext.java
index 0cae6e58405..1901787b342 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/source/KeySharedSubscriptionContext.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/KeySharedSubscriptionContext.java
@@ -16,16 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.flink.tests.util.pulsar.source;
+package org.apache.flink.connector.pulsar.testutils.source.cases;
 
 import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FixedRangeGenerator;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext;
+import org.apache.flink.connector.pulsar.testutils.source.KeyedPulsarPartitionDataWriter;
 import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
 import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
-import org.apache.flink.tests.util.pulsar.common.KeyedPulsarPartitionDataWriter;
 
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
@@ -33,33 +32,23 @@ import org.apache.pulsar.common.util.Murmur3_32Hash;
 import static java.util.Collections.singletonList;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
 import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.RANGE_SIZE;
-import static org.apache.pulsar.client.api.SubscriptionType.Key_Shared;
 
 /** We would consume from test splits by using {@link SubscriptionType#Key_Shared} subscription. */
 public class KeySharedSubscriptionContext extends MultipleTopicConsumingContext {
 
     private final String keyToRead;
-    private final String keyToExclude;
 
     public KeySharedSubscriptionContext(PulsarTestEnvironment environment) {
-        super(environment, Key_Shared);
+        super(environment);
 
         this.keyToRead = randomAlphabetic(8);
-
-        // Make sure they have different hash code.
-        int readHash = keyHash(keyToRead);
-        String randomKey;
-        do {
-            randomKey = randomAlphabetic(8);
-        } while (keyHash(randomKey) == readHash);
-        this.keyToExclude = randomKey;
     }
 
     @Override
     public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter(
             TestingSourceSettings sourceSettings) {
         String partitionName = generatePartitionName();
-        return new KeyedPulsarPartitionDataWriter(operator, partitionName, keyToRead, keyToExclude);
+        return new KeyedPulsarPartitionDataWriter(operator, partitionName, keyToRead);
     }
 
     @Override
@@ -80,6 +69,11 @@ public class KeySharedSubscriptionContext extends MultipleTopicConsumingContext
         return "pulsar-key-shared-subscription";
     }
 
+    @Override
+    protected SubscriptionType subscriptionType() {
+        return SubscriptionType.Key_Shared;
+    }
+
     // This method is copied from Pulsar for calculating message key hash.
     private int keyHash(String key) {
         return Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % RANGE_SIZE;
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/MultipleTopicConsumingContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/MultipleTopicConsumingContext.java
index ca6410fbc43..422c954a4e7 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/MultipleTopicConsumingContext.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/MultipleTopicConsumingContext.java
@@ -33,18 +33,11 @@ import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNam
 public class MultipleTopicConsumingContext extends PulsarSourceTestContext {
 
     private final String topicPrefix = "flink-multiple-topic-" + randomAlphabetic(8) + "-";
-    private final SubscriptionType subscriptionType;
 
     private int index = 0;
 
     public MultipleTopicConsumingContext(PulsarTestEnvironment environment) {
-        this(environment, SubscriptionType.Exclusive);
-    }
-
-    public MultipleTopicConsumingContext(
-            PulsarTestEnvironment environment, SubscriptionType subscriptionType) {
         super(environment);
-        this.subscriptionType = subscriptionType;
     }
 
     @Override
@@ -64,7 +57,7 @@ public class MultipleTopicConsumingContext extends PulsarSourceTestContext {
 
     @Override
     protected SubscriptionType subscriptionType() {
-        return subscriptionType;
+        return SubscriptionType.Exclusive;
     }
 
     @Override
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/source/SharedSubscriptionContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/SharedSubscriptionContext.java
similarity index 84%
rename from flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/source/SharedSubscriptionContext.java
rename to flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/SharedSubscriptionContext.java
index fe9f07832af..cd649fef040 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/source/SharedSubscriptionContext.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/SharedSubscriptionContext.java
@@ -16,20 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.flink.tests.util.pulsar.source;
+package org.apache.flink.connector.pulsar.testutils.source.cases;
 
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext;
 
 import org.apache.pulsar.client.api.SubscriptionType;
 
-import static org.apache.pulsar.client.api.SubscriptionType.Shared;
-
 /** We would consume from test splits by using {@link SubscriptionType#Shared} subscription. */
 public class SharedSubscriptionContext extends MultipleTopicConsumingContext {
 
     public SharedSubscriptionContext(PulsarTestEnvironment environment) {
-        super(environment, Shared);
+        super(environment);
     }
 
     @Override
@@ -41,4 +38,9 @@ public class SharedSubscriptionContext extends MultipleTopicConsumingContext {
     protected String subscriptionName() {
         return "pulsar-shared-subscription";
     }
+
+    @Override
+    protected SubscriptionType subscriptionType() {
+        return SubscriptionType.Shared;
+    }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/test/resources/docker/bootstrap.sh b/flink-connectors/flink-connector-pulsar/src/test/resources/docker/bootstrap.sh
deleted file mode 100755
index fa4e2921a75..00000000000
--- a/flink-connectors/flink-connector-pulsar/src/test/resources/docker/bootstrap.sh
+++ /dev/null
@@ -1,28 +0,0 @@
-#!/usr/bin/env bash
-# ----------------------------------------------------------------------------
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-# ----------------------------------------------------------------------------
-
-# Enable the transaction in standalone config.
-sed -i 's/transactionCoordinatorEnabled=false/transactionCoordinatorEnabled=true/g' /pulsar/conf/standalone.conf
-sed -i 's/acknowledgmentAtBatchIndexLevelEnabled=false/acknowledgmentAtBatchIndexLevelEnabled=true/g' /pulsar/conf/standalone.conf
-sed -i 's/systemTopicEnabled=false/systemTopicEnabled=true/g' /pulsar/conf/standalone.conf
-sed -i 's/brokerDeduplicationEnabled=false/brokerDeduplicationEnabled=true/g' /pulsar/conf/standalone.conf
-
-# Start Pulsar standalone without function worker and streaming storage.
-/pulsar/bin/pulsar standalone --no-functions-worker -nss
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java
similarity index 64%
copy from flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
copy to flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java
index 15333a1d538..a19f5931b58 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java
@@ -19,7 +19,8 @@
 package org.apache.flink.tests.util.pulsar;
 
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
-import org.apache.flink.connector.pulsar.testutils.source.UnorderedSourceTestSuiteBase;
+import org.apache.flink.connector.pulsar.testutils.sink.PulsarSinkTestContext;
+import org.apache.flink.connector.pulsar.testutils.sink.PulsarSinkTestSuiteBase;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
 import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
@@ -27,38 +28,30 @@ import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
 import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment;
-import org.apache.flink.tests.util.pulsar.source.KeySharedSubscriptionContext;
-import org.apache.flink.tests.util.pulsar.source.SharedSubscriptionContext;
 import org.apache.flink.testutils.junit.FailsOnJava11;
 
 import org.junit.experimental.categories.Category;
 
-/**
- * Pulsar E2E test based on connector testing framework. It's used for Shared & Key_Shared
- * subscription.
- */
+/** Pulsar sink E2E test based on connector testing framework. */
 @SuppressWarnings("unused")
 @Category(value = {FailsOnJava11.class})
-public class PulsarSourceUnorderedE2ECase extends UnorderedSourceTestSuiteBase<String> {
+public class PulsarSinkE2ECase extends PulsarSinkTestSuiteBase {
 
-    // Defines the Semantic.
     @TestSemantics
-    CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};
+    CheckpointingMode[] semantics =
+            new CheckpointingMode[] {
+                CheckpointingMode.EXACTLY_ONCE, CheckpointingMode.AT_LEAST_ONCE
+            };
 
-    // Defines TestEnvironment.
+    // Defines TestEnvironment
     @TestEnv
-    FlinkContainerWithPulsarEnvironment flink = new FlinkContainerWithPulsarEnvironment(1, 8);
+    FlinkContainerWithPulsarEnvironment flink = new FlinkContainerWithPulsarEnvironment(1, 6);
 
     // Defines ConnectorExternalSystem.
     @TestExternalSystem
     PulsarContainerTestEnvironment pulsar = new PulsarContainerTestEnvironment(flink);
 
-    // Defines a set of external context Factories for different test cases.
-    @TestContext
-    PulsarTestContextFactory<String, SharedSubscriptionContext> shared =
-            new PulsarTestContextFactory<>(pulsar, SharedSubscriptionContext::new);
-
     @TestContext
-    PulsarTestContextFactory<String, KeySharedSubscriptionContext> keyShared =
-            new PulsarTestContextFactory<>(pulsar, KeySharedSubscriptionContext::new);
+    PulsarTestContextFactory<String, PulsarSinkTestContext> sinkContext =
+            new PulsarTestContextFactory<>(pulsar, PulsarSinkTestContext::new);
 }
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
index 15333a1d538..89692d1c449 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
@@ -20,6 +20,8 @@ package org.apache.flink.tests.util.pulsar;
 
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
 import org.apache.flink.connector.pulsar.testutils.source.UnorderedSourceTestSuiteBase;
+import org.apache.flink.connector.pulsar.testutils.source.cases.KeySharedSubscriptionContext;
+import org.apache.flink.connector.pulsar.testutils.source.cases.SharedSubscriptionContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
 import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
@@ -27,8 +29,6 @@ import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
 import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment;
-import org.apache.flink.tests.util.pulsar.source.KeySharedSubscriptionContext;
-import org.apache.flink.tests.util.pulsar.source.SharedSubscriptionContext;
 import org.apache.flink.testutils.junit.FailsOnJava11;
 
 import org.junit.experimental.categories.Category;
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
index 14d8cbdd12c..b5ddbb8b976 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
@@ -51,11 +51,8 @@ public class FlinkContainerWithPulsarEnvironment extends FlinkContainerTestEnvir
         return TestUtils.getResource(jarName).toAbsolutePath().toString();
     }
 
-    protected static Configuration flinkConfiguration() {
+    private static Configuration flinkConfiguration() {
         Configuration configuration = new Configuration();
-        // Increase the off heap memory of TaskManager to avoid direct buffer memory error in Pulsar
-        // e2e tests.
-        configuration.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, MemorySize.ofMebiBytes(100));
 
         // Increase the jvm metaspace memory to avoid java.lang.OutOfMemoryError: Metaspace
         configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(2048));