Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/15 13:37:07 UTC

[GitHub] [flink] syhily opened a new pull request #19099: [FLINK-26182][Connector/pulsar] Create a e2e tests for Pulsar sink.

syhily opened a new pull request #19099:
URL: https://github.com/apache/flink/pull/19099


   ## What is the purpose of the change
   
   Add e2e tests for the existing Pulsar sink connector. The tests are based on the new connector testing framework.
   
   ## Brief change log
   
     - Extract the common e2e test code for Pulsar source and sink.
     - Refactor the Pulsar source test tools.
     - Fix some typos in comments of the Pulsar connector code.
     - Add new e2e test for Pulsar sink.
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - Add tests for both the Pulsar source and sink on the testing framework, which contains a lot of test cases.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #19099: [FLINK-26182][Connector/pulsar] Create a e2e tests for Pulsar sink.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19099:
URL: https://github.com/apache/flink/pull/19099#issuecomment-1068063059


   ## CI report:
   
   * 361e719b3e2e37c27f54869ba77314c58c1c8bb8 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33129) 
   * 8f66a1ccecb317f8342dec401e462a31acfe8728 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33986) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19099: [FLINK-26182][Connector/pulsar] Create a e2e tests for Pulsar sink.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19099:
URL: https://github.com/apache/flink/pull/19099#issuecomment-1068063059


   ## CI report:
   
   * f7a5a653dbb0ea3196878c51b272b1aa5631b635 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=34015) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #19099: [FLINK-26182][Connector/pulsar] Create a e2e tests for Pulsar sink.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19099:
URL: https://github.com/apache/flink/pull/19099#issuecomment-1068063059


   ## CI report:
   
   * 0e6ba688dc8c46ad83139d5e6d9c5756763186e0 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=34049) 
   



[GitHub] [flink] zentol commented on a change in pull request #19099: [FLINK-26182][Connector/pulsar] Create a e2e tests for Pulsar sink.

Posted by GitBox <gi...@apache.org>.
zentol commented on a change in pull request #19099:
URL: https://github.com/apache/flink/pull/19099#discussion_r829046604



##########
File path: flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
##########
@@ -40,64 +52,92 @@
 import static org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema.flinkSchema;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for using PulsarSink writing to a Pulsar cluster. */
-class PulsarSinkITCase extends PulsarTestSuiteBase {
-
-    private static final int PARALLELISM = 1;
-
-    @RegisterExtension
-    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
-            new MiniClusterExtension(
-                    new MiniClusterResourceConfiguration.Builder()
-                            .setNumberTaskManagers(1)
-                            .setNumberSlotsPerTaskManager(PARALLELISM)
-                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
-                            .withHaLeadershipControl()
-                            .build());
-
-    // Using this extension for creating shared reference which would be used in source function.
-    @RegisterExtension final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
-
-    @ParameterizedTest
-    @EnumSource(DeliveryGuarantee.class)
-    void writeRecordsToPulsar(DeliveryGuarantee guarantee) throws Exception {
-        // A random topic with partition 1.
-        String topic = randomAlphabetic(8);
-        operator().createTopic(topic, 4);
-        int counts = ThreadLocalRandom.current().nextInt(100, 200);
-
-        ControlSource source =
-                new ControlSource(
-                        sharedObjects,
-                        operator(),
-                        topic,
-                        guarantee,
-                        counts,
-                        Duration.ofMillis(50),
-                        Duration.ofMinutes(5));
-        PulsarSink<String> sink =
-                PulsarSink.builder()
-                        .setServiceUrl(operator().serviceUrl())
-                        .setAdminUrl(operator().adminUrl())
-                        .setDeliveryGuarantee(guarantee)
-                        .setTopics(topic)
-                        .setSerializationSchema(flinkSchema(new SimpleStringSchema()))
-                        .build();
-
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-        env.setParallelism(PARALLELISM);
-        if (guarantee != DeliveryGuarantee.NONE) {
-            env.enableCheckpointing(500L);
-        }
-        env.addSource(source).sinkTo(sink);
-        env.execute();
+/** Tests for Pulsar sink. */
+@SuppressWarnings("unused")
+class PulsarSinkITCase {
+
+    /** Integration test based on connector testing framework. */
+    @Nested
+    class IntegrationTest extends PulsarSinkTestSuiteBase {
+
+        @TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();
+
+        @TestExternalSystem
+        PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.mock());
+
+        @TestSemantics
+        CheckpointingMode[] semantics =
+                new CheckpointingMode[] {
+                    CheckpointingMode.EXACTLY_ONCE, CheckpointingMode.AT_LEAST_ONCE
+                };
+
+        @TestContext
+        PulsarTestContextFactory<String, PulsarSinkTestContext> sinkContext =
+                new PulsarTestContextFactory<>(pulsar, PulsarSinkTestContext::new);
+    }
+
+    /** Tests for using PulsarSink writing to a Pulsar cluster. */
+    @Nested
+    class DeliveryGuaranteeTest extends PulsarTestSuiteBase {
 
-        List<String> expectedRecords = source.getExpectedRecords();
-        List<String> consumedRecords = source.getConsumedRecords();
+        private static final int PARALLELISM = 1;
 
-        assertThat(consumedRecords)
-                .hasSameSizeAs(expectedRecords)
-                .containsExactlyInAnyOrderElementsOf(expectedRecords);
+        @RegisterExtension
+        private final MiniClusterExtension clusterExtension =
+                new MiniClusterExtension(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setNumberTaskManagers(1)
+                                .setNumberSlotsPerTaskManager(PARALLELISM)
+                                .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                                .withHaLeadershipControl()
+                                .build());
+
+        // Using this extension for creating shared reference which would be used in source
+        // function.
+        @RegisterExtension
+        final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
+
+        @ParameterizedTest
+        @EnumSource(DeliveryGuarantee.class)
+        void writeRecordsToPulsar(DeliveryGuarantee guarantee) throws Exception {
+            // A random topic with partition 1.
+            String topic = randomAlphabetic(8);
+            operator().createTopic(topic, 4);
+            int counts = ThreadLocalRandom.current().nextInt(100, 200);

Review comment:
       If you use random parameters you must also make sure to log them somewhere to make them reproducible.
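One way to act on this comment is to derive every random test input from a single seed and log that seed, so a failing run can be replayed. The sketch below is a minimal illustration, not the change made in this pull request: the class `ReproducibleRandomParams`, the `Params` holder, and the `test.seed` system property are hypothetical names, `java.util.logging` stands in for whatever logger the test suite actually uses, and the lowercase topic generator is only a stand-in for `randomAlphabetic(8)`.

```java
import java.util.Random;
import java.util.logging.Logger;

/** Hypothetical helper: derives all random test inputs from one logged seed. */
public class ReproducibleRandomParams {

    private static final Logger LOG =
            Logger.getLogger(ReproducibleRandomParams.class.getName());

    /** Hypothetical holder for the randomized inputs of one test run. */
    public static final class Params {
        public final long seed;
        public final String topic;
        public final int counts;

        Params(long seed, String topic, int counts) {
            this.seed = seed;
            this.topic = topic;
            this.counts = counts;
        }
    }

    /** The same seed always yields the same topic name and record count. */
    public static Params generate(long seed) {
        Random random = new Random(seed);

        // Stand-in for randomAlphabetic(8): eight lowercase letters.
        StringBuilder topic = new StringBuilder();
        for (int i = 0; i < 8; i++) {
            topic.append((char) ('a' + random.nextInt(26)));
        }

        // Record count in [100, 200), matching nextInt(100, 200) in the test.
        int counts = 100 + random.nextInt(100);
        return new Params(seed, topic.toString(), counts);
    }

    public static void main(String[] args) {
        // "test.seed" is an assumed property name; set it to replay a failed run.
        long seed = Long.getLong("test.seed", System.nanoTime());
        Params params = generate(seed);

        // Log everything needed to reproduce the run.
        LOG.info(
                String.format(
                        "seed=%d topic=%s counts=%d",
                        params.seed, params.topic, params.counts));
    }
}
```

With this shape, a failure report only needs the logged seed: rerunning with `-Dtest.seed=<value>` regenerates the same topic name and record count.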







[GitHub] [flink] flinkbot edited a comment on pull request #19099: [FLINK-26182][Connector/pulsar] Create a e2e tests for Pulsar sink.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19099:
URL: https://github.com/apache/flink/pull/19099#issuecomment-1068063059


   ## CI report:
   
   * 7241901a3516ab2a100c4b8a10b73b1911d01986 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33114) 
   * 361e719b3e2e37c27f54869ba77314c58c1c8bb8 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33129) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #19099: [FLINK-26182][Connector/pulsar] Create a e2e tests for Pulsar sink.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19099:
URL: https://github.com/apache/flink/pull/19099#issuecomment-1068063059


   ## CI report:
   
   * f7a5a653dbb0ea3196878c51b272b1aa5631b635 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=34015) 
   * 0e6ba688dc8c46ad83139d5e6d9c5756763186e0 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=34049) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #19099: [FLINK-26182][Connector/pulsar] Create a e2e tests for Pulsar sink.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19099:
URL: https://github.com/apache/flink/pull/19099#issuecomment-1068063059


   ## CI report:
   
   * 361e719b3e2e37c27f54869ba77314c58c1c8bb8 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33129) 
   * 8f66a1ccecb317f8342dec401e462a31acfe8728 UNKNOWN
   



[GitHub] [flink] flinkbot commented on pull request #19099: [FLINK-26182][Connector/pulsar] Create a e2e tests for Pulsar sink.

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #19099:
URL: https://github.com/apache/flink/pull/19099#issuecomment-1068063059


   ## CI report:
   
   * 40ea40365b2591015a2e5face5b3a48df1e615a4 UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #19099: [FLINK-26182][Connector/pulsar] Create a e2e tests for Pulsar sink.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19099:
URL: https://github.com/apache/flink/pull/19099#issuecomment-1068063059


   ## CI report:
   
   * 7241901a3516ab2a100c4b8a10b73b1911d01986 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33114) 
   * 361e719b3e2e37c27f54869ba77314c58c1c8bb8 UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #19099: [FLINK-26182][Connector/pulsar] Create a e2e tests for Pulsar sink.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19099:
URL: https://github.com/apache/flink/pull/19099#issuecomment-1068063059


   ## CI report:
   
   * 8f66a1ccecb317f8342dec401e462a31acfe8728 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33986) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #19099: [FLINK-26182][Connector/pulsar] Create a e2e tests for Pulsar sink.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19099:
URL: https://github.com/apache/flink/pull/19099#issuecomment-1068063059


   ## CI report:
   
   * 361e719b3e2e37c27f54869ba77314c58c1c8bb8 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33129) 
   



[GitHub] [flink] zentol commented on a change in pull request #19099: [FLINK-26182][Connector/pulsar] Create a e2e tests for Pulsar sink.

Posted by GitBox <gi...@apache.org>.
zentol commented on a change in pull request #19099:
URL: https://github.com/apache/flink/pull/19099#discussion_r829046604



##########
File path: flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
##########
@@ -40,64 +52,92 @@
 import static org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema.flinkSchema;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for using PulsarSink writing to a Pulsar cluster. */
-class PulsarSinkITCase extends PulsarTestSuiteBase {
-
-    private static final int PARALLELISM = 1;
-
-    @RegisterExtension
-    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
-            new MiniClusterExtension(
-                    new MiniClusterResourceConfiguration.Builder()
-                            .setNumberTaskManagers(1)
-                            .setNumberSlotsPerTaskManager(PARALLELISM)
-                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
-                            .withHaLeadershipControl()
-                            .build());
-
-    // Using this extension for creating shared reference which would be used in source function.
-    @RegisterExtension final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
-
-    @ParameterizedTest
-    @EnumSource(DeliveryGuarantee.class)
-    void writeRecordsToPulsar(DeliveryGuarantee guarantee) throws Exception {
-        // A random topic with partition 1.
-        String topic = randomAlphabetic(8);
-        operator().createTopic(topic, 4);
-        int counts = ThreadLocalRandom.current().nextInt(100, 200);
-
-        ControlSource source =
-                new ControlSource(
-                        sharedObjects,
-                        operator(),
-                        topic,
-                        guarantee,
-                        counts,
-                        Duration.ofMillis(50),
-                        Duration.ofMinutes(5));
-        PulsarSink<String> sink =
-                PulsarSink.builder()
-                        .setServiceUrl(operator().serviceUrl())
-                        .setAdminUrl(operator().adminUrl())
-                        .setDeliveryGuarantee(guarantee)
-                        .setTopics(topic)
-                        .setSerializationSchema(flinkSchema(new SimpleStringSchema()))
-                        .build();
-
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-        env.setParallelism(PARALLELISM);
-        if (guarantee != DeliveryGuarantee.NONE) {
-            env.enableCheckpointing(500L);
-        }
-        env.addSource(source).sinkTo(sink);
-        env.execute();
+/** Tests for Pulsar sink. */
+@SuppressWarnings("unused")
+class PulsarSinkITCase {
+
+    /** Integration test based on connector testing framework. */
+    @Nested
+    class IntegrationTest extends PulsarSinkTestSuiteBase {
+
+        @TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();
+
+        @TestExternalSystem
+        PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.mock());
+
+        @TestSemantics
+        CheckpointingMode[] semantics =
+                new CheckpointingMode[] {
+                    CheckpointingMode.EXACTLY_ONCE, CheckpointingMode.AT_LEAST_ONCE
+                };
+
+        @TestContext
+        PulsarTestContextFactory<String, PulsarSinkTestContext> sinkContext =
+                new PulsarTestContextFactory<>(pulsar, PulsarSinkTestContext::new);
+    }
+
+    /** Tests for using PulsarSink writing to a Pulsar cluster. */
+    @Nested
+    class DeliveryGuaranteeTest extends PulsarTestSuiteBase {
 
-        List<String> expectedRecords = source.getExpectedRecords();
-        List<String> consumedRecords = source.getConsumedRecords();
+        private static final int PARALLELISM = 1;
 
-        assertThat(consumedRecords)
-                .hasSameSizeAs(expectedRecords)
-                .containsExactlyInAnyOrderElementsOf(expectedRecords);
+        @RegisterExtension
+        private final MiniClusterExtension clusterExtension =
+                new MiniClusterExtension(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setNumberTaskManagers(1)
+                                .setNumberSlotsPerTaskManager(PARALLELISM)
+                                .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                                .withHaLeadershipControl()
+                                .build());
+
+        // Using this extension for creating shared reference which would be used in source
+        // function.
+        @RegisterExtension
+        final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
+
+        @ParameterizedTest
+        @EnumSource(DeliveryGuarantee.class)
+        void writeRecordsToPulsar(DeliveryGuarantee guarantee) throws Exception {
+            // A random topic with partition 1.
+            String topic = randomAlphabetic(8);
+            operator().createTopic(topic, 4);
+            int counts = ThreadLocalRandom.current().nextInt(100, 200);

Review comment:
       If you use random parameters, you must also log them somewhere so that a failing run can be reproduced.
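
A minimal sketch of one way to do this, assuming a hypothetical helper `SeededCounts` that is not part of the PR: draw a seed once, log it, and derive the record count from a `Random` built on that seed, so a failing run can be replayed by reusing the logged seed. The 100 to 200 range mirrors the `nextInt(100, 200)` call in the diff; `System.out` stands in for whatever logger the test would use.

```java
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical helper, not part of the PR: derives test parameters from a logged seed. */
class SeededCounts {

    /** Returns a record count in [100, 200) that depends only on the given seed. */
    static int countFor(long seed) {
        return 100 + new Random(seed).nextInt(100);
    }

    /** Draws a fresh seed and logs it so that the run can be reproduced later. */
    static long newLoggedSeed() {
        long seed = ThreadLocalRandom.current().nextLong();
        System.out.println("Test seed: " + seed);
        return seed;
    }

    public static void main(String[] args) {
        long seed = newLoggedSeed();
        System.out.println("Record count: " + countFor(seed));
    }
}
```

Re-running with the logged seed passed to `countFor` yields the same count as the failing run.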







[GitHub] [flink] syhily commented on a change in pull request #19099: [FLINK-26182][Connector/pulsar] Create a e2e tests for Pulsar sink.

Posted by GitBox <gi...@apache.org>.
syhily commented on a change in pull request #19099:
URL: https://github.com/apache/flink/pull/19099#discussion_r838820397



##########
File path: flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.sink;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.PulsarSink;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
+
+import org.apache.pulsar.client.api.Schema;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
+import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES;
+import static org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema.pulsarSchema;
+import static org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.toDeliveryGuarantee;
+
+/** Common sink test context for pulsar based test. */
+public class PulsarSinkTestContext extends PulsarTestContext<String>
+        implements DataStreamSinkV2ExternalContext<String> {
+
+    private static final String TOPIC_NAME_PREFIX = "flink-sink-topic-";
+    private static final int RECORD_SIZE_UPPER_BOUND = 300;
+    private static final int RECORD_SIZE_LOWER_BOUND = 100;
+    private static final int RECORD_STRING_SIZE = 20;
+
+    private String topicName = topicName();
+    private final Closer closer = Closer.create();
+
+    public PulsarSinkTestContext(PulsarTestEnvironment environment) {
+        super(environment, Schema.STRING);
+    }
+
+    @Override
+    protected String displayName() {
+        return "write messages into one topic in Pulsar";
+    }
+
+    @Override
+    public Sink<String> createSink(TestingSinkSettings sinkSettings) {
+        if (!operator.topicExists(topicName)) {
+            operator.createTopic(topicName, 4);
+        }
+        DeliveryGuarantee guarantee = toDeliveryGuarantee(sinkSettings.getCheckpointingMode());
+
+        return PulsarSink.builder()
+                .setServiceUrl(operator.serviceUrl())
+                .setAdminUrl(operator.adminUrl())
+                .setTopics(topicName)
+                .setDeliveryGuarantee(guarantee)
+                .setSerializationSchema(pulsarSchema(Schema.STRING))
+                .enableSchemaEvolution()
+                .setConfig(PULSAR_BATCHING_MAX_MESSAGES, 4)
+                .build();
+    }
+
+    @Override
+    public ExternalSystemDataReader<String> createSinkDataReader(TestingSinkSettings sinkSettings) {
+        PulsarPartitionDataReader<String> reader =
+                sneakyClient(
+                        () -> new PulsarPartitionDataReader<>(operator, topicName, Schema.STRING));
+        closer.register(reader);
+
+        return reader;
+    }
+
+    @Override
+    public List<String> generateTestData(TestingSinkSettings sinkSettings, long seed) {
+        Random random = new Random(seed);
+        int recordSize =
+                random.nextInt(RECORD_SIZE_UPPER_BOUND - RECORD_SIZE_LOWER_BOUND)
+                        + RECORD_SIZE_LOWER_BOUND;
+        List<String> records = new ArrayList<>(recordSize);
+        for (int i = 0; i < recordSize; i++) {
+            int size = random.nextInt(RECORD_STRING_SIZE) + RECORD_STRING_SIZE;
+            String record = "index:" + i + "-data:" + randomAlphanumeric(size);
+            records.add(record);
+        }
+
+        return records;
+    }
+
+    @Override
+    public TypeInformation<String> getProducedType() {
+        return Types.STRING;
+    }
+
+    @Override
+    public void close() throws Exception {
+        // Change the topic name after finishing a test case.

Review comment:
       This is kept because of the awkward test logic in the sink testing tools. I can't remove the topics here.







[GitHub] [flink] flinkbot edited a comment on pull request #19099: [FLINK-26182][Connector/pulsar] Create a e2e tests for Pulsar sink.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19099:
URL: https://github.com/apache/flink/pull/19099#issuecomment-1068063059


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7241901a3516ab2a100c4b8a10b73b1911d01986",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33114",
       "triggerID" : "7241901a3516ab2a100c4b8a10b73b1911d01986",
       "triggerType" : "PUSH"
     }, {
       "hash" : "361e719b3e2e37c27f54869ba77314c58c1c8bb8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33129",
       "triggerID" : "361e719b3e2e37c27f54869ba77314c58c1c8bb8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f66a1ccecb317f8342dec401e462a31acfe8728",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33986",
       "triggerID" : "8f66a1ccecb317f8342dec401e462a31acfe8728",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a032fb606c2bb05580a7a1a7806c7a212ff5e1d3",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33994",
       "triggerID" : "a032fb606c2bb05580a7a1a7806c7a212ff5e1d3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8f66a1ccecb317f8342dec401e462a31acfe8728 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33986) 
   * a032fb606c2bb05580a7a1a7806c7a212ff5e1d3 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33994) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19099: [FLINK-26182][Connector/pulsar] Create a e2e tests for Pulsar sink.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19099:
URL: https://github.com/apache/flink/pull/19099#issuecomment-1068063059


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7241901a3516ab2a100c4b8a10b73b1911d01986",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33114",
       "triggerID" : "7241901a3516ab2a100c4b8a10b73b1911d01986",
       "triggerType" : "PUSH"
     }, {
       "hash" : "361e719b3e2e37c27f54869ba77314c58c1c8bb8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33129",
       "triggerID" : "361e719b3e2e37c27f54869ba77314c58c1c8bb8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8f66a1ccecb317f8342dec401e462a31acfe8728",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33986",
       "triggerID" : "8f66a1ccecb317f8342dec401e462a31acfe8728",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a032fb606c2bb05580a7a1a7806c7a212ff5e1d3",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a032fb606c2bb05580a7a1a7806c7a212ff5e1d3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8f66a1ccecb317f8342dec401e462a31acfe8728 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33986) 
   * a032fb606c2bb05580a7a1a7806c7a212ff5e1d3 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19099: [FLINK-26182][Connector/pulsar] Create a e2e tests for Pulsar sink.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19099:
URL: https://github.com/apache/flink/pull/19099#issuecomment-1068063059


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7241901a3516ab2a100c4b8a10b73b1911d01986",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "7241901a3516ab2a100c4b8a10b73b1911d01986",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7241901a3516ab2a100c4b8a10b73b1911d01986 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #19099: [FLINK-26182][Connector/pulsar] Create a e2e tests for Pulsar sink.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19099:
URL: https://github.com/apache/flink/pull/19099#issuecomment-1068063059


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7241901a3516ab2a100c4b8a10b73b1911d01986",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33114",
       "triggerID" : "7241901a3516ab2a100c4b8a10b73b1911d01986",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7241901a3516ab2a100c4b8a10b73b1911d01986 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33114) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] imaffe commented on a change in pull request #19099: [FLINK-26182][Connector/pulsar] Create a e2e tests for Pulsar sink.

Posted by GitBox <gi...@apache.org>.
imaffe commented on a change in pull request #19099:
URL: https://github.com/apache/flink/pull/19099#discussion_r827590927



##########
File path: flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.sink;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.PulsarSink;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
+
+import org.apache.pulsar.client.api.Schema;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
+import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES;
+import static org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema.pulsarSchema;
+import static org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.toDeliveryGuarantee;
+
+/** Common sink test context for pulsar based test. */
+public class PulsarSinkTestContext extends PulsarTestContext<String>
+        implements DataStreamSinkV2ExternalContext<String> {
+
+    private static final String TOPIC_NAME_PREFIX = "flink-sink-topic-";
+    private static final int RECORD_SIZE_UPPER_BOUND = 300;
+    private static final int RECORD_SIZE_LOWER_BOUND = 100;
+    private static final int RECORD_STRING_SIZE = 20;
+
+    private String topicName = topicName();
+    private final Closer closer = Closer.create();
+
+    public PulsarSinkTestContext(PulsarTestEnvironment environment) {
+        super(environment, Schema.STRING);
+    }
+
+    @Override
+    protected String displayName() {
+        return "write messages into one topic in Pulsar";
+    }
+
+    @Override
+    public Sink<String> createSink(TestingSinkSettings sinkSettings) {
+        if (!operator.topicExists(topicName)) {
+            operator.createTopic(topicName, 4);
+        }
+        DeliveryGuarantee guarantee = toDeliveryGuarantee(sinkSettings.getCheckpointingMode());
+
+        return PulsarSink.builder()
+                .setServiceUrl(operator.serviceUrl())
+                .setAdminUrl(operator.adminUrl())
+                .setTopics(topicName)
+                .setDeliveryGuarantee(guarantee)
+                .setSerializationSchema(pulsarSchema(Schema.STRING))
+                .enableSchemaEvolution()
+                .setConfig(PULSAR_BATCHING_MAX_MESSAGES, 4)
+                .build();
+    }
+
+    @Override
+    public ExternalSystemDataReader<String> createSinkDataReader(TestingSinkSettings sinkSettings) {
+        PulsarPartitionDataReader<String> reader =
+                sneakyClient(
+                        () -> new PulsarPartitionDataReader<>(operator, topicName, Schema.STRING));
+        closer.register(reader);
+
+        return reader;
+    }
+
+    @Override
+    public List<String> generateTestData(TestingSinkSettings sinkSettings, long seed) {
+        Random random = new Random(seed);
+        int recordSize =
+                random.nextInt(RECORD_SIZE_UPPER_BOUND - RECORD_SIZE_LOWER_BOUND)
+                        + RECORD_SIZE_LOWER_BOUND;
+        List<String> records = new ArrayList<>(recordSize);
+        for (int i = 0; i < recordSize; i++) {
+            int size = random.nextInt(RECORD_STRING_SIZE) + RECORD_STRING_SIZE;
+            String record = "index:" + i + "-data:" + randomAlphanumeric(size);
+            records.add(record);
+        }
+
+        return records;
+    }
+
+    @Override
+    public TypeInformation<String> getProducedType() {
+        return Types.STRING;
+    }
+
+    @Override
+    public void close() throws Exception {
+        // Change the topic name after finishing a test case.

Review comment:
       I saw that `close()` in PulsarTestContext deletes the topics. Do we not delete the topics in the sink context (because we changed the topic name)?
   
   Btw, I'm not very familiar with the ExternalContext lifecycle: will each test case create a new PulsarSinkTestContext (since the ITCase class uses a context factory)? If each test case gets a new PulsarSinkTestContext, do we still need to change the topic name when we close the context?
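
To make the question concrete, here is a minimal sketch of the pattern the `close()` comment describes, under the assumption that the framework may reuse one context instance across test cases. `RotatingTopicContext` and its methods are hypothetical stand-ins, not the PR's classes; only the `flink-sink-topic-` prefix is taken from the diff.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a sink test context; not the PR's PulsarSinkTestContext. */
class RotatingTopicContext implements AutoCloseable {

    private static final String TOPIC_NAME_PREFIX = "flink-sink-topic-";
    private static final AtomicLong COUNTER = new AtomicLong();

    // Topic used by the current test case.
    private String topicName = nextTopicName();

    private static String nextTopicName() {
        return TOPIC_NAME_PREFIX + COUNTER.incrementAndGet();
    }

    String topicName() {
        return topicName;
    }

    /** Switches to a fresh topic instead of deleting the old one. */
    @Override
    public void close() {
        topicName = nextTopicName();
    }
}
```

If every test case really gets its own context instance, the rotation in `close()` would be redundant; it only matters when an instance is reused.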

##########
File path: flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
##########
@@ -40,64 +52,92 @@
 import static org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema.flinkSchema;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for using PulsarSink writing to a Pulsar cluster. */
-class PulsarSinkITCase extends PulsarTestSuiteBase {
-
-    private static final int PARALLELISM = 1;
-
-    @RegisterExtension
-    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
-            new MiniClusterExtension(
-                    new MiniClusterResourceConfiguration.Builder()
-                            .setNumberTaskManagers(1)
-                            .setNumberSlotsPerTaskManager(PARALLELISM)
-                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
-                            .withHaLeadershipControl()
-                            .build());
-
-    // Using this extension for creating shared reference which would be used in source function.
-    @RegisterExtension final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
-
-    @ParameterizedTest
-    @EnumSource(DeliveryGuarantee.class)
-    void writeRecordsToPulsar(DeliveryGuarantee guarantee) throws Exception {
-        // A random topic with partition 1.
-        String topic = randomAlphabetic(8);
-        operator().createTopic(topic, 4);
-        int counts = ThreadLocalRandom.current().nextInt(100, 200);
-
-        ControlSource source =
-                new ControlSource(
-                        sharedObjects,
-                        operator(),
-                        topic,
-                        guarantee,
-                        counts,
-                        Duration.ofMillis(50),
-                        Duration.ofMinutes(5));
-        PulsarSink<String> sink =
-                PulsarSink.builder()
-                        .setServiceUrl(operator().serviceUrl())
-                        .setAdminUrl(operator().adminUrl())
-                        .setDeliveryGuarantee(guarantee)
-                        .setTopics(topic)
-                        .setSerializationSchema(flinkSchema(new SimpleStringSchema()))
-                        .build();
-
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-        env.setParallelism(PARALLELISM);
-        if (guarantee != DeliveryGuarantee.NONE) {
-            env.enableCheckpointing(500L);
-        }
-        env.addSource(source).sinkTo(sink);
-        env.execute();
+/** Tests for Pulsar sink. */
+@SuppressWarnings("unused")
+class PulsarSinkITCase {
+
+    /** Integration test based on connector testing framework. */
+    @Nested
+    class IntegrationTest extends PulsarSinkTestSuiteBase {
+
+        @TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();
+
+        @TestExternalSystem
+        PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.mock());
+
+        @TestSemantics
+        CheckpointingMode[] semantics =
+                new CheckpointingMode[] {
+                    CheckpointingMode.EXACTLY_ONCE, CheckpointingMode.AT_LEAST_ONCE
+                };
+
+        @TestContext
+        PulsarTestContextFactory<String, PulsarSinkTestContext> sinkContext =
+                new PulsarTestContextFactory<>(pulsar, PulsarSinkTestContext::new);
+    }
+
+    /** Tests for using PulsarSink writing to a Pulsar cluster. */
+    @Nested
+    class DeliveryGuaranteeTest extends PulsarTestSuiteBase {
 
-        List<String> expectedRecords = source.getExpectedRecords();
-        List<String> consumedRecords = source.getConsumedRecords();
+        private static final int PARALLELISM = 1;
 
-        assertThat(consumedRecords)
-                .hasSameSizeAs(expectedRecords)
-                .containsExactlyInAnyOrderElementsOf(expectedRecords);
+        @RegisterExtension
+        private final MiniClusterExtension clusterExtension =
+                new MiniClusterExtension(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setNumberTaskManagers(1)
+                                .setNumberSlotsPerTaskManager(PARALLELISM)
+                                .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                                .withHaLeadershipControl()
+                                .build());
+
+        // Using this extension for creating shared reference which would be used in source
+        // function.
+        @RegisterExtension
+        final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
+
+        @ParameterizedTest
+        @EnumSource(DeliveryGuarantee.class)
+        void writeRecordsToPulsar(DeliveryGuarantee guarantee) throws Exception {
+            // A random topic with partition 1.

Review comment:
       The next line creates the topic with 4 partitions. Do we want a single-partition topic here, or is any partition count OK?

##########
File path: flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
##########
@@ -40,64 +52,92 @@
 import static org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema.flinkSchema;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for using PulsarSink writing to a Pulsar cluster. */
-class PulsarSinkITCase extends PulsarTestSuiteBase {
-
-    private static final int PARALLELISM = 1;
-
-    @RegisterExtension
-    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
-            new MiniClusterExtension(
-                    new MiniClusterResourceConfiguration.Builder()
-                            .setNumberTaskManagers(1)
-                            .setNumberSlotsPerTaskManager(PARALLELISM)
-                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
-                            .withHaLeadershipControl()
-                            .build());
-
-    // Using this extension for creating shared reference which would be used in source function.
-    @RegisterExtension final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
-
-    @ParameterizedTest
-    @EnumSource(DeliveryGuarantee.class)
-    void writeRecordsToPulsar(DeliveryGuarantee guarantee) throws Exception {
-        // A random topic with partition 1.
-        String topic = randomAlphabetic(8);
-        operator().createTopic(topic, 4);
-        int counts = ThreadLocalRandom.current().nextInt(100, 200);
-
-        ControlSource source =
-                new ControlSource(
-                        sharedObjects,
-                        operator(),
-                        topic,
-                        guarantee,
-                        counts,
-                        Duration.ofMillis(50),
-                        Duration.ofMinutes(5));
-        PulsarSink<String> sink =
-                PulsarSink.builder()
-                        .setServiceUrl(operator().serviceUrl())
-                        .setAdminUrl(operator().adminUrl())
-                        .setDeliveryGuarantee(guarantee)
-                        .setTopics(topic)
-                        .setSerializationSchema(flinkSchema(new SimpleStringSchema()))
-                        .build();
-
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-        env.setParallelism(PARALLELISM);
-        if (guarantee != DeliveryGuarantee.NONE) {
-            env.enableCheckpointing(500L);
-        }
-        env.addSource(source).sinkTo(sink);
-        env.execute();
+/** Tests for Pulsar sink. */
+@SuppressWarnings("unused")
+class PulsarSinkITCase {
+
+    /** Integration test based on connector testing framework. */
+    @Nested
+    class IntegrationTest extends PulsarSinkTestSuiteBase {
+
+        @TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();
+
+        @TestExternalSystem
+        PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.mock());
+
+        @TestSemantics
+        CheckpointingMode[] semantics =
+                new CheckpointingMode[] {
+                    CheckpointingMode.EXACTLY_ONCE, CheckpointingMode.AT_LEAST_ONCE
+                };
+
+        @TestContext
+        PulsarTestContextFactory<String, PulsarSinkTestContext> sinkContext =
+                new PulsarTestContextFactory<>(pulsar, PulsarSinkTestContext::new);
+    }
+
+    /** Tests for using PulsarSink writing to a Pulsar cluster. */
+    @Nested
+    class DeliveryGuaranteeTest extends PulsarTestSuiteBase {
 
-        List<String> expectedRecords = source.getExpectedRecords();
-        List<String> consumedRecords = source.getConsumedRecords();
+        private static final int PARALLELISM = 1;
 
-        assertThat(consumedRecords)
-                .hasSameSizeAs(expectedRecords)
-                .containsExactlyInAnyOrderElementsOf(expectedRecords);
+        @RegisterExtension
+        private final MiniClusterExtension clusterExtension =
+                new MiniClusterExtension(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setNumberTaskManagers(1)
+                                .setNumberSlotsPerTaskManager(PARALLELISM)
+                                .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                                .withHaLeadershipControl()
+                                .build());
+
+        // Using this extension for creating shared reference which would be used in source
+        // function.
+        @RegisterExtension
+        final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
+
+        @ParameterizedTest
+        @EnumSource(DeliveryGuarantee.class)
+        void writeRecordsToPulsar(DeliveryGuarantee guarantee) throws Exception {
+            // A random topic with partition 1.
+            String topic = randomAlphabetic(8);
+            operator().createTopic(topic, 4);
+            int counts = ThreadLocalRandom.current().nextInt(100, 200);

Review comment:
       nit: Generally I would avoid introducing nondeterminism in test code (unless in random testing), but I think it's OK to use random counts here.
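
A minimal sketch of one common way to keep a random count reproducible: derive it from a seed and print that seed. This is not code from the PR; the class `SeededCountExample` and the helper `drawCount` are hypothetical names used only for illustration, and the range [100, 200) mirrors the `nextInt(100, 200)` call in the diff.

```java
import java.util.Random;

class SeededCountExample {

    // Draws a count in [100, 200) that depends only on the given seed,
    // so a failing run can be replayed by reusing the logged seed.
    static int drawCount(long seed) {
        return 100 + new Random(seed).nextInt(100);
    }

    public static void main(String[] args) {
        long seed = System.nanoTime();
        int counts = drawCount(seed);
        // Logging the seed (not only the count) makes the run repeatable.
        System.out.println("seed=" + seed + ", counts=" + counts);
    }
}
```

With this shape, re-running a failed case only requires passing the printed seed back into `drawCount`.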

##########
File path: flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
##########
@@ -361,8 +377,12 @@ public void deleteTopic(String topic) {
     public <T> Message<T> receiveMessage(String topic, Schema<T> schema, Duration timeout) {
         try {
             Consumer<T> consumer = createConsumer(topic, schema);
-            Message<T> message = consumer.receiveAsync().get(timeout.toMillis(), MILLISECONDS);
-            consumer.acknowledgeCumulative(message.getMessageId());
+            int millis = Math.toIntExact(timeout.toMillis());
+            Message<T> message = consumer.receive(millis, MILLISECONDS);

Review comment:
       Oh, this looks like the same bug as the previous data loss issue. Nice to see it fixed.
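
A hedged illustration of how a timed `get` on an asynchronous receive can drop a record in general. It is a self-contained simulation, not the Pulsar client and not a confirmed account of the bug mentioned here: `FakeConsumer`, `deliver`, and `messageLostAfterTimeout` are hypothetical names. The point is only that a `TimeoutException` from `Future.get` does not cancel the future, so a message that arrives later completes a future nobody reads.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class AbandonedReceiveExample {

    // Hypothetical stand-in for a consumer with one pending async receive.
    static class FakeConsumer {
        private CompletableFuture<String> pending;

        CompletableFuture<String> receiveAsync() {
            pending = new CompletableFuture<>();
            return pending;
        }

        // Hands a message to the pending receive; true if the future took it.
        boolean deliver(String message) {
            return pending != null && pending.complete(message);
        }
    }

    // Returns true when a message lands in a future the caller has given up on.
    static boolean messageLostAfterTimeout() {
        FakeConsumer consumer = new FakeConsumer();
        CompletableFuture<String> future = consumer.receiveAsync();
        try {
            future.get(10, TimeUnit.MILLISECONDS);
            return false; // not reached: nothing has been delivered yet
        } catch (TimeoutException e) {
            // The caller stops waiting, but the future is still pending.
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        // A late message is accepted by the abandoned future.
        boolean accepted = consumer.deliver("record-1");
        return accepted && future.isDone();
    }

    public static void main(String[] args) {
        System.out.println("lost=" + messageLostAfterTimeout());
    }
}
```

A blocking receive with a timeout, as in the changed line above, avoids leaving such a pending future behind in this simplified model.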







[GitHub] [flink] flinkbot edited a comment on pull request #19099: [FLINK-26182][Connector/pulsar] Create a e2e tests for Pulsar sink.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19099:
URL: https://github.com/apache/flink/pull/19099#issuecomment-1068063059


   ## CI report:
   
   * a032fb606c2bb05580a7a1a7806c7a212ff5e1d3 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33994) 
   * f7a5a653dbb0ea3196878c51b272b1aa5631b635 UNKNOWN
   



[GitHub] [flink] syhily commented on a change in pull request #19099: [FLINK-26182][Connector/pulsar] Create a e2e tests for Pulsar sink.

Posted by GitBox <gi...@apache.org>.
syhily commented on a change in pull request #19099:
URL: https://github.com/apache/flink/pull/19099#discussion_r838637617



##########
File path: flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
##########
@@ -40,64 +52,92 @@
 import static org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema.flinkSchema;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for using PulsarSink writing to a Pulsar cluster. */
-class PulsarSinkITCase extends PulsarTestSuiteBase {
-
-    private static final int PARALLELISM = 1;
-
-    @RegisterExtension
-    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
-            new MiniClusterExtension(
-                    new MiniClusterResourceConfiguration.Builder()
-                            .setNumberTaskManagers(1)
-                            .setNumberSlotsPerTaskManager(PARALLELISM)
-                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
-                            .withHaLeadershipControl()
-                            .build());
-
-    // Using this extension for creating shared reference which would be used in source function.
-    @RegisterExtension final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
-
-    @ParameterizedTest
-    @EnumSource(DeliveryGuarantee.class)
-    void writeRecordsToPulsar(DeliveryGuarantee guarantee) throws Exception {
-        // A random topic with partition 1.
-        String topic = randomAlphabetic(8);
-        operator().createTopic(topic, 4);
-        int counts = ThreadLocalRandom.current().nextInt(100, 200);
-
-        ControlSource source =
-                new ControlSource(
-                        sharedObjects,
-                        operator(),
-                        topic,
-                        guarantee,
-                        counts,
-                        Duration.ofMillis(50),
-                        Duration.ofMinutes(5));
-        PulsarSink<String> sink =
-                PulsarSink.builder()
-                        .setServiceUrl(operator().serviceUrl())
-                        .setAdminUrl(operator().adminUrl())
-                        .setDeliveryGuarantee(guarantee)
-                        .setTopics(topic)
-                        .setSerializationSchema(flinkSchema(new SimpleStringSchema()))
-                        .build();
-
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-        env.setParallelism(PARALLELISM);
-        if (guarantee != DeliveryGuarantee.NONE) {
-            env.enableCheckpointing(500L);
-        }
-        env.addSource(source).sinkTo(sink);
-        env.execute();
+/** Tests for Pulsar sink. */
+@SuppressWarnings("unused")
+class PulsarSinkITCase {
+
+    /** Integration test based on connector testing framework. */
+    @Nested
+    class IntegrationTest extends PulsarSinkTestSuiteBase {
+
+        @TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();
+
+        @TestExternalSystem
+        PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.mock());
+
+        @TestSemantics
+        CheckpointingMode[] semantics =
+                new CheckpointingMode[] {
+                    CheckpointingMode.EXACTLY_ONCE, CheckpointingMode.AT_LEAST_ONCE
+                };
+
+        @TestContext
+        PulsarTestContextFactory<String, PulsarSinkTestContext> sinkContext =
+                new PulsarTestContextFactory<>(pulsar, PulsarSinkTestContext::new);
+    }
+
+    /** Tests for using PulsarSink writing to a Pulsar cluster. */
+    @Nested
+    class DeliveryGuaranteeTest extends PulsarTestSuiteBase {
 
-        List<String> expectedRecords = source.getExpectedRecords();
-        List<String> consumedRecords = source.getConsumedRecords();
+        private static final int PARALLELISM = 1;
 
-        assertThat(consumedRecords)
-                .hasSameSizeAs(expectedRecords)
-                .containsExactlyInAnyOrderElementsOf(expectedRecords);
+        @RegisterExtension
+        private final MiniClusterExtension clusterExtension =
+                new MiniClusterExtension(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setNumberTaskManagers(1)
+                                .setNumberSlotsPerTaskManager(PARALLELISM)
+                                .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                                .withHaLeadershipControl()
+                                .build());
+
+        // Using this extension for creating shared reference which would be used in source
+        // function.
+        @RegisterExtension
+        final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
+
+        @ParameterizedTest
+        @EnumSource(DeliveryGuarantee.class)
+        void writeRecordsToPulsar(DeliveryGuarantee guarantee) throws Exception {
+            // A random topic with partition 1.
+            String topic = randomAlphabetic(8);
+            operator().createTopic(topic, 4);
+            int counts = ThreadLocalRandom.current().nextInt(100, 200);

Review comment:
       @zentol This random count was logged in `ControlSource`.







[GitHub] [flink] flinkbot edited a comment on pull request #19099: [FLINK-26182][Connector/pulsar] Create a e2e tests for Pulsar sink.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19099:
URL: https://github.com/apache/flink/pull/19099#issuecomment-1068063059


   ## CI report:
   
   * a032fb606c2bb05580a7a1a7806c7a212ff5e1d3 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33994) 
   * f7a5a653dbb0ea3196878c51b272b1aa5631b635 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=34015) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #19099: [FLINK-26182][Connector/pulsar] Create a e2e tests for Pulsar sink.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19099:
URL: https://github.com/apache/flink/pull/19099#issuecomment-1068063059


   ## CI report:
   
   * f7a5a653dbb0ea3196878c51b272b1aa5631b635 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=34015) 
   * 0e6ba688dc8c46ad83139d5e6d9c5756763186e0 UNKNOWN
   



[GitHub] [flink] syhily commented on pull request #19099: [FLINK-26182][Connector/pulsar] Create a e2e tests for Pulsar sink.

Posted by GitBox <gi...@apache.org>.
syhily commented on pull request #19099:
URL: https://github.com/apache/flink/pull/19099#issuecomment-1083444216


   @zentol All the test failures should be fixed. Can you help me review this PR?





[GitHub] [flink] flinkbot edited a comment on pull request #19099: [FLINK-26182][Connector/pulsar] Create a e2e tests for Pulsar sink.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #19099:
URL: https://github.com/apache/flink/pull/19099#issuecomment-1068063059


   ## CI report:
   
   * 7241901a3516ab2a100c4b8a10b73b1911d01986 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=33114) 
   