You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/12/12 18:33:38 UTC

[flink-connector-pulsar] 21/27: [FLINK-26182][Connector/pulsar] Extract common logic from Pulsar source testing tools.

This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch v3.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

commit 3b2377d09c016bd7bf1968addd02ee884d274d9d
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Tue Sep 27 18:04:24 2022 +0800

    [FLINK-26182][Connector/pulsar] Extract common logic from Pulsar source testing tools.
---
 flink-connector-pulsar-e2e-tests/pom.xml           |  24 +++-
 .../util/pulsar/PulsarSourceOrderedE2ECase.java    |   5 +-
 .../util/pulsar/PulsarSourceUnorderedE2ECase.java  |  13 +-
 .../pulsar/cases/KeySharedSubscriptionContext.java | 144 ---------------------
 .../pulsar/cases/SharedSubscriptionContext.java    | 116 -----------------
 .../FlinkContainerWithPulsarEnvironment.java       |   3 +-
 .../common/KeyedPulsarPartitionDataWriter.java     |  19 +--
 .../common/UnorderedSourceTestSuiteBase.java       |  86 ------------
 .../ExclusiveSubscriptionContext.java              |  19 +--
 .../FailoverSubscriptionContext.java               |  19 +--
 .../source/KeySharedSubscriptionContext.java       |  87 +++++++++++++
 .../SharedSubscriptionContext.java}                |  30 ++---
 12 files changed, 151 insertions(+), 414 deletions(-)

diff --git a/flink-connector-pulsar-e2e-tests/pom.xml b/flink-connector-pulsar-e2e-tests/pom.xml
index b9bc5d4..7f7ab46 100644
--- a/flink-connector-pulsar-e2e-tests/pom.xml
+++ b/flink-connector-pulsar-e2e-tests/pom.xml
@@ -33,6 +33,7 @@ under the License.
 	<properties>
 		<pulsar.version>2.10.0</pulsar.version>
 		<bouncycastle.version>1.69</bouncycastle.version>
+		<jaxb-api.version>2.3.1</jaxb-api.version>
 	</properties>
 
 	<dependencies>
@@ -41,6 +42,17 @@ under the License.
 			<artifactId>flink-end-to-end-tests-common</artifactId>
 			<version>${project.version}</version>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-test-utils</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>com.google.guava</groupId>
+					<artifactId>guava</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-connector-pulsar</artifactId>
@@ -70,9 +82,6 @@ under the License.
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-surefire-plugin</artifactId>
-				<configuration>
-					<skip>true</skip>
-				</configuration>
 			</plugin>
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
@@ -88,6 +97,15 @@ under the License.
 				</executions>
 				<configuration>
 					<artifactItems>
+						<artifactItem>
+							<groupId>org.apache.flink</groupId>
+							<artifactId>flink-connector-test-utils</artifactId>
+							<version>${project.version}</version>
+							<destFileName>flink-connector-testing.jar</destFileName>
+							<type>jar</type>
+							<outputDirectory>${project.build.directory}/dependencies
+							</outputDirectory>
+						</artifactItem>
 						<artifactItem>
 							<groupId>org.apache.flink</groupId>
 							<artifactId>flink-connector-pulsar</artifactId>
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
index 234c1a0..ea6a982 100644
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
+++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
@@ -25,10 +25,10 @@ import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem
 import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
 import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
 import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.tests.util.pulsar.cases.ExclusiveSubscriptionContext;
-import org.apache.flink.tests.util.pulsar.cases.FailoverSubscriptionContext;
 import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
 import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment;
+import org.apache.flink.tests.util.pulsar.source.ExclusiveSubscriptionContext;
+import org.apache.flink.tests.util.pulsar.source.FailoverSubscriptionContext;
 import org.apache.flink.testutils.junit.FailsOnJava11;
 
 import org.junit.experimental.categories.Category;
@@ -37,6 +37,7 @@ import org.junit.experimental.categories.Category;
  * Pulsar E2E test based on connector testing framework. It's used for Failover & Exclusive
  * subscription.
  */
