You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/30 14:49:24 UTC

[GitHub] [flink] syhily commented on a change in pull request #19099: [FLINK-26182][Connector/pulsar] Create e2e tests for Pulsar sink.

syhily commented on a change in pull request #19099:
URL: https://github.com/apache/flink/pull/19099#discussion_r838637617



##########
File path: flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
##########
@@ -40,64 +52,92 @@
 import static org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema.flinkSchema;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for using PulsarSink writing to a Pulsar cluster. */
-class PulsarSinkITCase extends PulsarTestSuiteBase {
-
-    private static final int PARALLELISM = 1;
-
-    @RegisterExtension
-    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
-            new MiniClusterExtension(
-                    new MiniClusterResourceConfiguration.Builder()
-                            .setNumberTaskManagers(1)
-                            .setNumberSlotsPerTaskManager(PARALLELISM)
-                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
-                            .withHaLeadershipControl()
-                            .build());
-
-    // Using this extension for creating shared reference which would be used in source function.
-    @RegisterExtension final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
-
-    @ParameterizedTest
-    @EnumSource(DeliveryGuarantee.class)
-    void writeRecordsToPulsar(DeliveryGuarantee guarantee) throws Exception {
-        // A random topic with partition 1.
-        String topic = randomAlphabetic(8);
-        operator().createTopic(topic, 4);
-        int counts = ThreadLocalRandom.current().nextInt(100, 200);
-
-        ControlSource source =
-                new ControlSource(
-                        sharedObjects,
-                        operator(),
-                        topic,
-                        guarantee,
-                        counts,
-                        Duration.ofMillis(50),
-                        Duration.ofMinutes(5));
-        PulsarSink<String> sink =
-                PulsarSink.builder()
-                        .setServiceUrl(operator().serviceUrl())
-                        .setAdminUrl(operator().adminUrl())
-                        .setDeliveryGuarantee(guarantee)
-                        .setTopics(topic)
-                        .setSerializationSchema(flinkSchema(new SimpleStringSchema()))
-                        .build();
-
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-        env.setParallelism(PARALLELISM);
-        if (guarantee != DeliveryGuarantee.NONE) {
-            env.enableCheckpointing(500L);
-        }
-        env.addSource(source).sinkTo(sink);
-        env.execute();
+/** Tests for Pulsar sink. */
+@SuppressWarnings("unused")
+class PulsarSinkITCase {
+
+    /** Integration test based on connector testing framework. */
+    @Nested
+    class IntegrationTest extends PulsarSinkTestSuiteBase {
+
+        @TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();
+
+        @TestExternalSystem
+        PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.mock());
+
+        @TestSemantics
+        CheckpointingMode[] semantics =
+                new CheckpointingMode[] {
+                    CheckpointingMode.EXACTLY_ONCE, CheckpointingMode.AT_LEAST_ONCE
+                };
+
+        @TestContext
+        PulsarTestContextFactory<String, PulsarSinkTestContext> sinkContext =
+                new PulsarTestContextFactory<>(pulsar, PulsarSinkTestContext::new);
+    }
+
+    /** Tests for using PulsarSink writing to a Pulsar cluster. */
+    @Nested
+    class DeliveryGuaranteeTest extends PulsarTestSuiteBase {
 
-        List<String> expectedRecords = source.getExpectedRecords();
-        List<String> consumedRecords = source.getConsumedRecords();
+        private static final int PARALLELISM = 1;
 
-        assertThat(consumedRecords)
-                .hasSameSizeAs(expectedRecords)
-                .containsExactlyInAnyOrderElementsOf(expectedRecords);
+        @RegisterExtension
+        private final MiniClusterExtension clusterExtension =
+                new MiniClusterExtension(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setNumberTaskManagers(1)
+                                .setNumberSlotsPerTaskManager(PARALLELISM)
+                                .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                                .withHaLeadershipControl()
+                                .build());
+
+        // Using this extension for creating shared reference which would be used in source
+        // function.
+        @RegisterExtension
+        final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
+
+        @ParameterizedTest
+        @EnumSource(DeliveryGuarantee.class)
+        void writeRecordsToPulsar(DeliveryGuarantee guarantee) throws Exception {
+            // A random topic with partition 1.
+            String topic = randomAlphabetic(8);
+            operator().createTopic(topic, 4);
+            int counts = ThreadLocalRandom.current().nextInt(100, 200);

Review comment:
       @zentol This random count is logged in `ControlSource`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org