You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ti...@apache.org on 2022/09/30 04:29:23 UTC
[flink] 06/06: [FLINK-26182][Connector/pulsar] Create e2e tests for the Pulsar source and sink based on the connector testing framework.
This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 64c284f5f9526517459f115f7008b3dd8df9d705
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Wed Sep 28 15:28:14 2022 +0800
[FLINK-26182][Connector/pulsar] Create e2e tests for the Pulsar source and sink based on the connector testing framework.
---
.../f4d91193-72ba-4ce4-ad83-98f780dce581 | 6 +
flink-connectors/flink-connector-pulsar/pom.xml | 2 -
.../connector/pulsar/sink/PulsarSinkITCase.java | 151 +++++++++++++--------
.../pulsar/source/PulsarUnorderedSourceITCase.java | 14 +-
.../pulsar/testutils/PulsarTestCommonUtils.java | 5 +-
.../pulsar/testutils/PulsarTestContext.java | 6 +-
.../testutils/runtime/PulsarRuntimeOperator.java | 15 +-
.../runtime/container/PulsarContainerRuntime.java | 25 ++--
.../testutils/runtime/mock/PulsarMockRuntime.java | 5 +-
.../testutils/sink/PulsarPartitionDataReader.java | 93 +++++++++++++
.../testutils/sink/PulsarSinkTestContext.java | 124 +++++++++++++++++
.../testutils/sink/PulsarSinkTestSuiteBase.java | 37 +++++
.../source}/KeyedPulsarPartitionDataWriter.java | 6 +-
.../source/UnorderedSourceTestSuiteBase.java | 5 +-
.../cases}/KeySharedSubscriptionContext.java | 32 ++---
.../cases/MultipleTopicConsumingContext.java | 9 +-
.../source/cases}/SharedSubscriptionContext.java | 12 +-
.../src/test/resources/docker/bootstrap.sh | 28 ----
...norderedE2ECase.java => PulsarSinkE2ECase.java} | 31 ++---
.../util/pulsar/PulsarSourceUnorderedE2ECase.java | 4 +-
.../FlinkContainerWithPulsarEnvironment.java | 5 +-
21 files changed, 434 insertions(+), 181 deletions(-)
diff --git a/flink-connectors/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581 b/flink-connectors/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
index 03d0ae583af..40e7dc9686c 100644
--- a/flink-connectors/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
+++ b/flink-connectors/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
@@ -1,3 +1,9 @@
+org.apache.flink.connector.pulsar.sink.PulsarSinkITCase does not satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.connector.pulsar.source.PulsarSourceITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\
diff --git a/flink-connectors/flink-connector-pulsar/pom.xml b/flink-connectors/flink-connector-pulsar/pom.xml
index bfba4068ee8..a9f4169a3c2 100644
--- a/flink-connectors/flink-connector-pulsar/pom.xml
+++ b/flink-connectors/flink-connector-pulsar/pom.xml
@@ -300,7 +300,6 @@ under the License.
<include>**/testutils/**</include>
<include>META-INF/LICENSE</include>
<include>META-INF/NOTICE</include>
- <include>containers/txnStandalone.conf</include>
</includes>
</configuration>
</execution>
@@ -324,7 +323,6 @@ under the License.
<include>**/testutils/**</include>
<include>META-INF/LICENSE</include>
<include>META-INF/NOTICE</include>
- <include>containers/txnStandalone.conf</include>
</includes>
</configuration>
</execution>
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
index 3fccc28c706..b0ef390b728 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
@@ -20,16 +20,28 @@ package org.apache.flink.connector.pulsar.sink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;
import org.apache.flink.connector.pulsar.testutils.function.ControlSource;
+import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
+import org.apache.flink.connector.pulsar.testutils.sink.PulsarSinkTestContext;
+import org.apache.flink.connector.pulsar.testutils.sink.PulsarSinkTestSuiteBase;
+import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.testutils.junit.FailsOnJava11;
import org.apache.flink.testutils.junit.SharedObjectsExtension;
import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
@@ -44,63 +56,90 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Tests for using PulsarSink writing to a Pulsar cluster. */
@Category(value = {FailsOnJava11.class})
-class PulsarSinkITCase extends PulsarTestSuiteBase {
-
- private static final int PARALLELISM = 1;
-
- @RegisterExtension
- private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
- new MiniClusterExtension(
- new MiniClusterResourceConfiguration.Builder()
- .setNumberTaskManagers(1)
- .setNumberSlotsPerTaskManager(PARALLELISM)
- .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
- .withHaLeadershipControl()
- .build());
-
- // Using this extension for creating shared reference which would be used in source function.
- @RegisterExtension final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
-
- @ParameterizedTest
- @EnumSource(DeliveryGuarantee.class)
- void writeRecordsToPulsar(DeliveryGuarantee guarantee) throws Exception {
- // A random topic with partition 4.
- String topic = randomAlphabetic(8);
- operator().createTopic(topic, 4);
- int counts = ThreadLocalRandom.current().nextInt(100, 200);
-
- ControlSource source =
- new ControlSource(
- sharedObjects,
- operator(),
- topic,
- guarantee,
- counts,
- Duration.ofMillis(50),
- Duration.ofMinutes(5));
- PulsarSink<String> sink =
- PulsarSink.builder()
- .setServiceUrl(operator().serviceUrl())
- .setAdminUrl(operator().adminUrl())
- .setDeliveryGuarantee(guarantee)
- .setTopics(topic)
- .setSerializationSchema(flinkSchema(new SimpleStringSchema()))
- .build();
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- env.setParallelism(PARALLELISM);
- if (guarantee != DeliveryGuarantee.NONE) {
- env.enableCheckpointing(500L);
- }
- env.addSource(source).sinkTo(sink);
- env.execute();
+class PulsarSinkITCase {
+
+ /** Integration test based on connector testing framework. */
+ @Nested
+ class IntegrationTest extends PulsarSinkTestSuiteBase {
+
+ @TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();
+
+ @TestExternalSystem
+ PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.mock());
+
+ @TestSemantics
+ CheckpointingMode[] semantics =
+ new CheckpointingMode[] {
+ CheckpointingMode.EXACTLY_ONCE, CheckpointingMode.AT_LEAST_ONCE
+ };
+
+ @TestContext
+ PulsarTestContextFactory<String, PulsarSinkTestContext> sinkContext =
+ new PulsarTestContextFactory<>(pulsar, PulsarSinkTestContext::new);
+ }
+
+ /** Tests for using PulsarSink writing to a Pulsar cluster. */
+ @Nested
+ class DeliveryGuaranteeTest extends PulsarTestSuiteBase {
- List<String> expectedRecords = source.getExpectedRecords();
- List<String> consumedRecords = source.getConsumedRecords();
+ private static final int PARALLELISM = 1;
- assertThat(consumedRecords)
- .hasSameSizeAs(expectedRecords)
- .containsExactlyInAnyOrderElementsOf(expectedRecords);
+ @RegisterExtension
+ private final MiniClusterExtension clusterExtension =
+ new MiniClusterExtension(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(PARALLELISM)
+ .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+ .withHaLeadershipControl()
+ .build());
+
+ // Using this extension for creating shared reference which would be used in source
+ // function.
+ @RegisterExtension
+ final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
+
+ @ParameterizedTest
+ @EnumSource(DeliveryGuarantee.class)
+ void writeRecordsToPulsar(DeliveryGuarantee guarantee) throws Exception {
+ // A random topic with partition 4.
+ String topic = randomAlphabetic(8);
+ operator().createTopic(topic, 4);
+ int counts = ThreadLocalRandom.current().nextInt(100, 200);
+
+ ControlSource source =
+ new ControlSource(
+ sharedObjects,
+ operator(),
+ topic,
+ guarantee,
+ counts,
+ Duration.ofMillis(50),
+ Duration.ofMinutes(5));
+ PulsarSink<String> sink =
+ PulsarSink.builder()
+ .setServiceUrl(operator().serviceUrl())
+ .setAdminUrl(operator().adminUrl())
+ .setDeliveryGuarantee(guarantee)
+ .setTopics(topic)
+ .setSerializationSchema(flinkSchema(new SimpleStringSchema()))
+ .build();
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ env.setParallelism(PARALLELISM);
+ if (guarantee != DeliveryGuarantee.NONE) {
+ env.enableCheckpointing(500L);
+ }
+ env.addSource(source).sinkTo(sink);
+ env.execute();
+
+ List<String> expectedRecords = source.getExpectedRecords();
+ List<String> consumedRecords = source.getConsumedRecords();
+
+ assertThat(consumedRecords)
+ .hasSameSizeAs(expectedRecords)
+ .containsExactlyInAnyOrderElementsOf(expectedRecords);
+ }
}
}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarUnorderedSourceITCase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarUnorderedSourceITCase.java
index d18c6c2633b..070a16126a7 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarUnorderedSourceITCase.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarUnorderedSourceITCase.java
@@ -22,7 +22,8 @@ import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
import org.apache.flink.connector.pulsar.testutils.source.UnorderedSourceTestSuiteBase;
-import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext;
+import org.apache.flink.connector.pulsar.testutils.source.cases.KeySharedSubscriptionContext;
+import org.apache.flink.connector.pulsar.testutils.source.cases.SharedSubscriptionContext;
import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
@@ -34,8 +35,6 @@ import org.apache.flink.testutils.junit.FailsOnJava11;
import org.apache.pulsar.client.api.SubscriptionType;
import org.junit.experimental.categories.Category;
-import static org.apache.pulsar.client.api.SubscriptionType.Shared;
-
/**
* Unit test class for {@link PulsarSource}. Used for {@link SubscriptionType#Shared} subscription.
*/
@@ -53,7 +52,10 @@ public class PulsarUnorderedSourceITCase extends UnorderedSourceTestSuiteBase<St
CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};
@TestContext
- PulsarTestContextFactory<String, MultipleTopicConsumingContext> multipleTopic =
- new PulsarTestContextFactory<>(
- pulsar, env -> new MultipleTopicConsumingContext(env, Shared));
+ PulsarTestContextFactory<String, SharedSubscriptionContext> sharedSubscription =
+ new PulsarTestContextFactory<>(pulsar, SharedSubscriptionContext::new);
+
+ @TestContext
+ PulsarTestContextFactory<String, KeySharedSubscriptionContext> keySharedSubscription =
+ new PulsarTestContextFactory<>(pulsar, KeySharedSubscriptionContext::new);
}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestCommonUtils.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestCommonUtils.java
index 96b1ca62f2a..994601329d6 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestCommonUtils.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestCommonUtils.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.streaming.api.CheckpointingMode;
@@ -59,9 +58,7 @@ public class PulsarTestCommonUtils {
public static PulsarPartitionSplit createPartitionSplit(
String topic, int partitionId, Boundedness boundedness, MessageId latestConsumedId) {
- TopicPartition topicPartition =
- new TopicPartition(topic, partitionId, TopicRange.createFullRange());
-
+ TopicPartition topicPartition = new TopicPartition(topic, partitionId);
StopCursor stopCursor =
boundedness == Boundedness.BOUNDED ? StopCursor.latest() : StopCursor.never();
return new PulsarPartitionSplit(topicPartition, stopCursor, latestConsumedId, null);
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
index a4e44441bfd..1408656f986 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
@@ -52,10 +52,12 @@ public abstract class PulsarTestContext<T> implements ExternalContext {
@Override
public List<URL> getConnectorJarPaths() {
- // We don't need any tests jar definition. They are provided in docker-related environments.
+ // We don't need any test jars definition. They are provided in docker-related environments.
return Collections.emptyList();
}
@Override
- public void close() throws Exception {}
+ public void close() throws Exception {
+ // All the topics would be deleted in the PulsarRuntime. No need to manually close them.
+ }
}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
index 9787a8ceeb9..5d14d7ee081 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
@@ -22,7 +22,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.common.config.PulsarConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
import org.apache.flink.connector.testframe.external.ExternalContext;
import org.apache.flink.shaded.guava30.com.google.common.base.Strings;
@@ -48,7 +47,6 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import java.io.Closeable;
-import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
@@ -228,12 +226,7 @@ public class PulsarRuntimeOperator implements Closeable {
public List<TopicPartition> topicInfo(String topic) {
try {
return client().getPartitionsForTopic(topic).get().stream()
- .map(
- p ->
- new TopicPartition(
- topic,
- TopicName.getPartitionIndex(p),
- TopicRange.createFullRange()))
+ .map(p -> new TopicPartition(topic, TopicName.getPartitionIndex(p)))
.collect(toList());
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException(e);
@@ -452,7 +445,7 @@ public class PulsarRuntimeOperator implements Closeable {
/** This method is used for test framework. You can't close this operator manually. */
@Override
- public void close() throws IOException {
+ public void close() throws PulsarClientException {
if (admin != null) {
admin.close();
}
@@ -485,7 +478,7 @@ public class PulsarRuntimeOperator implements Closeable {
}
}
- private synchronized <T> Producer<T> createProducer(String topic, Schema<T> schema) {
+ private <T> Producer<T> createProducer(String topic, Schema<T> schema) {
ProducerBuilder<T> builder =
client().newProducer(schema)
.topic(topic)
@@ -496,7 +489,7 @@ public class PulsarRuntimeOperator implements Closeable {
return sneakyClient(builder::create);
}
- private synchronized <T> Consumer<T> createConsumer(String topic, Schema<T> schema) {
+ private <T> Consumer<T> createConsumer(String topic, Schema<T> schema) {
// Create the earliest subscription if it's not existed.
List<String> subscriptions = sneakyAdmin(() -> admin().topics().getSubscriptions(topic));
if (!subscriptions.contains(SUBSCRIPTION_NAME)) {
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
index 910985338b9..eac42f063bc 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
@@ -24,7 +24,6 @@ import org.apache.flink.util.DockerImageVersions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -88,22 +87,30 @@ public class PulsarContainerRuntime implements PulsarRuntime {
return;
}
- // Override the default configuration in container for enabling the Pulsar transaction.
- container.withClasspathResourceMapping(
- "docker/bootstrap.sh", "/pulsar/bin/bootstrap.sh", BindMode.READ_ONLY);
- // Waiting for the Pulsar border is ready.
+ // Override the default standalone configuration by system environments.
+ container.withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "true");
+ container.withEnv("PULSAR_PREFIX_acknowledgmentAtBatchIndexLevelEnabled", "true");
+ container.withEnv("PULSAR_PREFIX_systemTopicEnabled", "true");
+ container.withEnv("PULSAR_PREFIX_brokerDeduplicationEnabled", "true");
+ container.withEnv("PULSAR_PREFIX_defaultNumberOfNamespaceBundles", "1");
+ // Change the default bootstrap script, it will override the default configuration
+ // and start a standalone Pulsar without streaming storage and function worker.
+ container.withCommand(
+ "sh",
+ "-c",
+ "/pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf && /pulsar/bin/pulsar standalone --no-functions-worker -nss");
+ // Waiting for the Pulsar broker and the transaction is ready after the container started.
container.waitingFor(
- forHttp("/admin/v2/namespaces/public/default")
+ forHttp(
+ "/admin/v2/persistent/pulsar/system/transaction_coordinator_assign/partitions")
.forPort(BROKER_HTTP_PORT)
.forStatusCode(200)
.withStartupTimeout(Duration.ofMinutes(5)));
- // Set custom startup script.
- container.withCommand("sh /pulsar/bin/bootstrap.sh");
// Start the Pulsar Container.
container.start();
// Append the output to this runtime logger. Used for local debug purpose.
- container.followOutput(new Slf4jLogConsumer(LOG).withSeparateOutputStreams());
+ container.followOutput(new Slf4jLogConsumer(LOG, true));
// Create the operator.
if (boundFlink) {
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
index 326c3ff6a89..b0df4c5d1ff 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
@@ -64,10 +64,10 @@ public class PulsarMockRuntime implements PulsarRuntime {
@Override
public void tearDown() {
try {
- pulsarService.close();
if (operator != null) {
operator.close();
}
+ pulsarService.close();
} catch (Exception e) {
throw new IllegalStateException(e);
}
@@ -95,7 +95,7 @@ public class PulsarMockRuntime implements PulsarRuntime {
configuration.setAuthorizationEnabled(false);
configuration.setAllowAutoTopicCreation(true);
configuration.setBrokerDeleteInactiveTopicsEnabled(false);
-
+ configuration.setTopicLevelPoliciesEnabled(true);
configuration.setWebSocketServiceEnabled(false);
// Use runtime dynamic ports
configuration.setBrokerServicePort(Optional.of(0));
@@ -107,6 +107,7 @@ public class PulsarMockRuntime implements PulsarRuntime {
configuration.setTransactionCoordinatorEnabled(true);
configuration.setTransactionMetadataStoreProviderClassName(
"org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider");
+ configuration.setDefaultNumberOfNamespaceBundles(1);
return configuration;
}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarPartitionDataReader.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarPartitionDataReader.java
new file mode 100644
index 00000000000..6ef9510d116
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarPartitionDataReader.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.sink;
+
+import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
+
+/** The data reader for a specified topic partition from Pulsar. */
+public class PulsarPartitionDataReader<T> implements ExternalSystemDataReader<T>, Closeable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PulsarPartitionDataReader.class);
+
+ private final Consumer<T> consumer;
+
+ public PulsarPartitionDataReader(
+ PulsarRuntimeOperator operator, String fullTopicName, Schema<T> schema)
+ throws PulsarClientException {
+ // Create client for supporting the use in E2E test.
+ String subscriptionName = randomAlphanumeric(12);
+ this.consumer =
+ operator.client()
+ .newConsumer(schema)
+ .topic(fullTopicName)
+ .subscriptionName(subscriptionName)
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscriptionMode(SubscriptionMode.Durable)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+ }
+
+ @Override
+ public List<T> poll(Duration timeout) {
+ List<T> results = new ArrayList<>();
+
+ while (true) {
+ try {
+ int millis = Math.toIntExact(timeout.toMillis());
+ Message<T> message = consumer.receive(millis, MILLISECONDS);
+
+ if (message != null) {
+ consumer.acknowledgeCumulative(message);
+ results.add(message.getValue());
+ } else {
+ break;
+ }
+ } catch (Exception e) {
+ LOG.error("", e);
+ break;
+ }
+ }
+
+ return results;
+ }
+
+ @Override
+ public void close() throws PulsarClientException {
+ consumer.close();
+ }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java
new file mode 100644
index 00000000000..6b9cf2d1041
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.sink;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.PulsarSink;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
+
+import org.apache.pulsar.client.api.Schema;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
+import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES;
+import static org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema.pulsarSchema;
+import static org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.toDeliveryGuarantee;
+
+/** Common sink test context for pulsar based test. */
+public class PulsarSinkTestContext extends PulsarTestContext<String>
+ implements DataStreamSinkV2ExternalContext<String> {
+
+ private static final String TOPIC_NAME_PREFIX = "flink-sink-topic-";
+ private static final int RECORD_SIZE_UPPER_BOUND = 300;
+ private static final int RECORD_SIZE_LOWER_BOUND = 100;
+ private static final int RECORD_STRING_SIZE = 20;
+
+ private String topicName = topicName();
+ private final Closer closer = Closer.create();
+
+ public PulsarSinkTestContext(PulsarTestEnvironment environment) {
+ super(environment, Schema.STRING);
+ }
+
+ @Override
+ protected String displayName() {
+ return "write messages into one topic in Pulsar";
+ }
+
+ @Override
+ public Sink<String> createSink(TestingSinkSettings sinkSettings) {
+ operator.createTopic(topicName, 4);
+ DeliveryGuarantee guarantee = toDeliveryGuarantee(sinkSettings.getCheckpointingMode());
+
+ return PulsarSink.builder()
+ .setServiceUrl(operator.serviceUrl())
+ .setAdminUrl(operator.adminUrl())
+ .setTopics(topicName)
+ .setDeliveryGuarantee(guarantee)
+ .setSerializationSchema(pulsarSchema(schema))
+ .enableSchemaEvolution()
+ .setConfig(PULSAR_BATCHING_MAX_MESSAGES, 4)
+ .build();
+ }
+
+ @Override
+ public ExternalSystemDataReader<String> createSinkDataReader(TestingSinkSettings sinkSettings) {
+ PulsarPartitionDataReader<String> reader =
+ sneakyClient(
+ () -> new PulsarPartitionDataReader<>(operator, topicName, Schema.STRING));
+ closer.register(reader);
+
+ return reader;
+ }
+
+ @Override
+ public List<String> generateTestData(TestingSinkSettings sinkSettings, long seed) {
+ Random random = new Random(seed);
+ int recordSize =
+ random.nextInt(RECORD_SIZE_UPPER_BOUND - RECORD_SIZE_LOWER_BOUND)
+ + RECORD_SIZE_LOWER_BOUND;
+ List<String> records = new ArrayList<>(recordSize);
+ for (int i = 0; i < recordSize; i++) {
+ int size = random.nextInt(RECORD_STRING_SIZE) + RECORD_STRING_SIZE;
+ String record = "index:" + i + "-data:" + randomAlphanumeric(size);
+ records.add(record);
+ }
+
+ return records;
+ }
+
+ @Override
+ public TypeInformation<String> getProducedType() {
+ return Types.STRING;
+ }
+
+ @Override
+ public void close() throws Exception {
+ // Change the topic name after finishing a test case.
+ closer.register(() -> topicName = topicName());
+ closer.close();
+ }
+
+ private String topicName() {
+ return TOPIC_NAME_PREFIX + randomAlphanumeric(8);
+ }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestSuiteBase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestSuiteBase.java
new file mode 100644
index 00000000000..0695a43569b
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestSuiteBase.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.sink;
+
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext;
+import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
+import org.apache.flink.streaming.api.CheckpointingMode;
+
+import org.junit.jupiter.api.Disabled;
+
+/** Pulsar sink don't expose the monitor metrics now. We have to disable this test. */
+public abstract class PulsarSinkTestSuiteBase extends SinkTestSuiteBase<String> {
+
+ @Override
+ @Disabled("Enable this test after FLINK-26027 being merged.")
+ public void testMetrics(
+ TestEnvironment testEnv,
+ DataStreamSinkExternalContext<String> externalContext,
+ CheckpointingMode semantic) {}
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/KeyedPulsarPartitionDataWriter.java
similarity index 93%
rename from flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java
rename to flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/KeyedPulsarPartitionDataWriter.java
index d5f6e11c00a..23d65f04a81 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/KeyedPulsarPartitionDataWriter.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.tests.util.pulsar.common;
+package org.apache.flink.connector.pulsar.testutils.source;
import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
@@ -51,9 +51,11 @@ public class KeyedPulsarPartitionDataWriter implements ExternalSystemSplitDataWr
@Override
public void writeRecords(List<String> records) {
+ // Send messages with the key we don't need.
List<String> newRecords = records.stream().map(a -> a + keyToRead).collect(toList());
-
operator.sendMessages(fullTopicName, Schema.STRING, keyToExclude, newRecords);
+
+ // Send messages with the given key.
operator.sendMessages(fullTopicName, Schema.STRING, keyToRead, records);
}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/UnorderedSourceTestSuiteBase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/UnorderedSourceTestSuiteBase.java
index 35496ccab21..af41d8c3093 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/UnorderedSourceTestSuiteBase.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/UnorderedSourceTestSuiteBase.java
@@ -72,8 +72,5 @@ public abstract class UnorderedSourceTestSuiteBase<T> extends SourceTestSuiteBas
public void testIdleReader(
TestEnvironment testEnv,
DataStreamSourceExternalContext<T> externalContext,
- CheckpointingMode semantic)
- throws Exception {
- super.testIdleReader(testEnv, externalContext, semantic);
- }
+ CheckpointingMode semantic) {}
}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/source/KeySharedSubscriptionContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/KeySharedSubscriptionContext.java
similarity index 73%
rename from flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/source/KeySharedSubscriptionContext.java
rename to flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/KeySharedSubscriptionContext.java
index 0cae6e58405..c348853872d 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/source/KeySharedSubscriptionContext.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/KeySharedSubscriptionContext.java
@@ -16,24 +16,21 @@
* limitations under the License.
*/
-package org.apache.flink.tests.util.pulsar.source;
+package org.apache.flink.connector.pulsar.testutils.source.cases;
import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FixedRangeGenerator;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FixedKeysRangeGenerator;
import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext;
+import org.apache.flink.connector.pulsar.testutils.source.KeyedPulsarPartitionDataWriter;
import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
-import org.apache.flink.tests.util.pulsar.common.KeyedPulsarPartitionDataWriter;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.util.Murmur3_32Hash;
-import static java.util.Collections.singletonList;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.RANGE_SIZE;
-import static org.apache.pulsar.client.api.SubscriptionType.Key_Shared;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ALLOW_KEY_SHARED_OUT_OF_ORDER_DELIVERY;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator.KeySharedMode.JOIN;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.range.TopicRangeUtils.keyHash;
/** We would consume from test splits by using {@link SubscriptionType#Key_Shared} subscription. */
public class KeySharedSubscriptionContext extends MultipleTopicConsumingContext {
@@ -42,7 +39,7 @@ public class KeySharedSubscriptionContext extends MultipleTopicConsumingContext
private final String keyToExclude;
public KeySharedSubscriptionContext(PulsarTestEnvironment environment) {
- super(environment, Key_Shared);
+ super(environment);
this.keyToRead = randomAlphabetic(8);
@@ -69,10 +66,11 @@ public class KeySharedSubscriptionContext extends MultipleTopicConsumingContext
@Override
protected void setSourceBuilder(PulsarSourceBuilder<String> builder) {
- int keyHash = keyHash(keyToRead);
- TopicRange range = new TopicRange(keyHash, keyHash);
-
- builder.setRangeGenerator(new FixedRangeGenerator(singletonList(range)));
+ // Make sure we only consume the messages with keyToRead.
+ FixedKeysRangeGenerator generator =
+ FixedKeysRangeGenerator.builder().key(keyToRead).keySharedMode(JOIN).build();
+ builder.setRangeGenerator(generator);
+ builder.setConfig(PULSAR_ALLOW_KEY_SHARED_OUT_OF_ORDER_DELIVERY, true);
}
@Override
@@ -80,8 +78,8 @@ public class KeySharedSubscriptionContext extends MultipleTopicConsumingContext
return "pulsar-key-shared-subscription";
}
- // This method is copied from Pulsar for calculating message key hash.
- private int keyHash(String key) {
- return Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % RANGE_SIZE;
+ @Override
+ protected SubscriptionType subscriptionType() {
+ return SubscriptionType.Key_Shared;
}
}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/MultipleTopicConsumingContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/MultipleTopicConsumingContext.java
index ca6410fbc43..422c954a4e7 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/MultipleTopicConsumingContext.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/MultipleTopicConsumingContext.java
@@ -33,18 +33,11 @@ import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNam
public class MultipleTopicConsumingContext extends PulsarSourceTestContext {
private final String topicPrefix = "flink-multiple-topic-" + randomAlphabetic(8) + "-";
- private final SubscriptionType subscriptionType;
private int index = 0;
public MultipleTopicConsumingContext(PulsarTestEnvironment environment) {
- this(environment, SubscriptionType.Exclusive);
- }
-
- public MultipleTopicConsumingContext(
- PulsarTestEnvironment environment, SubscriptionType subscriptionType) {
super(environment);
- this.subscriptionType = subscriptionType;
}
@Override
@@ -64,7 +57,7 @@ public class MultipleTopicConsumingContext extends PulsarSourceTestContext {
@Override
protected SubscriptionType subscriptionType() {
- return subscriptionType;
+ return SubscriptionType.Exclusive;
}
@Override
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/source/SharedSubscriptionContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/SharedSubscriptionContext.java
similarity index 84%
rename from flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/source/SharedSubscriptionContext.java
rename to flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/SharedSubscriptionContext.java
index fe9f07832af..cd649fef040 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/source/SharedSubscriptionContext.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/cases/SharedSubscriptionContext.java
@@ -16,20 +16,17 @@
* limitations under the License.
*/
-package org.apache.flink.tests.util.pulsar.source;
+package org.apache.flink.connector.pulsar.testutils.source.cases;
import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext;
import org.apache.pulsar.client.api.SubscriptionType;
-import static org.apache.pulsar.client.api.SubscriptionType.Shared;
-
/** We would consume from test splits by using {@link SubscriptionType#Shared} subscription. */
public class SharedSubscriptionContext extends MultipleTopicConsumingContext {
public SharedSubscriptionContext(PulsarTestEnvironment environment) {
- super(environment, Shared);
+ super(environment);
}
@Override
@@ -41,4 +38,9 @@ public class SharedSubscriptionContext extends MultipleTopicConsumingContext {
protected String subscriptionName() {
return "pulsar-shared-subscription";
}
+
+ @Override
+ protected SubscriptionType subscriptionType() {
+ return SubscriptionType.Shared;
+ }
}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/resources/docker/bootstrap.sh b/flink-connectors/flink-connector-pulsar/src/test/resources/docker/bootstrap.sh
deleted file mode 100755
index fa4e2921a75..00000000000
--- a/flink-connectors/flink-connector-pulsar/src/test/resources/docker/bootstrap.sh
+++ /dev/null
@@ -1,28 +0,0 @@
-#!/usr/bin/env bash
-# ----------------------------------------------------------------------------
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-# ----------------------------------------------------------------------------
-
-# Enable the transaction in standalone config.
-sed -i 's/transactionCoordinatorEnabled=false/transactionCoordinatorEnabled=true/g' /pulsar/conf/standalone.conf
-sed -i 's/acknowledgmentAtBatchIndexLevelEnabled=false/acknowledgmentAtBatchIndexLevelEnabled=true/g' /pulsar/conf/standalone.conf
-sed -i 's/systemTopicEnabled=false/systemTopicEnabled=true/g' /pulsar/conf/standalone.conf
-sed -i 's/brokerDeduplicationEnabled=false/brokerDeduplicationEnabled=true/g' /pulsar/conf/standalone.conf
-
-# Start Pulsar standalone without function worker and streaming storage.
-/pulsar/bin/pulsar standalone --no-functions-worker -nss
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java
similarity index 64%
copy from flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
copy to flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java
index 15333a1d538..a19f5931b58 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java
@@ -19,7 +19,8 @@
package org.apache.flink.tests.util.pulsar;
import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
-import org.apache.flink.connector.pulsar.testutils.source.UnorderedSourceTestSuiteBase;
+import org.apache.flink.connector.pulsar.testutils.sink.PulsarSinkTestContext;
+import org.apache.flink.connector.pulsar.testutils.sink.PulsarSinkTestSuiteBase;
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
@@ -27,38 +28,30 @@ import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment;
-import org.apache.flink.tests.util.pulsar.source.KeySharedSubscriptionContext;
-import org.apache.flink.tests.util.pulsar.source.SharedSubscriptionContext;
import org.apache.flink.testutils.junit.FailsOnJava11;
import org.junit.experimental.categories.Category;
-/**
- * Pulsar E2E test based on connector testing framework. It's used for Shared & Key_Shared
- * subscription.
- */
+/** Pulsar sink E2E test based on connector testing framework. */
@SuppressWarnings("unused")
@Category(value = {FailsOnJava11.class})
-public class PulsarSourceUnorderedE2ECase extends UnorderedSourceTestSuiteBase<String> {
+public class PulsarSinkE2ECase extends PulsarSinkTestSuiteBase {
- // Defines the Semantic.
@TestSemantics
- CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};
+ CheckpointingMode[] semantics =
+ new CheckpointingMode[] {
+ CheckpointingMode.EXACTLY_ONCE, CheckpointingMode.AT_LEAST_ONCE
+ };
- // Defines TestEnvironment.
+ // Defines TestEnvironment
@TestEnv
- FlinkContainerWithPulsarEnvironment flink = new FlinkContainerWithPulsarEnvironment(1, 8);
+ FlinkContainerWithPulsarEnvironment flink = new FlinkContainerWithPulsarEnvironment(1, 6);
// Defines ConnectorExternalSystem.
@TestExternalSystem
PulsarContainerTestEnvironment pulsar = new PulsarContainerTestEnvironment(flink);
- // Defines a set of external context Factories for different test cases.
- @TestContext
- PulsarTestContextFactory<String, SharedSubscriptionContext> shared =
- new PulsarTestContextFactory<>(pulsar, SharedSubscriptionContext::new);
-
@TestContext
- PulsarTestContextFactory<String, KeySharedSubscriptionContext> keyShared =
- new PulsarTestContextFactory<>(pulsar, KeySharedSubscriptionContext::new);
+ PulsarTestContextFactory<String, PulsarSinkTestContext> sinkContext =
+ new PulsarTestContextFactory<>(pulsar, PulsarSinkTestContext::new);
}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
index 15333a1d538..89692d1c449 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
@@ -20,6 +20,8 @@ package org.apache.flink.tests.util.pulsar;
import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
import org.apache.flink.connector.pulsar.testutils.source.UnorderedSourceTestSuiteBase;
+import org.apache.flink.connector.pulsar.testutils.source.cases.KeySharedSubscriptionContext;
+import org.apache.flink.connector.pulsar.testutils.source.cases.SharedSubscriptionContext;
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
@@ -27,8 +29,6 @@ import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment;
-import org.apache.flink.tests.util.pulsar.source.KeySharedSubscriptionContext;
-import org.apache.flink.tests.util.pulsar.source.SharedSubscriptionContext;
import org.apache.flink.testutils.junit.FailsOnJava11;
import org.junit.experimental.categories.Category;
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
index f5e862f17c2..65e99a81456 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
@@ -51,11 +51,8 @@ public class FlinkContainerWithPulsarEnvironment extends FlinkContainerTestEnvir
return ResourceTestUtils.getResource(jarName).toAbsolutePath().toString();
}
- protected static Configuration flinkConfiguration() {
+ private static Configuration flinkConfiguration() {
Configuration configuration = new Configuration();
- // Increase the off heap memory of TaskManager to avoid direct buffer memory error in Pulsar
- // e2e tests.
- configuration.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, MemorySize.ofMebiBytes(100));
// Increase the jvm metaspace memory to avoid java.lang.OutOfMemoryError: Metaspace
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(2048));