+@SuppressWarnings("unused")
 @Category(value = {FailsOnJava11.class})
 public class PulsarSourceOrderedE2ECase extends SourceTestSuiteBase<String> {
 
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
index 5039048..15333a1 100644
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
+++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
@@ -19,21 +19,26 @@
 package org.apache.flink.tests.util.pulsar;
 
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
+import org.apache.flink.connector.pulsar.testutils.source.UnorderedSourceTestSuiteBase;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
 import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
 import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
 import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.tests.util.pulsar.cases.KeySharedSubscriptionContext;
-import org.apache.flink.tests.util.pulsar.cases.SharedSubscriptionContext;
 import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
 import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment;
-import org.apache.flink.tests.util.pulsar.common.UnorderedSourceTestSuiteBase;
+import org.apache.flink.tests.util.pulsar.source.KeySharedSubscriptionContext;
+import org.apache.flink.tests.util.pulsar.source.SharedSubscriptionContext;
+import org.apache.flink.testutils.junit.FailsOnJava11;
+
+import org.junit.experimental.categories.Category;
 
 /**
  * Pulsar E2E test based on connector testing framework. It's used for Shared & Key_Shared
  * subscription.
  */
+@SuppressWarnings("unused")
+@Category(value = {FailsOnJava11.class})
 public class PulsarSourceUnorderedE2ECase extends UnorderedSourceTestSuiteBase<String> {
 
     // Defines the Semantic.
@@ -49,12 +54,10 @@ public class PulsarSourceUnorderedE2ECase extends UnorderedSourceTestSuiteBase<S
     PulsarContainerTestEnvironment pulsar = new PulsarContainerTestEnvironment(flink);
 
     // Defines a set of external context Factories for different test cases.
-    @SuppressWarnings("unused")
     @TestContext
     PulsarTestContextFactory<String, SharedSubscriptionContext> shared =
             new PulsarTestContextFactory<>(pulsar, SharedSubscriptionContext::new);
 
-    @SuppressWarnings("unused")
     @TestContext
     PulsarTestContextFactory<String, KeySharedSubscriptionContext> keyShared =
             new PulsarTestContextFactory<>(pulsar, KeySharedSubscriptionContext::new);
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
deleted file mode 100644
index 5ad369b..0000000
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tests.util.pulsar.cases;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.Source;
-import org.apache.flink.connector.pulsar.source.PulsarSource;
-import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FixedRangeGenerator;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
-import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
-import org.apache.flink.tests.util.pulsar.common.KeyedPulsarPartitionDataWriter;
-
-import org.apache.pulsar.client.api.RegexSubscriptionMode;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.util.Murmur3_32Hash;
-
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static java.util.Collections.singletonList;
-import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.RANGE_SIZE;
-import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
-import static org.apache.pulsar.client.api.Schema.STRING;
-
-/** We would consume from test splits by using {@link SubscriptionType#Key_Shared} subscription. */
-public class KeySharedSubscriptionContext extends PulsarTestContext<String> {
-
-    private int index = 0;
-
-    private final List<KeyedPulsarPartitionDataWriter> writers = new ArrayList<>();
-
-    // Message keys.
-    private final String key1;
-    private final String key2;
-
-    public KeySharedSubscriptionContext(PulsarTestEnvironment environment) {
-        this(environment, Collections.emptyList());
-    }
-
-    public KeySharedSubscriptionContext(
-            PulsarTestEnvironment environment, List<URL> connectorJarPaths) {
-        super(environment, connectorJarPaths);
-
-        // Init message keys.
-        this.key1 = randomAlphabetic(8);
-        String newKey2;
-        do {
-            newKey2 = randomAlphabetic(8);
-        } while (keyHash(key1) == keyHash(newKey2));
-        this.key2 = newKey2;
-    }
-
-    @Override
-    protected String displayName() {
-        return "consuming message by Key_Shared";
-    }
-
-    @Override
-    public Source<String, ?, ?> createSource(TestingSourceSettings sourceSettings) {
-        int keyHash = keyHash(key1);
-        TopicRange range = new TopicRange(keyHash, keyHash);
-
-        PulsarSourceBuilder<String> builder =
-                PulsarSource.builder()
-                        .setDeserializationSchema(pulsarSchema(STRING))
-                        .setServiceUrl(operator.serviceUrl())
-                        .setAdminUrl(operator.adminUrl())
-                        .setTopicPattern(
-                                "pulsar-[0-9]+-key-shared", RegexSubscriptionMode.AllTopics)
-                        .setSubscriptionType(SubscriptionType.Key_Shared)
-                        .setSubscriptionName("pulsar-key-shared")
-                        .setRangeGenerator(new FixedRangeGenerator(singletonList(range)));
-        if (sourceSettings.getBoundedness() == Boundedness.BOUNDED) {
-            // Using latest stop cursor for making sure the source could be stopped.
-            builder.setBoundedStopCursor(StopCursor.latest());
-        }
-
-        return builder.build();
-    }
-
-    @Override
-    public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter(
-            TestingSourceSettings sourceSettings) {
-        String topicName = "pulsar-" + index + "-key-shared";
-        operator.createTopic(topicName, 1);
-        index++;
-
-        String partitionName = TopicNameUtils.topicNameWithPartition(topicName, 0);
-        KeyedPulsarPartitionDataWriter writer =
-                new KeyedPulsarPartitionDataWriter(operator, partitionName, key1, key2);
-        writers.add(writer);
-
-        return writer;
-    }
-
-    @Override
-    public List<String> generateTestData(
-            TestingSourceSettings sourceSettings, int splitIndex, long seed) {
-        return generateStringTestData(splitIndex, seed);
-    }
-
-    @Override
-    public TypeInformation<String> getProducedType() {
-        return TypeInformation.of(String.class);
-    }
-
-    @Override
-    public void close() {
-        for (KeyedPulsarPartitionDataWriter writer : writers) {
-            writer.close();
-        }
-        writers.clear();
-    }
-
-    private int keyHash(String key) {
-        return Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % RANGE_SIZE;
-    }
-}
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
deleted file mode 100644
index 1a2db66..0000000
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tests.util.pulsar.cases;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.Source;
-import org.apache.flink.connector.pulsar.source.PulsarSource;
-import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
-import org.apache.flink.connector.pulsar.testutils.PulsarPartitionDataWriter;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
-import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
-
-import org.apache.pulsar.client.api.RegexSubscriptionMode;
-import org.apache.pulsar.client.api.SubscriptionType;
-
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
-import static org.apache.pulsar.client.api.Schema.STRING;
-
-/** We would consuming from test splits by using {@link SubscriptionType#Shared} subscription. */
-public class SharedSubscriptionContext extends PulsarTestContext<String> {
-
-    private int index = 0;
-
-    private final List<PulsarPartitionDataWriter> writers = new ArrayList<>();
-
-    public SharedSubscriptionContext(PulsarTestEnvironment environment) {
-        this(environment, Collections.emptyList());
-    }
-
-    public SharedSubscriptionContext(
-            PulsarTestEnvironment environment, List<URL> connectorJarPaths) {
-        super(environment, connectorJarPaths);
-    }
-
-    @Override
-    protected String displayName() {
-        return "consuming message by Shared";
-    }
-
-    @Override
-    public Source<String, ?, ?> createSource(TestingSourceSettings sourceSettings) {
-        PulsarSourceBuilder<String> builder =
-                PulsarSource.builder()
-                        .setDeserializationSchema(pulsarSchema(STRING))
-                        .setServiceUrl(operator.serviceUrl())
-                        .setAdminUrl(operator.adminUrl())
-                        .setTopicPattern("pulsar-[0-9]+-shared", RegexSubscriptionMode.AllTopics)
-                        .setSubscriptionType(SubscriptionType.Shared)
-                        .setSubscriptionName("pulsar-shared");
-        if (sourceSettings.getBoundedness() == Boundedness.BOUNDED) {
-            // Using latest stop cursor for making sure the source could be stopped.
-            builder.setBoundedStopCursor(StopCursor.latest());
-        }
-
-        return builder.build();
-    }
-
-    @Override
-    public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter(
-            TestingSourceSettings sourceSettings) {
-        String topicName = "pulsar-" + index + "-shared";
-        operator.createTopic(topicName, 1);
-        index++;
-
-        String partitionName = TopicNameUtils.topicNameWithPartition(topicName, 0);
-        PulsarPartitionDataWriter writer = new PulsarPartitionDataWriter(operator, partitionName);
-        writers.add(writer);
-
-        return writer;
-    }
-
-    @Override
-    public List<String> generateTestData(
-            TestingSourceSettings sourceSettings, int splitIndex, long seed) {
-        return generateStringTestData(splitIndex, seed);
-    }
-
-    @Override
-    public TypeInformation<String> getProducedType() {
-        return TypeInformation.of(String.class);
-    }
-
-    @Override
-    public void close() {
-        for (PulsarPartitionDataWriter writer : writers) {
-            writer.close();
-        }
-        writers.clear();
-    }
-}
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
index 9f34554..f5e862f 100644
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
+++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
@@ -43,7 +43,8 @@ public class FlinkContainerWithPulsarEnvironment extends FlinkContainerTestEnvir
                 resourcePath("bcutil-jdk15on.jar"),
                 resourcePath("bcprov-ext-jdk15on.jar"),
                 resourcePath("jaxb-api.jar"),
-                resourcePath("jul-to-slf4j.jar"));
+                resourcePath("jul-to-slf4j.jar"),
+                resourcePath("flink-connector-testing.jar"));
     }
 
     private static String resourcePath(String jarName) {
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java
index e431e4c..d5f6e11 100644
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java
+++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java
@@ -35,23 +35,26 @@ public class KeyedPulsarPartitionDataWriter implements ExternalSystemSplitDataWr
 
     private final PulsarRuntimeOperator operator;
     private final String fullTopicName;
-    private final String key1;
-    private final String key2;
+    private final String keyToRead;
+    private final String keyToExclude;
 
     public KeyedPulsarPartitionDataWriter(
-            PulsarRuntimeOperator operator, String fullTopicName, String key1, String key2) {
+            PulsarRuntimeOperator operator,
+            String fullTopicName,
+            String keyToRead,
+            String keyToExclude) {
         this.operator = operator;
         this.fullTopicName = fullTopicName;
-        this.key1 = key1;
-        this.key2 = key2;
+        this.keyToRead = keyToRead;
+        this.keyToExclude = keyToExclude;
     }
 
     @Override
     public void writeRecords(List<String> records) {
-        operator.sendMessages(fullTopicName, Schema.STRING, key1, records);
+        List<String> newRecords = records.stream().map(a -> a + keyToRead).collect(toList());
 
-        List<String> newRecords = records.stream().map(a -> a + key1).collect(toList());
-        operator.sendMessages(fullTopicName, Schema.STRING, key2, newRecords);
+        operator.sendMessages(fullTopicName, Schema.STRING, keyToExclude, newRecords);
+        operator.sendMessages(fullTopicName, Schema.STRING, keyToRead, records);
     }
 
     @Override
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java
deleted file mode 100644
index 01527ea..0000000
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tests.util.pulsar.common;
-
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.Source;
-import org.apache.flink.connector.testframe.environment.TestEnvironment;
-import org.apache.flink.connector.testframe.environment.TestEnvironmentSettings;
-import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
-import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
-import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
-import org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension;
-import org.apache.flink.connector.testframe.junit.extensions.TestCaseInvocationContextProvider;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.util.TestLoggerExtension;
-
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.TestInstance;
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-
-/** A source test template for testing the messages which could be consumed in a unordered way. */
-@ExtendWith({
-    ConnectorTestingExtension.class,
-    TestLoggerExtension.class,
-    TestCaseInvocationContextProvider.class
-})
-@TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public abstract class UnorderedSourceTestSuiteBase<T> {
-
-    @TestTemplate
-    @DisplayName("Test source with one split and four consumers")
-    public void testOneSplitWithMultipleConsumers(
-            TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext)
-            throws Exception {
-        TestingSourceSettings sourceSettings =
-                TestingSourceSettings.builder()
-                        .setBoundedness(Boundedness.BOUNDED)
-                        .setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
-                        .build();
-        TestEnvironmentSettings envOptions =
-                TestEnvironmentSettings.builder()
-                        .setConnectorJarPaths(externalContext.getConnectorJarPaths())
-                        .build();
-        List<T> testData =
-                externalContext.generateTestData(
-                        sourceSettings, 0, ThreadLocalRandom.current().nextLong());
-        ExternalSystemSplitDataWriter<T> writer =
-                externalContext.createSourceSplitDataWriter(sourceSettings);
-        writer.writeRecords(testData);
-
-        Source<T, ?, ?> source = externalContext.createSource(sourceSettings);
-        StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment(envOptions);
-        List<T> results =
-                execEnv.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar source")
-                        .setParallelism(4)
-                        .executeAndCollect(
-                                "Source single split with four readers.", testData.size());
-
-        assertThat(results, containsInAnyOrder(testData.toArray()));
-    }
-}
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/ExclusiveSubscriptionContext.java
similarity index 71%
rename from flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java
rename to flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/ExclusiveSubscriptionContext.java
index 6fea0c9..4906ad6 100644
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java
+++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/ExclusiveSubscriptionContext.java
@@ -16,32 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.flink.tests.util.pulsar.cases;
+package org.apache.flink.tests.util.pulsar.source;
 
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.pulsar.testutils.cases.MultipleTopicTemplateContext;
+import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext;
 
 import org.apache.pulsar.client.api.SubscriptionType;
 
-import java.net.URL;
-import java.util.Collections;
-import java.util.List;
-
 /** We would consume from test splits by using {@link SubscriptionType#Exclusive} subscription. */
-public class ExclusiveSubscriptionContext extends MultipleTopicTemplateContext {
+public class ExclusiveSubscriptionContext extends MultipleTopicConsumingContext {
 
     public ExclusiveSubscriptionContext(PulsarTestEnvironment environment) {
-        this(environment, Collections.emptyList());
-    }
-
-    public ExclusiveSubscriptionContext(
-            PulsarTestEnvironment environment, List<URL> connectorJarPaths) {
-        super(environment, connectorJarPaths);
+        super(environment);
     }
 
     @Override
     protected String displayName() {
-        return "consuming message by Exclusive";
+        return "consume message by Exclusive";
     }
 
     @Override
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/FailoverSubscriptionContext.java
similarity index 71%
copy from flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
copy to flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/FailoverSubscriptionContext.java
index c473488..3134db4 100644
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
+++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/FailoverSubscriptionContext.java
@@ -16,32 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.flink.tests.util.pulsar.cases;
+package org.apache.flink.tests.util.pulsar.source;
 
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.pulsar.testutils.cases.MultipleTopicTemplateContext;
+import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext;
 
 import org.apache.pulsar.client.api.SubscriptionType;
 
-import java.net.URL;
-import java.util.Collections;
-import java.util.List;
-
 /** We would consume from test splits by using {@link SubscriptionType#Failover} subscription. */
-public class FailoverSubscriptionContext extends MultipleTopicTemplateContext {
+public class FailoverSubscriptionContext extends MultipleTopicConsumingContext {
 
     public FailoverSubscriptionContext(PulsarTestEnvironment environment) {
-        this(environment, Collections.emptyList());
-    }
-
-    public FailoverSubscriptionContext(
-            PulsarTestEnvironment environment, List<URL> connectorJarPaths) {
-        super(environment, connectorJarPaths);
+        super(environment);
     }
 
     @Override
     protected String displayName() {
-        return "consuming message by Failover";
+        return "consume message by Failover";
     }
 
     @Override
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/KeySharedSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/KeySharedSubscriptionContext.java
new file mode 100644
index 0000000..0cae6e5
--- /dev/null
+++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/KeySharedSubscriptionContext.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.pulsar.source;
+
+import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FixedRangeGenerator;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext;
+import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
+import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
+import org.apache.flink.tests.util.pulsar.common.KeyedPulsarPartitionDataWriter;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
+
+import static java.util.Collections.singletonList;
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.RANGE_SIZE;
+import static org.apache.pulsar.client.api.SubscriptionType.Key_Shared;
+
+/** We would consume from test splits by using {@link SubscriptionType#Key_Shared} subscription. */
+public class KeySharedSubscriptionContext extends MultipleTopicConsumingContext {
+
+    private final String keyToRead;
+    private final String keyToExclude;
+
+    public KeySharedSubscriptionContext(PulsarTestEnvironment environment) {
+        super(environment, Key_Shared);
+
+        this.keyToRead = randomAlphabetic(8);
+
+        // Make sure the two keys have different hash codes.
+        int readHash = keyHash(keyToRead);
+        String randomKey;
+        do {
+            randomKey = randomAlphabetic(8);
+        } while (keyHash(randomKey) == readHash);
+        this.keyToExclude = randomKey;
+    }
+
+    @Override
+    public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter(
+            TestingSourceSettings sourceSettings) {
+        String partitionName = generatePartitionName();
+        return new KeyedPulsarPartitionDataWriter(operator, partitionName, keyToRead, keyToExclude);
+    }
+
+    @Override
+    protected String displayName() {
+        return "consume message by Key_Shared";
+    }
+
+    @Override
+    protected void setSourceBuilder(PulsarSourceBuilder<String> builder) {
+        int keyHash = keyHash(keyToRead);
+        TopicRange range = new TopicRange(keyHash, keyHash);
+
+        builder.setRangeGenerator(new FixedRangeGenerator(singletonList(range)));
+    }
+
+    @Override
+    protected String subscriptionName() {
+        return "pulsar-key-shared-subscription";
+    }
+
+    // Copied from Pulsar: hashes a message key. NOTE: getBytes() uses the default charset.
+    private int keyHash(String key) {
+        return Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % RANGE_SIZE;
+    }
+}
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/SharedSubscriptionContext.java
similarity index 57%
rename from flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
rename to flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/SharedSubscriptionContext.java
index c473488..fe9f078 100644
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
+++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/SharedSubscriptionContext.java
@@ -16,41 +16,29 @@
  * limitations under the License.
  */
 
-package org.apache.flink.tests.util.pulsar.cases;
+package org.apache.flink.tests.util.pulsar.source;
 
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.pulsar.testutils.cases.MultipleTopicTemplateContext;
+import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext;
 
 import org.apache.pulsar.client.api.SubscriptionType;
 
-import java.net.URL;
-import java.util.Collections;
-import java.util.List;
+import static org.apache.pulsar.client.api.SubscriptionType.Shared;
 
-/** We would consume from test splits by using {@link SubscriptionType#Failover} subscription. */
-public class FailoverSubscriptionContext extends MultipleTopicTemplateContext {
+/** We would consume from test splits by using {@link SubscriptionType#Shared} subscription. */
+public class SharedSubscriptionContext extends MultipleTopicConsumingContext {
 
-    public FailoverSubscriptionContext(PulsarTestEnvironment environment) {
-        this(environment, Collections.emptyList());
-    }
-
-    public FailoverSubscriptionContext(
-            PulsarTestEnvironment environment, List<URL> connectorJarPaths) {
-        super(environment, connectorJarPaths);
+    public SharedSubscriptionContext(PulsarTestEnvironment environment) {
+        super(environment, Shared);
     }
 
     @Override
     protected String displayName() {
-        return "consuming message by Failover";
+        return "consume message by Shared";
     }
 
     @Override
     protected String subscriptionName() {
-        return "pulsar-failover-subscription";
-    }
-
-    @Override
-    protected SubscriptionType subscriptionType() {
-        return SubscriptionType.Failover;
+        return "pulsar-shared-subscription";
     }
 }