You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ti...@apache.org on 2023/01/31 02:42:33 UTC

[flink-connector-pulsar] branch v3.0 updated: [FLINK-30657][Connector/Pulsar] Remove Shared and Key_Shared related tests in Pulsar connector. (#17)

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch v3.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git


The following commit(s) were added to refs/heads/v3.0 by this push:
     new b708afd  [FLINK-30657][Connector/Pulsar] Remove Shared and Key_Shared related tests in Pulsar connector. (#17)
b708afd is described below

commit b708afd5f1b0b5fc41f2177b27a6aab0ac9136b4
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Tue Jan 31 10:42:27 2023 +0800

    [FLINK-30657][Connector/Pulsar] Remove Shared and Key_Shared related tests in Pulsar connector. (#17)
---
 .../util/pulsar/PulsarSourceUnorderedE2ECase.java  |  63 -----------
 .../f4d91193-72ba-4ce4-ad83-98f780dce581           |   6 -
 .../pulsar/source/PulsarUnorderedSourceITCase.java |  60 ----------
 .../enumerator/PulsarSourceEnumeratorTest.java     |  14 +--
 .../assigner/SharedSplitAssignerTest.java          | 121 ---------------------
 .../source/PulsarUnorderedSourceReaderTest.java    |  27 -----
 .../PulsarUnorderedPartitionSplitReaderTest.java   |  28 -----
 .../source/KeyedPulsarPartitionDataWriter.java     |  66 -----------
 .../source/UnorderedSourceTestSuiteBase.java       |  76 -------------
 .../source/cases/KeySharedSubscriptionContext.java |  85 ---------------
 .../source/cases/SharedSubscriptionContext.java    |  46 --------
 11 files changed, 7 insertions(+), 585 deletions(-)

diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
deleted file mode 100644
index 770d118..0000000
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tests.util.pulsar;
-
-import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
-import org.apache.flink.connector.pulsar.testutils.source.UnorderedSourceTestSuiteBase;
-import org.apache.flink.connector.pulsar.testutils.source.cases.KeySharedSubscriptionContext;
-import org.apache.flink.connector.pulsar.testutils.source.cases.SharedSubscriptionContext;
-import org.apache.flink.connector.testframe.junit.annotations.TestContext;
-import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
-import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
-import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
-import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment;
-
-import org.junit.jupiter.api.Tag;
-
-/**
- * Pulsar E2E test based on connector testing framework. It's used for Shared & Key_Shared
- * subscription.
- */
-@SuppressWarnings("unused")
-@Tag("org.apache.flink.testutils.junit.FailsOnJava11")
-public class PulsarSourceUnorderedE2ECase extends UnorderedSourceTestSuiteBase<String> {
-
-    // Defines the Semantic.
-    @TestSemantics
-    CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};
-
-    // Defines TestEnvironment.
-    @TestEnv
-    FlinkContainerWithPulsarEnvironment flink = new FlinkContainerWithPulsarEnvironment(1, 8);
-
-    // Defines ConnectorExternalSystem.
-    @TestExternalSystem
-    PulsarContainerTestEnvironment pulsar = new PulsarContainerTestEnvironment(flink);
-
-    // Defines a set of external context Factories for different test cases.
-    @TestContext
-    PulsarTestContextFactory<String, SharedSubscriptionContext> shared =
-            new PulsarTestContextFactory<>(pulsar, SharedSubscriptionContext::new);
-
-    @TestContext
-    PulsarTestContextFactory<String, KeySharedSubscriptionContext> keyShared =
-            new PulsarTestContextFactory<>(pulsar, KeySharedSubscriptionContext::new);
-}
diff --git a/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581 b/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
index 40e7dc9..37f6ba8 100644
--- a/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
+++ b/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
@@ -10,9 +10,3 @@ org.apache.flink.connector.pulsar.source.PulsarSourceITCase does not satisfy: on
 * reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
 * reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
  or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connector.pulsar.source.PulsarUnorderedSourceITCase does not satisfy: only one of the following predicates match:\
-* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
-* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
- or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarUnorderedSourceITCase.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarUnorderedSourceITCase.java
deleted file mode 100644
index 8586c7d..0000000
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarUnorderedSourceITCase.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source;
-
-import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
-import org.apache.flink.connector.pulsar.testutils.source.UnorderedSourceTestSuiteBase;
-import org.apache.flink.connector.pulsar.testutils.source.cases.KeySharedSubscriptionContext;
-import org.apache.flink.connector.pulsar.testutils.source.cases.SharedSubscriptionContext;
-import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
-import org.apache.flink.connector.testframe.junit.annotations.TestContext;
-import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
-import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
-import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
-import org.apache.flink.streaming.api.CheckpointingMode;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.junit.jupiter.api.Tag;
-
-/**
- * Unit test class for {@link PulsarSource}. Used for {@link SubscriptionType#Shared} subscription.
- */
-@Tag("org.apache.flink.testutils.junit.FailsOnJava11")
-public class PulsarUnorderedSourceITCase extends UnorderedSourceTestSuiteBase<String> {
-
-    // Defines test environment on Flink MiniCluster
-    @TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();
-
-    // Defines pulsar running environment
-    @TestExternalSystem
-    PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.mock());
-
-    @TestSemantics
-    CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};
-
-    @TestContext
-    PulsarTestContextFactory<String, SharedSubscriptionContext> sharedSubscription =
-            new PulsarTestContextFactory<>(pulsar, SharedSubscriptionContext::new);
-
-    @TestContext
-    PulsarTestContextFactory<String, KeySharedSubscriptionContext> keySharedSubscription =
-            new PulsarTestContextFactory<>(pulsar, KeySharedSubscriptionContext::new);
-}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java
index 702a567..0e30916 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java
@@ -71,7 +71,7 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
     @ParameterizedTest
     @EnumSource(
             value = SubscriptionType.class,
-            names = {"Failover", "Shared"})
+            names = {"Failover"})
     void startWithDiscoverPartitionsOnce(SubscriptionType subscriptionType) throws Exception {
         Set<String> preexistingTopics = setupPreexistingTopics();
         try (MockSplitEnumeratorContext<PulsarPartitionSplit> context =
@@ -96,7 +96,7 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
     @ParameterizedTest
     @EnumSource(
             value = SubscriptionType.class,
-            names = {"Failover", "Shared"})
+            names = {"Failover"})
     void startWithPeriodicPartitionDiscovery(SubscriptionType subscriptionType) throws Exception {
         Set<String> preexistingTopics = setupPreexistingTopics();
         try (MockSplitEnumeratorContext<PulsarPartitionSplit> context =
@@ -119,7 +119,7 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
     @ParameterizedTest
     @EnumSource(
             value = SubscriptionType.class,
-            names = {"Failover", "Shared"})
+            names = {"Failover"})
     void discoverPartitionsTriggersAssignments(SubscriptionType subscriptionType) throws Throwable {
         Set<String> preexistingTopics = setupPreexistingTopics();
         try (MockSplitEnumeratorContext<PulsarPartitionSplit> context =
@@ -148,7 +148,7 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
     @ParameterizedTest
     @EnumSource(
             value = SubscriptionType.class,
-            names = {"Failover", "Shared"})
+            names = {"Failover"})
     void discoverPartitionsPeriodically(SubscriptionType subscriptionType) throws Throwable {
         String dynamicTopic = "topic3-" + randomAlphabetic(10);
         Set<String> preexistingTopics = setupPreexistingTopics();
@@ -192,7 +192,7 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
     @ParameterizedTest
     @EnumSource(
             value = SubscriptionType.class,
-            names = {"Failover", "Shared"})
+            names = {"Failover"})
     void addSplitsBack(SubscriptionType subscriptionType) throws Throwable {
         Set<String> preexistingTopics = setupPreexistingTopics();
         try (MockSplitEnumeratorContext<PulsarPartitionSplit> context =
@@ -225,7 +225,7 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
     @ParameterizedTest
     @EnumSource(
             value = SubscriptionType.class,
-            names = {"Failover", "Shared"})
+            names = {"Failover"})
     void workWithPreexistingAssignments(SubscriptionType subscriptionType) throws Throwable {
         Set<String> preexistingTopics = setupPreexistingTopics();
         PulsarSourceEnumState preexistingAssignments;
@@ -267,7 +267,7 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
     @ParameterizedTest
     @EnumSource(
             value = SubscriptionType.class,
-            names = {"Failover", "Shared"})
+            names = {"Failover"})
     void snapshotState(SubscriptionType subscriptionType) throws Throwable {
         Set<String> preexistingTopics = setupPreexistingTopics();
         try (MockSplitEnumeratorContext<PulsarPartitionSplit> context =
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssignerTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssignerTest.java
deleted file mode 100644
index 68f73b5..0000000
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssignerTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.enumerator.assigner;
-
-import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.api.connector.source.SplitsAssignment;
-import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-
-import static java.util.Collections.singletonList;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-/** Unit tests for {@link SharedSplitAssigner}. */
-class SharedSplitAssignerTest extends SplitAssignerTestBase {
-
-    @Test
-    void noMoreSplits() {
-        SplitAssigner assigner = splitAssigner(true, 4);
-        assertFalse(assigner.noMoreSplits(3));
-
-        assigner = splitAssigner(false, 4);
-        assertFalse(assigner.noMoreSplits(3));
-
-        assigner.registerTopicPartitions(createPartitions("f", 8));
-        assertFalse(assigner.noMoreSplits(3));
-
-        assigner.createAssignment(singletonList(1));
-        assertTrue(assigner.noMoreSplits(1));
-        assertFalse(assigner.noMoreSplits(3));
-
-        assigner.createAssignment(singletonList(3));
-        assertTrue(assigner.noMoreSplits(3));
-    }
-
-    @Test
-    void partitionsAssignment() {
-        SplitAssigner assigner = splitAssigner(true, 8);
-        assigner.registerTopicPartitions(createPartitions("d", 4));
-        List<Integer> readers = Arrays.asList(1, 3, 5, 7);
-
-        // Assignment with initial states.
-        Optional<SplitsAssignment<PulsarPartitionSplit>> assignment =
-                assigner.createAssignment(readers);
-        assertThat(assignment).isPresent();
-        assertThat(assignment.get().assignment()).hasSize(4);
-
-        // Reassignment with same readers.
-        assignment = assigner.createAssignment(readers);
-        assertThat(assignment).isNotPresent();
-
-        // Register new partition and assign.
-        assigner.registerTopicPartitions(createPartitions("e", 5));
-        assignment = assigner.createAssignment(readers);
-        assertThat(assignment).isPresent();
-        assertThat(assignment.get().assignment()).hasSize(4);
-
-        // Assign to new readers.
-        readers = Arrays.asList(0, 2, 4, 6);
-        assignment = assigner.createAssignment(readers);
-        assertThat(assignment).isPresent();
-        assertThat(assignment.get().assignment())
-                .hasSize(4)
-                .allSatisfy((k, v) -> assertThat(v).hasSize(2));
-    }
-
-    @Test
-    void reassignSplitsAfterRestarting() {
-        SplitAssigner assigner = splitAssigner(true, 8);
-        Set<TopicPartition> partitions = createPartitions("d", 4);
-        assigner.registerTopicPartitions(partitions);
-        List<Integer> readers = Arrays.asList(0, 1, 2);
-
-        Optional<SplitsAssignment<PulsarPartitionSplit>> assignment =
-                assigner.createAssignment(readers);
-        assertThat(assignment).isPresent();
-        assertThat(assignment.get().assignment()).hasSize(3);
-
-        // Create a new split assigner with same state.
-        SplitAssigner assigner1 = splitAssigner(true, 8, partitions);
-        assigner1.registerTopicPartitions(partitions);
-        assignment = assigner1.createAssignment(readers);
-        assertThat(assignment).isPresent();
-        assertThat(assignment.get().assignment()).hasSize(3);
-    }
-
-    @Override
-    protected SplitAssigner createAssigner(
-            StopCursor stopCursor,
-            boolean enablePartitionDiscovery,
-            SplitEnumeratorContext<PulsarPartitionSplit> context,
-            PulsarSourceEnumState enumState) {
-        return new SharedSplitAssigner(stopCursor, enablePartitionDiscovery, context, enumState);
-    }
-}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReaderTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReaderTest.java
deleted file mode 100644
index 4f7fdd3..0000000
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReaderTest.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.reader.source;
-
-import org.apache.flink.connector.pulsar.testutils.extension.SubType;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-
-class PulsarUnorderedSourceReaderTest extends PulsarSourceReaderTestBase {
-    @SubType SubscriptionType subscriptionType = SubscriptionType.Shared;
-}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReaderTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReaderTest.java
deleted file mode 100644
index 2cb3cb9..0000000
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReaderTest.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.reader.split;
-
-import org.apache.flink.connector.pulsar.testutils.extension.SubType;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-
-/** Unit tests for {@link PulsarUnorderedPartitionSplitReaderTest}. */
-class PulsarUnorderedPartitionSplitReaderTest extends PulsarPartitionSplitReaderTestBase {
-    @SubType SubscriptionType subscriptionType = SubscriptionType.Shared;
-}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/KeyedPulsarPartitionDataWriter.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/KeyedPulsarPartitionDataWriter.java
deleted file mode 100644
index 23d65f0..0000000
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/KeyedPulsarPartitionDataWriter.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.testutils.source;
-
-import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
-import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
-
-import org.apache.pulsar.client.api.Schema;
-
-import java.util.List;
-
-import static java.util.stream.Collectors.toList;
-
-/**
- * Source split data writer for writing test data into a Pulsar topic partition. It will write the
- * message with two keys.
- */
-public class KeyedPulsarPartitionDataWriter implements ExternalSystemSplitDataWriter<String> {
-
-    private final PulsarRuntimeOperator operator;
-    private final String fullTopicName;
-    private final String keyToRead;
-    private final String keyToExclude;
-
-    public KeyedPulsarPartitionDataWriter(
-            PulsarRuntimeOperator operator,
-            String fullTopicName,
-            String keyToRead,
-            String keyToExclude) {
-        this.operator = operator;
-        this.fullTopicName = fullTopicName;
-        this.keyToRead = keyToRead;
-        this.keyToExclude = keyToExclude;
-    }
-
-    @Override
-    public void writeRecords(List<String> records) {
-        // Send messages with the key we don't need.
-        List<String> newRecords = records.stream().map(a -> a + keyToRead).collect(toList());
-        operator.sendMessages(fullTopicName, Schema.STRING, keyToExclude, newRecords);
-
-        // Send messages with the given key.
-        operator.sendMessages(fullTopicName, Schema.STRING, keyToRead, records);
-    }
-
-    @Override
-    public void close() {
-        // Nothing to do.
-    }
-}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/UnorderedSourceTestSuiteBase.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/UnorderedSourceTestSuiteBase.java
deleted file mode 100644
index af41d8c..0000000
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/UnorderedSourceTestSuiteBase.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.testutils.source;
-
-import org.apache.flink.connector.testframe.environment.TestEnvironment;
-import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
-import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.util.CloseableIterator;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.junit.jupiter.api.Disabled;
-
-import java.util.List;
-
-import static java.util.concurrent.CompletableFuture.runAsync;
-import static org.apache.flink.connector.testframe.utils.CollectIteratorAssertions.assertUnordered;
-import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
-import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
-
-/**
- * Pulsar with {@link SubscriptionType#Key_Shared} and {@link SubscriptionType#Shared} consumes the
- * message out of order. So we have to override the default connector test tool.
- */
-public abstract class UnorderedSourceTestSuiteBase<T> extends SourceTestSuiteBase<T> {
-
-    @Override
-    protected void checkResultWithSemantic(
-            CloseableIterator<T> resultIterator,
-            List<List<T>> testData,
-            CheckpointingMode semantic,
-            Integer limit) {
-        Runnable runnable =
-                () ->
-                        assertUnordered(resultIterator)
-                                .withNumRecordsLimit(getExpectedSize(testData, limit))
-                                .matchesRecordsFromSource(testData, semantic);
-
-        assertThat(runAsync(runnable)).succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
-    }
-
-    /**
-     * Shared subscription will have multiple readers on same partition, this would make hard to
-     * automatically stop like a bounded source.
-     */
-    private static <T> int getExpectedSize(List<List<T>> testData, Integer limit) {
-        if (limit == null) {
-            return testData.stream().mapToInt(List::size).sum();
-        } else {
-            return limit;
-        }
-    }
-
-    @Override
-    @Disabled("We don't have any idle readers in Pulsar's shared subscription.")
-    public void testIdleReader(
-            TestEnvironment testEnv,
-            DataStreamSourceExternalContext<T> externalContext,
-            CheckpointingMode semantic) {}
-}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/KeySharedSubscriptionContext.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/KeySharedSubscriptionContext.java
deleted file mode 100644
index c348853..0000000
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/KeySharedSubscriptionContext.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.testutils.source.cases;
-
-import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FixedKeysRangeGenerator;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.pulsar.testutils.source.KeyedPulsarPartitionDataWriter;
-import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
-import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-
-import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
-import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ALLOW_KEY_SHARED_OUT_OF_ORDER_DELIVERY;
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator.KeySharedMode.JOIN;
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils.keyHash;
-
-/** We would consume from test splits by using {@link SubscriptionType#Key_Shared} subscription. */
-public class KeySharedSubscriptionContext extends MultipleTopicConsumingContext {
-
-    private final String keyToRead;
-    private final String keyToExclude;
-
-    public KeySharedSubscriptionContext(PulsarTestEnvironment environment) {
-        super(environment);
-
-        this.keyToRead = randomAlphabetic(8);
-
-        // Make sure they have different hash code.
-        int readHash = keyHash(keyToRead);
-        String randomKey;
-        do {
-            randomKey = randomAlphabetic(8);
-        } while (keyHash(randomKey) == readHash);
-        this.keyToExclude = randomKey;
-    }
-
-    @Override
-    public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter(
-            TestingSourceSettings sourceSettings) {
-        String partitionName = generatePartitionName();
-        return new KeyedPulsarPartitionDataWriter(operator, partitionName, keyToRead, keyToExclude);
-    }
-
-    @Override
-    protected String displayName() {
-        return "consume message by Key_Shared";
-    }
-
-    @Override
-    protected void setSourceBuilder(PulsarSourceBuilder<String> builder) {
-        // Make sure we only consume the messages with keyToRead.
-        FixedKeysRangeGenerator generator =
-                FixedKeysRangeGenerator.builder().key(keyToRead).keySharedMode(JOIN).build();
-        builder.setRangeGenerator(generator);
-        builder.setConfig(PULSAR_ALLOW_KEY_SHARED_OUT_OF_ORDER_DELIVERY, true);
-    }
-
-    @Override
-    protected String subscriptionName() {
-        return "pulsar-key-shared-subscription";
-    }
-
-    @Override
-    protected SubscriptionType subscriptionType() {
-        return SubscriptionType.Key_Shared;
-    }
-}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/SharedSubscriptionContext.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/SharedSubscriptionContext.java
deleted file mode 100644
index cd649fe..0000000
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/SharedSubscriptionContext.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.testutils.source.cases;
-
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-
-/** We would consume from test splits by using {@link SubscriptionType#Shared} subscription. */
-public class SharedSubscriptionContext extends MultipleTopicConsumingContext {
-
-    public SharedSubscriptionContext(PulsarTestEnvironment environment) {
-        super(environment);
-    }
-
-    @Override
-    protected String displayName() {
-        return "consume message by Shared";
-    }
-
-    @Override
-    protected String subscriptionName() {
-        return "pulsar-shared-subscription";
-    }
-
-    @Override
-    protected SubscriptionType subscriptionType() {
-        return SubscriptionType.Shared;
-    }
-}