You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ti...@apache.org on 2023/01/31 02:42:33 UTC
[flink-connector-pulsar] branch v3.0 updated: [FLINK-30657][Connector/Pulsar] Remove Shared and Key_Shared related tests in Pulsar connector. (#17)
This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch v3.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git
The following commit(s) were added to refs/heads/v3.0 by this push:
new b708afd [FLINK-30657][Connector/Pulsar] Remove Shared and Key_Shared related tests in Pulsar connector. (#17)
b708afd is described below
commit b708afd5f1b0b5fc41f2177b27a6aab0ac9136b4
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Tue Jan 31 10:42:27 2023 +0800
[FLINK-30657][Connector/Pulsar] Remove Shared and Key_Shared related tests in Pulsar connector. (#17)
---
.../util/pulsar/PulsarSourceUnorderedE2ECase.java | 63 -----------
.../f4d91193-72ba-4ce4-ad83-98f780dce581 | 6 -
.../pulsar/source/PulsarUnorderedSourceITCase.java | 60 ----------
.../enumerator/PulsarSourceEnumeratorTest.java | 14 +--
.../assigner/SharedSplitAssignerTest.java | 121 ---------------------
.../source/PulsarUnorderedSourceReaderTest.java | 27 -----
.../PulsarUnorderedPartitionSplitReaderTest.java | 28 -----
.../source/KeyedPulsarPartitionDataWriter.java | 66 -----------
.../source/UnorderedSourceTestSuiteBase.java | 76 -------------
.../source/cases/KeySharedSubscriptionContext.java | 85 ---------------
.../source/cases/SharedSubscriptionContext.java | 46 --------
11 files changed, 7 insertions(+), 585 deletions(-)
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
deleted file mode 100644
index 770d118..0000000
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tests.util.pulsar;
-
-import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
-import org.apache.flink.connector.pulsar.testutils.source.UnorderedSourceTestSuiteBase;
-import org.apache.flink.connector.pulsar.testutils.source.cases.KeySharedSubscriptionContext;
-import org.apache.flink.connector.pulsar.testutils.source.cases.SharedSubscriptionContext;
-import org.apache.flink.connector.testframe.junit.annotations.TestContext;
-import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
-import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
-import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
-import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment;
-
-import org.junit.jupiter.api.Tag;
-
-/**
- * Pulsar E2E test based on connector testing framework. It's used for Shared & Key_Shared
- * subscription.
- */
-@SuppressWarnings("unused")
-@Tag("org.apache.flink.testutils.junit.FailsOnJava11")
-public class PulsarSourceUnorderedE2ECase extends UnorderedSourceTestSuiteBase<String> {
-
- // Defines the Semantic.
- @TestSemantics
- CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};
-
- // Defines TestEnvironment.
- @TestEnv
- FlinkContainerWithPulsarEnvironment flink = new FlinkContainerWithPulsarEnvironment(1, 8);
-
- // Defines ConnectorExternalSystem.
- @TestExternalSystem
- PulsarContainerTestEnvironment pulsar = new PulsarContainerTestEnvironment(flink);
-
- // Defines a set of external context Factories for different test cases.
- @TestContext
- PulsarTestContextFactory<String, SharedSubscriptionContext> shared =
- new PulsarTestContextFactory<>(pulsar, SharedSubscriptionContext::new);
-
- @TestContext
- PulsarTestContextFactory<String, KeySharedSubscriptionContext> keyShared =
- new PulsarTestContextFactory<>(pulsar, KeySharedSubscriptionContext::new);
-}
diff --git a/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581 b/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
index 40e7dc9..37f6ba8 100644
--- a/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
+++ b/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
@@ -10,9 +10,3 @@ org.apache.flink.connector.pulsar.source.PulsarSourceITCase does not satisfy: on
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
-org.apache.flink.connector.pulsar.source.PulsarUnorderedSourceITCase does not satisfy: only one of the following predicates match:\
-* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
-* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
-* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
- or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
\ No newline at end of file
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarUnorderedSourceITCase.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarUnorderedSourceITCase.java
deleted file mode 100644
index 8586c7d..0000000
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarUnorderedSourceITCase.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source;
-
-import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
-import org.apache.flink.connector.pulsar.testutils.source.UnorderedSourceTestSuiteBase;
-import org.apache.flink.connector.pulsar.testutils.source.cases.KeySharedSubscriptionContext;
-import org.apache.flink.connector.pulsar.testutils.source.cases.SharedSubscriptionContext;
-import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
-import org.apache.flink.connector.testframe.junit.annotations.TestContext;
-import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
-import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
-import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
-import org.apache.flink.streaming.api.CheckpointingMode;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.junit.jupiter.api.Tag;
-
-/**
- * Unit test class for {@link PulsarSource}. Used for {@link SubscriptionType#Shared} subscription.
- */
-@Tag("org.apache.flink.testutils.junit.FailsOnJava11")
-public class PulsarUnorderedSourceITCase extends UnorderedSourceTestSuiteBase<String> {
-
- // Defines test environment on Flink MiniCluster
- @TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();
-
- // Defines pulsar running environment
- @TestExternalSystem
- PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.mock());
-
- @TestSemantics
- CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};
-
- @TestContext
- PulsarTestContextFactory<String, SharedSubscriptionContext> sharedSubscription =
- new PulsarTestContextFactory<>(pulsar, SharedSubscriptionContext::new);
-
- @TestContext
- PulsarTestContextFactory<String, KeySharedSubscriptionContext> keySharedSubscription =
- new PulsarTestContextFactory<>(pulsar, KeySharedSubscriptionContext::new);
-}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java
index 702a567..0e30916 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java
@@ -71,7 +71,7 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
@ParameterizedTest
@EnumSource(
value = SubscriptionType.class,
- names = {"Failover", "Shared"})
+ names = {"Failover"})
void startWithDiscoverPartitionsOnce(SubscriptionType subscriptionType) throws Exception {
Set<String> preexistingTopics = setupPreexistingTopics();
try (MockSplitEnumeratorContext<PulsarPartitionSplit> context =
@@ -96,7 +96,7 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
@ParameterizedTest
@EnumSource(
value = SubscriptionType.class,
- names = {"Failover", "Shared"})
+ names = {"Failover"})
void startWithPeriodicPartitionDiscovery(SubscriptionType subscriptionType) throws Exception {
Set<String> preexistingTopics = setupPreexistingTopics();
try (MockSplitEnumeratorContext<PulsarPartitionSplit> context =
@@ -119,7 +119,7 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
@ParameterizedTest
@EnumSource(
value = SubscriptionType.class,
- names = {"Failover", "Shared"})
+ names = {"Failover"})
void discoverPartitionsTriggersAssignments(SubscriptionType subscriptionType) throws Throwable {
Set<String> preexistingTopics = setupPreexistingTopics();
try (MockSplitEnumeratorContext<PulsarPartitionSplit> context =
@@ -148,7 +148,7 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
@ParameterizedTest
@EnumSource(
value = SubscriptionType.class,
- names = {"Failover", "Shared"})
+ names = {"Failover"})
void discoverPartitionsPeriodically(SubscriptionType subscriptionType) throws Throwable {
String dynamicTopic = "topic3-" + randomAlphabetic(10);
Set<String> preexistingTopics = setupPreexistingTopics();
@@ -192,7 +192,7 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
@ParameterizedTest
@EnumSource(
value = SubscriptionType.class,
- names = {"Failover", "Shared"})
+ names = {"Failover"})
void addSplitsBack(SubscriptionType subscriptionType) throws Throwable {
Set<String> preexistingTopics = setupPreexistingTopics();
try (MockSplitEnumeratorContext<PulsarPartitionSplit> context =
@@ -225,7 +225,7 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
@ParameterizedTest
@EnumSource(
value = SubscriptionType.class,
- names = {"Failover", "Shared"})
+ names = {"Failover"})
void workWithPreexistingAssignments(SubscriptionType subscriptionType) throws Throwable {
Set<String> preexistingTopics = setupPreexistingTopics();
PulsarSourceEnumState preexistingAssignments;
@@ -267,7 +267,7 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase {
@ParameterizedTest
@EnumSource(
value = SubscriptionType.class,
- names = {"Failover", "Shared"})
+ names = {"Failover"})
void snapshotState(SubscriptionType subscriptionType) throws Throwable {
Set<String> preexistingTopics = setupPreexistingTopics();
try (MockSplitEnumeratorContext<PulsarPartitionSplit> context =
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssignerTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssignerTest.java
deleted file mode 100644
index 68f73b5..0000000
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssignerTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.enumerator.assigner;
-
-import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.api.connector.source.SplitsAssignment;
-import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-
-import static java.util.Collections.singletonList;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-/** Unit tests for {@link SharedSplitAssigner}. */
-class SharedSplitAssignerTest extends SplitAssignerTestBase {
-
- @Test
- void noMoreSplits() {
- SplitAssigner assigner = splitAssigner(true, 4);
- assertFalse(assigner.noMoreSplits(3));
-
- assigner = splitAssigner(false, 4);
- assertFalse(assigner.noMoreSplits(3));
-
- assigner.registerTopicPartitions(createPartitions("f", 8));
- assertFalse(assigner.noMoreSplits(3));
-
- assigner.createAssignment(singletonList(1));
- assertTrue(assigner.noMoreSplits(1));
- assertFalse(assigner.noMoreSplits(3));
-
- assigner.createAssignment(singletonList(3));
- assertTrue(assigner.noMoreSplits(3));
- }
-
- @Test
- void partitionsAssignment() {
- SplitAssigner assigner = splitAssigner(true, 8);
- assigner.registerTopicPartitions(createPartitions("d", 4));
- List<Integer> readers = Arrays.asList(1, 3, 5, 7);
-
- // Assignment with initial states.
- Optional<SplitsAssignment<PulsarPartitionSplit>> assignment =
- assigner.createAssignment(readers);
- assertThat(assignment).isPresent();
- assertThat(assignment.get().assignment()).hasSize(4);
-
- // Reassignment with same readers.
- assignment = assigner.createAssignment(readers);
- assertThat(assignment).isNotPresent();
-
- // Register new partition and assign.
- assigner.registerTopicPartitions(createPartitions("e", 5));
- assignment = assigner.createAssignment(readers);
- assertThat(assignment).isPresent();
- assertThat(assignment.get().assignment()).hasSize(4);
-
- // Assign to new readers.
- readers = Arrays.asList(0, 2, 4, 6);
- assignment = assigner.createAssignment(readers);
- assertThat(assignment).isPresent();
- assertThat(assignment.get().assignment())
- .hasSize(4)
- .allSatisfy((k, v) -> assertThat(v).hasSize(2));
- }
-
- @Test
- void reassignSplitsAfterRestarting() {
- SplitAssigner assigner = splitAssigner(true, 8);
- Set<TopicPartition> partitions = createPartitions("d", 4);
- assigner.registerTopicPartitions(partitions);
- List<Integer> readers = Arrays.asList(0, 1, 2);
-
- Optional<SplitsAssignment<PulsarPartitionSplit>> assignment =
- assigner.createAssignment(readers);
- assertThat(assignment).isPresent();
- assertThat(assignment.get().assignment()).hasSize(3);
-
- // Create a new split assigner with same state.
- SplitAssigner assigner1 = splitAssigner(true, 8, partitions);
- assigner1.registerTopicPartitions(partitions);
- assignment = assigner1.createAssignment(readers);
- assertThat(assignment).isPresent();
- assertThat(assignment.get().assignment()).hasSize(3);
- }
-
- @Override
- protected SplitAssigner createAssigner(
- StopCursor stopCursor,
- boolean enablePartitionDiscovery,
- SplitEnumeratorContext<PulsarPartitionSplit> context,
- PulsarSourceEnumState enumState) {
- return new SharedSplitAssigner(stopCursor, enablePartitionDiscovery, context, enumState);
- }
-}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReaderTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReaderTest.java
deleted file mode 100644
index 4f7fdd3..0000000
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReaderTest.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.reader.source;
-
-import org.apache.flink.connector.pulsar.testutils.extension.SubType;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-
-class PulsarUnorderedSourceReaderTest extends PulsarSourceReaderTestBase {
- @SubType SubscriptionType subscriptionType = SubscriptionType.Shared;
-}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReaderTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReaderTest.java
deleted file mode 100644
index 2cb3cb9..0000000
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReaderTest.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.source.reader.split;
-
-import org.apache.flink.connector.pulsar.testutils.extension.SubType;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-
-/** Unit tests for {@link PulsarUnorderedPartitionSplitReaderTest}. */
-class PulsarUnorderedPartitionSplitReaderTest extends PulsarPartitionSplitReaderTestBase {
- @SubType SubscriptionType subscriptionType = SubscriptionType.Shared;
-}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/KeyedPulsarPartitionDataWriter.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/KeyedPulsarPartitionDataWriter.java
deleted file mode 100644
index 23d65f0..0000000
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/KeyedPulsarPartitionDataWriter.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.testutils.source;
-
-import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
-import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
-
-import org.apache.pulsar.client.api.Schema;
-
-import java.util.List;
-
-import static java.util.stream.Collectors.toList;
-
-/**
- * Source split data writer for writing test data into a Pulsar topic partition. It will write the
- * message with two keys.
- */
-public class KeyedPulsarPartitionDataWriter implements ExternalSystemSplitDataWriter<String> {
-
- private final PulsarRuntimeOperator operator;
- private final String fullTopicName;
- private final String keyToRead;
- private final String keyToExclude;
-
- public KeyedPulsarPartitionDataWriter(
- PulsarRuntimeOperator operator,
- String fullTopicName,
- String keyToRead,
- String keyToExclude) {
- this.operator = operator;
- this.fullTopicName = fullTopicName;
- this.keyToRead = keyToRead;
- this.keyToExclude = keyToExclude;
- }
-
- @Override
- public void writeRecords(List<String> records) {
- // Send messages with the key we don't need.
- List<String> newRecords = records.stream().map(a -> a + keyToRead).collect(toList());
- operator.sendMessages(fullTopicName, Schema.STRING, keyToExclude, newRecords);
-
- // Send messages with the given key.
- operator.sendMessages(fullTopicName, Schema.STRING, keyToRead, records);
- }
-
- @Override
- public void close() {
- // Nothing to do.
- }
-}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/UnorderedSourceTestSuiteBase.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/UnorderedSourceTestSuiteBase.java
deleted file mode 100644
index af41d8c..0000000
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/UnorderedSourceTestSuiteBase.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.testutils.source;
-
-import org.apache.flink.connector.testframe.environment.TestEnvironment;
-import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
-import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.util.CloseableIterator;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.junit.jupiter.api.Disabled;
-
-import java.util.List;
-
-import static java.util.concurrent.CompletableFuture.runAsync;
-import static org.apache.flink.connector.testframe.utils.CollectIteratorAssertions.assertUnordered;
-import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
-import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
-
-/**
- * Pulsar with {@link SubscriptionType#Key_Shared} and {@link SubscriptionType#Shared} consumes the
- * message out of order. So we have to override the default connector test tool.
- */
-public abstract class UnorderedSourceTestSuiteBase<T> extends SourceTestSuiteBase<T> {
-
- @Override
- protected void checkResultWithSemantic(
- CloseableIterator<T> resultIterator,
- List<List<T>> testData,
- CheckpointingMode semantic,
- Integer limit) {
- Runnable runnable =
- () ->
- assertUnordered(resultIterator)
- .withNumRecordsLimit(getExpectedSize(testData, limit))
- .matchesRecordsFromSource(testData, semantic);
-
- assertThat(runAsync(runnable)).succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
- }
-
- /**
- * Shared subscription will have multiple readers on same partition, this would make hard to
- * automatically stop like a bounded source.
- */
- private static <T> int getExpectedSize(List<List<T>> testData, Integer limit) {
- if (limit == null) {
- return testData.stream().mapToInt(List::size).sum();
- } else {
- return limit;
- }
- }
-
- @Override
- @Disabled("We don't have any idle readers in Pulsar's shared subscription.")
- public void testIdleReader(
- TestEnvironment testEnv,
- DataStreamSourceExternalContext<T> externalContext,
- CheckpointingMode semantic) {}
-}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/KeySharedSubscriptionContext.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/KeySharedSubscriptionContext.java
deleted file mode 100644
index c348853..0000000
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/KeySharedSubscriptionContext.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.testutils.source.cases;
-
-import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FixedKeysRangeGenerator;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.pulsar.testutils.source.KeyedPulsarPartitionDataWriter;
-import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
-import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-
-import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
-import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ALLOW_KEY_SHARED_OUT_OF_ORDER_DELIVERY;
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator.KeySharedMode.JOIN;
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils.keyHash;
-
-/** We would consume from test splits by using {@link SubscriptionType#Key_Shared} subscription. */
-public class KeySharedSubscriptionContext extends MultipleTopicConsumingContext {
-
- private final String keyToRead;
- private final String keyToExclude;
-
- public KeySharedSubscriptionContext(PulsarTestEnvironment environment) {
- super(environment);
-
- this.keyToRead = randomAlphabetic(8);
-
- // Make sure they have different hash code.
- int readHash = keyHash(keyToRead);
- String randomKey;
- do {
- randomKey = randomAlphabetic(8);
- } while (keyHash(randomKey) == readHash);
- this.keyToExclude = randomKey;
- }
-
- @Override
- public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter(
- TestingSourceSettings sourceSettings) {
- String partitionName = generatePartitionName();
- return new KeyedPulsarPartitionDataWriter(operator, partitionName, keyToRead, keyToExclude);
- }
-
- @Override
- protected String displayName() {
- return "consume message by Key_Shared";
- }
-
- @Override
- protected void setSourceBuilder(PulsarSourceBuilder<String> builder) {
- // Make sure we only consume the messages with keyToRead.
- FixedKeysRangeGenerator generator =
- FixedKeysRangeGenerator.builder().key(keyToRead).keySharedMode(JOIN).build();
- builder.setRangeGenerator(generator);
- builder.setConfig(PULSAR_ALLOW_KEY_SHARED_OUT_OF_ORDER_DELIVERY, true);
- }
-
- @Override
- protected String subscriptionName() {
- return "pulsar-key-shared-subscription";
- }
-
- @Override
- protected SubscriptionType subscriptionType() {
- return SubscriptionType.Key_Shared;
- }
-}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/SharedSubscriptionContext.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/SharedSubscriptionContext.java
deleted file mode 100644
index cd649fe..0000000
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/SharedSubscriptionContext.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.testutils.source.cases;
-
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-
-/** We would consume from test splits by using {@link SubscriptionType#Shared} subscription. */
-public class SharedSubscriptionContext extends MultipleTopicConsumingContext {
-
- public SharedSubscriptionContext(PulsarTestEnvironment environment) {
- super(environment);
- }
-
- @Override
- protected String displayName() {
- return "consume message by Shared";
- }
-
- @Override
- protected String subscriptionName() {
- return "pulsar-shared-subscription";
- }
-
- @Override
- protected SubscriptionType subscriptionType() {
- return SubscriptionType.Shared;
- }
-}