You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/12/12 18:33:38 UTC
[flink-connector-pulsar] 21/27: [FLINK-26182][Connector/pulsar] Extract common logic from Pulsar source testing tools.
This is an automated email from the ASF dual-hosted git repository.
martijnvisser pushed a commit to branch v3.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git
commit 3b2377d09c016bd7bf1968addd02ee884d274d9d
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Tue Sep 27 18:04:24 2022 +0800
[FLINK-26182][Connector/pulsar] Extract common logic from Pulsar source testing tools.
---
flink-connector-pulsar-e2e-tests/pom.xml | 24 +++-
.../util/pulsar/PulsarSourceOrderedE2ECase.java | 5 +-
.../util/pulsar/PulsarSourceUnorderedE2ECase.java | 13 +-
.../pulsar/cases/KeySharedSubscriptionContext.java | 144 ---------------------
.../pulsar/cases/SharedSubscriptionContext.java | 116 -----------------
.../FlinkContainerWithPulsarEnvironment.java | 3 +-
.../common/KeyedPulsarPartitionDataWriter.java | 19 +--
.../common/UnorderedSourceTestSuiteBase.java | 86 ------------
.../ExclusiveSubscriptionContext.java | 19 +--
.../FailoverSubscriptionContext.java | 19 +--
.../source/KeySharedSubscriptionContext.java | 87 +++++++++++++
.../SharedSubscriptionContext.java} | 30 ++---
12 files changed, 151 insertions(+), 414 deletions(-)
diff --git a/flink-connector-pulsar-e2e-tests/pom.xml b/flink-connector-pulsar-e2e-tests/pom.xml
index b9bc5d4..7f7ab46 100644
--- a/flink-connector-pulsar-e2e-tests/pom.xml
+++ b/flink-connector-pulsar-e2e-tests/pom.xml
@@ -33,6 +33,7 @@ under the License.
<properties>
<pulsar.version>2.10.0</pulsar.version>
<bouncycastle.version>1.69</bouncycastle.version>
+ <jaxb-api.version>2.3.1</jaxb-api.version>
</properties>
<dependencies>
@@ -41,6 +42,17 @@ under the License.
<artifactId>flink-end-to-end-tests-common</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-test-utils</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-pulsar</artifactId>
@@ -70,9 +82,6 @@ under the License.
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <skip>true</skip>
- </configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -88,6 +97,15 @@ under the License.
</executions>
<configuration>
<artifactItems>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-test-utils</artifactId>
+ <version>${project.version}</version>
+ <destFileName>flink-connector-testing.jar</destFileName>
+ <type>jar</type>
+ <outputDirectory>${project.build.directory}/dependencies
+ </outputDirectory>
+ </artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-pulsar</artifactId>
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
index 234c1a0..ea6a982 100644
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
+++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
@@ -25,10 +25,10 @@ import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem
import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.tests.util.pulsar.cases.ExclusiveSubscriptionContext;
-import org.apache.flink.tests.util.pulsar.cases.FailoverSubscriptionContext;
import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment;
+import org.apache.flink.tests.util.pulsar.source.ExclusiveSubscriptionContext;
+import org.apache.flink.tests.util.pulsar.source.FailoverSubscriptionContext;
import org.apache.flink.testutils.junit.FailsOnJava11;
import org.junit.experimental.categories.Category;
@@ -37,6 +37,7 @@ import org.junit.experimental.categories.Category;
* Pulsar E2E test based on connector testing framework. It's used for Failover & Exclusive
* subscription.
*/
+@SuppressWarnings("unused")
@Category(value = {FailsOnJava11.class})
public class PulsarSourceOrderedE2ECase extends SourceTestSuiteBase<String> {
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
index 5039048..15333a1 100644
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
+++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
@@ -19,21 +19,26 @@
package org.apache.flink.tests.util.pulsar;
import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
+import org.apache.flink.connector.pulsar.testutils.source.UnorderedSourceTestSuiteBase;
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.tests.util.pulsar.cases.KeySharedSubscriptionContext;
-import org.apache.flink.tests.util.pulsar.cases.SharedSubscriptionContext;
import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment;
-import org.apache.flink.tests.util.pulsar.common.UnorderedSourceTestSuiteBase;
+import org.apache.flink.tests.util.pulsar.source.KeySharedSubscriptionContext;
+import org.apache.flink.tests.util.pulsar.source.SharedSubscriptionContext;
+import org.apache.flink.testutils.junit.FailsOnJava11;
+
+import org.junit.experimental.categories.Category;
/**
* Pulsar E2E test based on connector testing framework. It's used for Shared & Key_Shared
* subscription.
*/
+@SuppressWarnings("unused")
+@Category(value = {FailsOnJava11.class})
public class PulsarSourceUnorderedE2ECase extends UnorderedSourceTestSuiteBase<String> {
// Defines the Semantic.
@@ -49,12 +54,10 @@ public class PulsarSourceUnorderedE2ECase extends UnorderedSourceTestSuiteBase<S
PulsarContainerTestEnvironment pulsar = new PulsarContainerTestEnvironment(flink);
// Defines a set of external context Factories for different test cases.
- @SuppressWarnings("unused")
@TestContext
PulsarTestContextFactory<String, SharedSubscriptionContext> shared =
new PulsarTestContextFactory<>(pulsar, SharedSubscriptionContext::new);
- @SuppressWarnings("unused")
@TestContext
PulsarTestContextFactory<String, KeySharedSubscriptionContext> keyShared =
new PulsarTestContextFactory<>(pulsar, KeySharedSubscriptionContext::new);
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
deleted file mode 100644
index 5ad369b..0000000
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tests.util.pulsar.cases;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.Source;
-import org.apache.flink.connector.pulsar.source.PulsarSource;
-import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FixedRangeGenerator;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
-import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
-import org.apache.flink.tests.util.pulsar.common.KeyedPulsarPartitionDataWriter;
-
-import org.apache.pulsar.client.api.RegexSubscriptionMode;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.util.Murmur3_32Hash;
-
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static java.util.Collections.singletonList;
-import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.RANGE_SIZE;
-import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
-import static org.apache.pulsar.client.api.Schema.STRING;
-
-/** We would consume from test splits by using {@link SubscriptionType#Key_Shared} subscription. */
-public class KeySharedSubscriptionContext extends PulsarTestContext<String> {
-
- private int index = 0;
-
- private final List<KeyedPulsarPartitionDataWriter> writers = new ArrayList<>();
-
- // Message keys.
- private final String key1;
- private final String key2;
-
- public KeySharedSubscriptionContext(PulsarTestEnvironment environment) {
- this(environment, Collections.emptyList());
- }
-
- public KeySharedSubscriptionContext(
- PulsarTestEnvironment environment, List<URL> connectorJarPaths) {
- super(environment, connectorJarPaths);
-
- // Init message keys.
- this.key1 = randomAlphabetic(8);
- String newKey2;
- do {
- newKey2 = randomAlphabetic(8);
- } while (keyHash(key1) == keyHash(newKey2));
- this.key2 = newKey2;
- }
-
- @Override
- protected String displayName() {
- return "consuming message by Key_Shared";
- }
-
- @Override
- public Source<String, ?, ?> createSource(TestingSourceSettings sourceSettings) {
- int keyHash = keyHash(key1);
- TopicRange range = new TopicRange(keyHash, keyHash);
-
- PulsarSourceBuilder<String> builder =
- PulsarSource.builder()
- .setDeserializationSchema(pulsarSchema(STRING))
- .setServiceUrl(operator.serviceUrl())
- .setAdminUrl(operator.adminUrl())
- .setTopicPattern(
- "pulsar-[0-9]+-key-shared", RegexSubscriptionMode.AllTopics)
- .setSubscriptionType(SubscriptionType.Key_Shared)
- .setSubscriptionName("pulsar-key-shared")
- .setRangeGenerator(new FixedRangeGenerator(singletonList(range)));
- if (sourceSettings.getBoundedness() == Boundedness.BOUNDED) {
- // Using latest stop cursor for making sure the source could be stopped.
- builder.setBoundedStopCursor(StopCursor.latest());
- }
-
- return builder.build();
- }
-
- @Override
- public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter(
- TestingSourceSettings sourceSettings) {
- String topicName = "pulsar-" + index + "-key-shared";
- operator.createTopic(topicName, 1);
- index++;
-
- String partitionName = TopicNameUtils.topicNameWithPartition(topicName, 0);
- KeyedPulsarPartitionDataWriter writer =
- new KeyedPulsarPartitionDataWriter(operator, partitionName, key1, key2);
- writers.add(writer);
-
- return writer;
- }
-
- @Override
- public List<String> generateTestData(
- TestingSourceSettings sourceSettings, int splitIndex, long seed) {
- return generateStringTestData(splitIndex, seed);
- }
-
- @Override
- public TypeInformation<String> getProducedType() {
- return TypeInformation.of(String.class);
- }
-
- @Override
- public void close() {
- for (KeyedPulsarPartitionDataWriter writer : writers) {
- writer.close();
- }
- writers.clear();
- }
-
- private int keyHash(String key) {
- return Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % RANGE_SIZE;
- }
-}
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
deleted file mode 100644
index 1a2db66..0000000
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tests.util.pulsar.cases;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.Source;
-import org.apache.flink.connector.pulsar.source.PulsarSource;
-import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
-import org.apache.flink.connector.pulsar.testutils.PulsarPartitionDataWriter;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
-import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
-
-import org.apache.pulsar.client.api.RegexSubscriptionMode;
-import org.apache.pulsar.client.api.SubscriptionType;
-
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
-import static org.apache.pulsar.client.api.Schema.STRING;
-
-/** We would consume from test splits by using {@link SubscriptionType#Shared} subscription. */
-public class SharedSubscriptionContext extends PulsarTestContext<String> {
-
- private int index = 0;
-
- private final List<PulsarPartitionDataWriter> writers = new ArrayList<>();
-
- public SharedSubscriptionContext(PulsarTestEnvironment environment) {
- this(environment, Collections.emptyList());
- }
-
- public SharedSubscriptionContext(
- PulsarTestEnvironment environment, List<URL> connectorJarPaths) {
- super(environment, connectorJarPaths);
- }
-
- @Override
- protected String displayName() {
- return "consuming message by Shared";
- }
-
- @Override
- public Source<String, ?, ?> createSource(TestingSourceSettings sourceSettings) {
- PulsarSourceBuilder<String> builder =
- PulsarSource.builder()
- .setDeserializationSchema(pulsarSchema(STRING))
- .setServiceUrl(operator.serviceUrl())
- .setAdminUrl(operator.adminUrl())
- .setTopicPattern("pulsar-[0-9]+-shared", RegexSubscriptionMode.AllTopics)
- .setSubscriptionType(SubscriptionType.Shared)
- .setSubscriptionName("pulsar-shared");
- if (sourceSettings.getBoundedness() == Boundedness.BOUNDED) {
- // Using latest stop cursor for making sure the source could be stopped.
- builder.setBoundedStopCursor(StopCursor.latest());
- }
-
- return builder.build();
- }
-
- @Override
- public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter(
- TestingSourceSettings sourceSettings) {
- String topicName = "pulsar-" + index + "-shared";
- operator.createTopic(topicName, 1);
- index++;
-
- String partitionName = TopicNameUtils.topicNameWithPartition(topicName, 0);
- PulsarPartitionDataWriter writer = new PulsarPartitionDataWriter(operator, partitionName);
- writers.add(writer);
-
- return writer;
- }
-
- @Override
- public List<String> generateTestData(
- TestingSourceSettings sourceSettings, int splitIndex, long seed) {
- return generateStringTestData(splitIndex, seed);
- }
-
- @Override
- public TypeInformation<String> getProducedType() {
- return TypeInformation.of(String.class);
- }
-
- @Override
- public void close() {
- for (PulsarPartitionDataWriter writer : writers) {
- writer.close();
- }
- writers.clear();
- }
-}
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
index 9f34554..f5e862f 100644
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
+++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
@@ -43,7 +43,8 @@ public class FlinkContainerWithPulsarEnvironment extends FlinkContainerTestEnvir
resourcePath("bcutil-jdk15on.jar"),
resourcePath("bcprov-ext-jdk15on.jar"),
resourcePath("jaxb-api.jar"),
- resourcePath("jul-to-slf4j.jar"));
+ resourcePath("jul-to-slf4j.jar"),
+ resourcePath("flink-connector-testing.jar"));
}
private static String resourcePath(String jarName) {
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java
index e431e4c..d5f6e11 100644
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java
+++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java
@@ -35,23 +35,26 @@ public class KeyedPulsarPartitionDataWriter implements ExternalSystemSplitDataWr
private final PulsarRuntimeOperator operator;
private final String fullTopicName;
- private final String key1;
- private final String key2;
+ private final String keyToRead;
+ private final String keyToExclude;
public KeyedPulsarPartitionDataWriter(
- PulsarRuntimeOperator operator, String fullTopicName, String key1, String key2) {
+ PulsarRuntimeOperator operator,
+ String fullTopicName,
+ String keyToRead,
+ String keyToExclude) {
this.operator = operator;
this.fullTopicName = fullTopicName;
- this.key1 = key1;
- this.key2 = key2;
+ this.keyToRead = keyToRead;
+ this.keyToExclude = keyToExclude;
}
@Override
public void writeRecords(List<String> records) {
- operator.sendMessages(fullTopicName, Schema.STRING, key1, records);
+ List<String> newRecords = records.stream().map(a -> a + keyToRead).collect(toList());
- List<String> newRecords = records.stream().map(a -> a + key1).collect(toList());
- operator.sendMessages(fullTopicName, Schema.STRING, key2, newRecords);
+ operator.sendMessages(fullTopicName, Schema.STRING, keyToExclude, newRecords);
+ operator.sendMessages(fullTopicName, Schema.STRING, keyToRead, records);
}
@Override
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java
deleted file mode 100644
index 01527ea..0000000
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tests.util.pulsar.common;
-
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.Source;
-import org.apache.flink.connector.testframe.environment.TestEnvironment;
-import org.apache.flink.connector.testframe.environment.TestEnvironmentSettings;
-import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
-import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
-import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
-import org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension;
-import org.apache.flink.connector.testframe.junit.extensions.TestCaseInvocationContextProvider;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.util.TestLoggerExtension;
-
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.TestInstance;
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-
-/** A source test template for testing the messages which could be consumed in an unordered way. */
-@ExtendWith({
- ConnectorTestingExtension.class,
- TestLoggerExtension.class,
- TestCaseInvocationContextProvider.class
-})
-@TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public abstract class UnorderedSourceTestSuiteBase<T> {
-
- @TestTemplate
- @DisplayName("Test source with one split and four consumers")
- public void testOneSplitWithMultipleConsumers(
- TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext)
- throws Exception {
- TestingSourceSettings sourceSettings =
- TestingSourceSettings.builder()
- .setBoundedness(Boundedness.BOUNDED)
- .setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
- .build();
- TestEnvironmentSettings envOptions =
- TestEnvironmentSettings.builder()
- .setConnectorJarPaths(externalContext.getConnectorJarPaths())
- .build();
- List<T> testData =
- externalContext.generateTestData(
- sourceSettings, 0, ThreadLocalRandom.current().nextLong());
- ExternalSystemSplitDataWriter<T> writer =
- externalContext.createSourceSplitDataWriter(sourceSettings);
- writer.writeRecords(testData);
-
- Source<T, ?, ?> source = externalContext.createSource(sourceSettings);
- StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment(envOptions);
- List<T> results =
- execEnv.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar source")
- .setParallelism(4)
- .executeAndCollect(
- "Source single split with four readers.", testData.size());
-
- assertThat(results, containsInAnyOrder(testData.toArray()));
- }
-}
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/ExclusiveSubscriptionContext.java
similarity index 71%
rename from flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java
rename to flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/ExclusiveSubscriptionContext.java
index 6fea0c9..4906ad6 100644
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java
+++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/ExclusiveSubscriptionContext.java
@@ -16,32 +16,23 @@
* limitations under the License.
*/
-package org.apache.flink.tests.util.pulsar.cases;
+package org.apache.flink.tests.util.pulsar.source;
import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.pulsar.testutils.cases.MultipleTopicTemplateContext;
+import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext;
import org.apache.pulsar.client.api.SubscriptionType;
-import java.net.URL;
-import java.util.Collections;
-import java.util.List;
-
/** We would consume from test splits by using {@link SubscriptionType#Exclusive} subscription. */
-public class ExclusiveSubscriptionContext extends MultipleTopicTemplateContext {
+public class ExclusiveSubscriptionContext extends MultipleTopicConsumingContext {
public ExclusiveSubscriptionContext(PulsarTestEnvironment environment) {
- this(environment, Collections.emptyList());
- }
-
- public ExclusiveSubscriptionContext(
- PulsarTestEnvironment environment, List<URL> connectorJarPaths) {
- super(environment, connectorJarPaths);
+ super(environment);
}
@Override
protected String displayName() {
- return "consuming message by Exclusive";
+ return "consume message by Exclusive";
}
@Override
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/FailoverSubscriptionContext.java
similarity index 71%
copy from flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
copy to flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/FailoverSubscriptionContext.java
index c473488..3134db4 100644
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
+++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/FailoverSubscriptionContext.java
@@ -16,32 +16,23 @@
* limitations under the License.
*/
-package org.apache.flink.tests.util.pulsar.cases;
+package org.apache.flink.tests.util.pulsar.source;
import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.pulsar.testutils.cases.MultipleTopicTemplateContext;
+import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext;
import org.apache.pulsar.client.api.SubscriptionType;
-import java.net.URL;
-import java.util.Collections;
-import java.util.List;
-
/** We would consume from test splits by using {@link SubscriptionType#Failover} subscription. */
-public class FailoverSubscriptionContext extends MultipleTopicTemplateContext {
+public class FailoverSubscriptionContext extends MultipleTopicConsumingContext {
public FailoverSubscriptionContext(PulsarTestEnvironment environment) {
- this(environment, Collections.emptyList());
- }
-
- public FailoverSubscriptionContext(
- PulsarTestEnvironment environment, List<URL> connectorJarPaths) {
- super(environment, connectorJarPaths);
+ super(environment);
}
@Override
protected String displayName() {
- return "consuming message by Failover";
+ return "consume message by Failover";
}
@Override
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/KeySharedSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/KeySharedSubscriptionContext.java
new file mode 100644
index 0000000..0cae6e5
--- /dev/null
+++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/KeySharedSubscriptionContext.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.pulsar.source;
+
+import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FixedRangeGenerator;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext;
+import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
+import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
+import org.apache.flink.tests.util.pulsar.common.KeyedPulsarPartitionDataWriter;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
+
+import java.nio.charset.StandardCharsets;
+
+import static java.util.Collections.singletonList;
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.RANGE_SIZE;
+import static org.apache.pulsar.client.api.SubscriptionType.Key_Shared;
+
+/**
+ * Source testing context which consumes the test splits through a {@link
+ * SubscriptionType#Key_Shared} subscription.
+ *
+ * <p>Each context instance generates two random 8-letter message keys whose key hashes differ.
+ * Both keys are handed to the {@link KeyedPulsarPartitionDataWriter}, while the source builder is
+ * configured with a {@link FixedRangeGenerator} that only contains the single-hash {@link
+ * TopicRange} of {@code keyToRead}.
+ */
+public class KeySharedSubscriptionContext extends MultipleTopicConsumingContext {
+
+    /** Message key whose hash defines the only range the source is configured with. */
+    private final String keyToRead;
+
+    /** Message key whose hash is guaranteed to differ from the hash of {@link #keyToRead}. */
+    private final String keyToExclude;
+
+    /**
+     * Creates the context and picks the two random message keys.
+     *
+     * @param environment the Pulsar test environment passed through to the parent context.
+     */
+    public KeySharedSubscriptionContext(PulsarTestEnvironment environment) {
+        super(environment, Key_Shared);
+
+        this.keyToRead = randomAlphabetic(8);
+
+        // The excluded key must not fall into the range we read from, so keep drawing random
+        // keys until we get one whose hash differs from the hash of the key to read.
+        int readHash = keyHash(keyToRead);
+        String randomKey;
+        do {
+            randomKey = randomAlphabetic(8);
+        } while (keyHash(randomKey) == readHash);
+        this.keyToExclude = randomKey;
+    }
+
+    /**
+     * Creates a writer for a newly generated partition name, giving it both the key to read and
+     * the key to exclude.
+     */
+    @Override
+    public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter(
+            TestingSourceSettings sourceSettings) {
+        String partitionName = generatePartitionName();
+        return new KeyedPulsarPartitionDataWriter(operator, partitionName, keyToRead, keyToExclude);
+    }
+
+    @Override
+    protected String displayName() {
+        return "consume message by Key_Shared";
+    }
+
+    /** Restricts the source to the one-element hash range {@code [hash, hash]} of the read key. */
+    @Override
+    protected void setSourceBuilder(PulsarSourceBuilder<String> builder) {
+        int keyHash = keyHash(keyToRead);
+        TopicRange range = new TopicRange(keyHash, keyHash);
+
+        builder.setRangeGenerator(new FixedRangeGenerator(singletonList(range)));
+    }
+
+    @Override
+    protected String subscriptionName() {
+        return "pulsar-key-shared-subscription";
+    }
+
+    // This method is copied from Pulsar for calculating message key hash.
+    // The charset is given explicitly so that the resulting hash doesn't depend on the default
+    // encoding of the JVM which runs the test.
+    private int keyHash(String key) {
+        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+        return Murmur3_32Hash.getInstance().makeHash(keyBytes) % RANGE_SIZE;
+    }
+}
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/SharedSubscriptionContext.java
similarity index 57%
rename from flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
rename to flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/SharedSubscriptionContext.java
index c473488..fe9f078 100644
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
+++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/SharedSubscriptionContext.java
@@ -16,41 +16,29 @@
* limitations under the License.
*/
-package org.apache.flink.tests.util.pulsar.cases;
+package org.apache.flink.tests.util.pulsar.source;
import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.pulsar.testutils.cases.MultipleTopicTemplateContext;
+import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext;
import org.apache.pulsar.client.api.SubscriptionType;
-import java.net.URL;
-import java.util.Collections;
-import java.util.List;
+import static org.apache.pulsar.client.api.SubscriptionType.Shared;
-/** We would consume from test splits by using {@link SubscriptionType#Failover} subscription. */
-public class FailoverSubscriptionContext extends MultipleTopicTemplateContext {
+/** We would consume from test splits by using {@link SubscriptionType#Shared} subscription. */
+public class SharedSubscriptionContext extends MultipleTopicConsumingContext {
- public FailoverSubscriptionContext(PulsarTestEnvironment environment) {
- this(environment, Collections.emptyList());
- }
-
- public FailoverSubscriptionContext(
- PulsarTestEnvironment environment, List<URL> connectorJarPaths) {
- super(environment, connectorJarPaths);
+ public SharedSubscriptionContext(PulsarTestEnvironment environment) {
+ super(environment, Shared);
}
@Override
protected String displayName() {
- return "consuming message by Failover";
+ return "consume message by Shared";
}
@Override
protected String subscriptionName() {
- return "pulsar-failover-subscription";
- }
-
- @Override
- protected SubscriptionType subscriptionType() {
- return SubscriptionType.Failover;
+ return "pulsar-shared-subscription";
}
}