You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/16 03:32:37 UTC

[GitHub] [flink] imaffe commented on a change in pull request #19099: [FLINK-26182][Connector/pulsar] Create a e2e tests for Pulsar sink.

imaffe commented on a change in pull request #19099:
URL: https://github.com/apache/flink/pull/19099#discussion_r827590927



##########
File path: flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/sink/PulsarSinkTestContext.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.sink;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.sink.PulsarSink;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
+
+import org.apache.pulsar.client.api.Schema;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
+import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES;
+import static org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema.pulsarSchema;
+import static org.apache.flink.connector.pulsar.testutils.PulsarTestCommonUtils.toDeliveryGuarantee;
+
+/** Common sink test context for pulsar based test. */
+public class PulsarSinkTestContext extends PulsarTestContext<String>
+        implements DataStreamSinkV2ExternalContext<String> {
+
+    private static final String TOPIC_NAME_PREFIX = "flink-sink-topic-";
+    private static final int RECORD_SIZE_UPPER_BOUND = 300;
+    private static final int RECORD_SIZE_LOWER_BOUND = 100;
+    private static final int RECORD_STRING_SIZE = 20;
+
+    private String topicName = topicName();
+    private final Closer closer = Closer.create();
+
+    public PulsarSinkTestContext(PulsarTestEnvironment environment) {
+        super(environment, Schema.STRING);
+    }
+
+    @Override
+    protected String displayName() {
+        return "write messages into one topic in Pulsar";
+    }
+
+    @Override
+    public Sink<String> createSink(TestingSinkSettings sinkSettings) {
+        if (!operator.topicExists(topicName)) {
+            operator.createTopic(topicName, 4);
+        }
+        DeliveryGuarantee guarantee = toDeliveryGuarantee(sinkSettings.getCheckpointingMode());
+
+        return PulsarSink.builder()
+                .setServiceUrl(operator.serviceUrl())
+                .setAdminUrl(operator.adminUrl())
+                .setTopics(topicName)
+                .setDeliveryGuarantee(guarantee)
+                .setSerializationSchema(pulsarSchema(Schema.STRING))
+                .enableSchemaEvolution()
+                .setConfig(PULSAR_BATCHING_MAX_MESSAGES, 4)
+                .build();
+    }
+
+    @Override
+    public ExternalSystemDataReader<String> createSinkDataReader(TestingSinkSettings sinkSettings) {
+        PulsarPartitionDataReader<String> reader =
+                sneakyClient(
+                        () -> new PulsarPartitionDataReader<>(operator, topicName, Schema.STRING));
+        closer.register(reader);
+
+        return reader;
+    }
+
+    @Override
+    public List<String> generateTestData(TestingSinkSettings sinkSettings, long seed) {
+        Random random = new Random(seed);
+        int recordSize =
+                random.nextInt(RECORD_SIZE_UPPER_BOUND - RECORD_SIZE_LOWER_BOUND)
+                        + RECORD_SIZE_LOWER_BOUND;
+        List<String> records = new ArrayList<>(recordSize);
+        for (int i = 0; i < recordSize; i++) {
+            int size = random.nextInt(RECORD_STRING_SIZE) + RECORD_STRING_SIZE;
+            String record = "index:" + i + "-data:" + randomAlphanumeric(size);
+            records.add(record);
+        }
+
+        return records;
+    }
+
+    @Override
+    public TypeInformation<String> getProducedType() {
+        return Types.STRING;
+    }
+
+    @Override
+    public void close() throws Exception {
+        // Change the topic name after finishing a test case.

Review comment:
       I see that `close()` in PulsarTestContext deletes the topics. Do we not delete the topics in the sink context? (Since here we only change the topic name.)
   
   By the way, I'm not very familiar with the ExternalContext lifecycle: will each test case create a new PulsarSinkTestContext (since the ITCase class uses a context factory)? If each test case gets a new PulsarSinkTestContext, do we still need to change the topic name when we close the context?

##########
File path: flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
##########
@@ -40,64 +52,92 @@
 import static org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema.flinkSchema;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for using PulsarSink writing to a Pulsar cluster. */
-class PulsarSinkITCase extends PulsarTestSuiteBase {
-
-    private static final int PARALLELISM = 1;
-
-    @RegisterExtension
-    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
-            new MiniClusterExtension(
-                    new MiniClusterResourceConfiguration.Builder()
-                            .setNumberTaskManagers(1)
-                            .setNumberSlotsPerTaskManager(PARALLELISM)
-                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
-                            .withHaLeadershipControl()
-                            .build());
-
-    // Using this extension for creating shared reference which would be used in source function.
-    @RegisterExtension final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
-
-    @ParameterizedTest
-    @EnumSource(DeliveryGuarantee.class)
-    void writeRecordsToPulsar(DeliveryGuarantee guarantee) throws Exception {
-        // A random topic with partition 1.
-        String topic = randomAlphabetic(8);
-        operator().createTopic(topic, 4);
-        int counts = ThreadLocalRandom.current().nextInt(100, 200);
-
-        ControlSource source =
-                new ControlSource(
-                        sharedObjects,
-                        operator(),
-                        topic,
-                        guarantee,
-                        counts,
-                        Duration.ofMillis(50),
-                        Duration.ofMinutes(5));
-        PulsarSink<String> sink =
-                PulsarSink.builder()
-                        .setServiceUrl(operator().serviceUrl())
-                        .setAdminUrl(operator().adminUrl())
-                        .setDeliveryGuarantee(guarantee)
-                        .setTopics(topic)
-                        .setSerializationSchema(flinkSchema(new SimpleStringSchema()))
-                        .build();
-
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-        env.setParallelism(PARALLELISM);
-        if (guarantee != DeliveryGuarantee.NONE) {
-            env.enableCheckpointing(500L);
-        }
-        env.addSource(source).sinkTo(sink);
-        env.execute();
+/** Tests for Pulsar sink. */
+@SuppressWarnings("unused")
+class PulsarSinkITCase {
+
+    /** Integration test based on connector testing framework. */
+    @Nested
+    class IntegrationTest extends PulsarSinkTestSuiteBase {
+
+        @TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();
+
+        @TestExternalSystem
+        PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.mock());
+
+        @TestSemantics
+        CheckpointingMode[] semantics =
+                new CheckpointingMode[] {
+                    CheckpointingMode.EXACTLY_ONCE, CheckpointingMode.AT_LEAST_ONCE
+                };
+
+        @TestContext
+        PulsarTestContextFactory<String, PulsarSinkTestContext> sinkContext =
+                new PulsarTestContextFactory<>(pulsar, PulsarSinkTestContext::new);
+    }
+
+    /** Tests for using PulsarSink writing to a Pulsar cluster. */
+    @Nested
+    class DeliveryGuaranteeTest extends PulsarTestSuiteBase {
 
-        List<String> expectedRecords = source.getExpectedRecords();
-        List<String> consumedRecords = source.getConsumedRecords();
+        private static final int PARALLELISM = 1;
 
-        assertThat(consumedRecords)
-                .hasSameSizeAs(expectedRecords)
-                .containsExactlyInAnyOrderElementsOf(expectedRecords);
+        @RegisterExtension
+        private final MiniClusterExtension clusterExtension =
+                new MiniClusterExtension(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setNumberTaskManagers(1)
+                                .setNumberSlotsPerTaskManager(PARALLELISM)
+                                .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                                .withHaLeadershipControl()
+                                .build());
+
+        // Using this extension for creating shared reference which would be used in source
+        // function.
+        @RegisterExtension
+        final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
+
+        @ParameterizedTest
+        @EnumSource(DeliveryGuarantee.class)
+        void writeRecordsToPulsar(DeliveryGuarantee guarantee) throws Exception {
+            // A random topic with partition 1.

Review comment:
       The next line creates the topic with 4 partitions, while this comment says 1. Do we want a single-partition topic here, or is any partition count OK?

##########
File path: flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
##########
@@ -40,64 +52,92 @@
 import static org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema.flinkSchema;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for using PulsarSink writing to a Pulsar cluster. */
-class PulsarSinkITCase extends PulsarTestSuiteBase {
-
-    private static final int PARALLELISM = 1;
-
-    @RegisterExtension
-    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
-            new MiniClusterExtension(
-                    new MiniClusterResourceConfiguration.Builder()
-                            .setNumberTaskManagers(1)
-                            .setNumberSlotsPerTaskManager(PARALLELISM)
-                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
-                            .withHaLeadershipControl()
-                            .build());
-
-    // Using this extension for creating shared reference which would be used in source function.
-    @RegisterExtension final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
-
-    @ParameterizedTest
-    @EnumSource(DeliveryGuarantee.class)
-    void writeRecordsToPulsar(DeliveryGuarantee guarantee) throws Exception {
-        // A random topic with partition 1.
-        String topic = randomAlphabetic(8);
-        operator().createTopic(topic, 4);
-        int counts = ThreadLocalRandom.current().nextInt(100, 200);
-
-        ControlSource source =
-                new ControlSource(
-                        sharedObjects,
-                        operator(),
-                        topic,
-                        guarantee,
-                        counts,
-                        Duration.ofMillis(50),
-                        Duration.ofMinutes(5));
-        PulsarSink<String> sink =
-                PulsarSink.builder()
-                        .setServiceUrl(operator().serviceUrl())
-                        .setAdminUrl(operator().adminUrl())
-                        .setDeliveryGuarantee(guarantee)
-                        .setTopics(topic)
-                        .setSerializationSchema(flinkSchema(new SimpleStringSchema()))
-                        .build();
-
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-        env.setParallelism(PARALLELISM);
-        if (guarantee != DeliveryGuarantee.NONE) {
-            env.enableCheckpointing(500L);
-        }
-        env.addSource(source).sinkTo(sink);
-        env.execute();
+/** Tests for Pulsar sink. */
+@SuppressWarnings("unused")
+class PulsarSinkITCase {
+
+    /** Integration test based on connector testing framework. */
+    @Nested
+    class IntegrationTest extends PulsarSinkTestSuiteBase {
+
+        @TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();
+
+        @TestExternalSystem
+        PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.mock());
+
+        @TestSemantics
+        CheckpointingMode[] semantics =
+                new CheckpointingMode[] {
+                    CheckpointingMode.EXACTLY_ONCE, CheckpointingMode.AT_LEAST_ONCE
+                };
+
+        @TestContext
+        PulsarTestContextFactory<String, PulsarSinkTestContext> sinkContext =
+                new PulsarTestContextFactory<>(pulsar, PulsarSinkTestContext::new);
+    }
+
+    /** Tests for using PulsarSink writing to a Pulsar cluster. */
+    @Nested
+    class DeliveryGuaranteeTest extends PulsarTestSuiteBase {
 
-        List<String> expectedRecords = source.getExpectedRecords();
-        List<String> consumedRecords = source.getConsumedRecords();
+        private static final int PARALLELISM = 1;
 
-        assertThat(consumedRecords)
-                .hasSameSizeAs(expectedRecords)
-                .containsExactlyInAnyOrderElementsOf(expectedRecords);
+        @RegisterExtension
+        private final MiniClusterExtension clusterExtension =
+                new MiniClusterExtension(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setNumberTaskManagers(1)
+                                .setNumberSlotsPerTaskManager(PARALLELISM)
+                                .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                                .withHaLeadershipControl()
+                                .build());
+
+        // Using this extension for creating shared reference which would be used in source
+        // function.
+        @RegisterExtension
+        final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
+
+        @ParameterizedTest
+        @EnumSource(DeliveryGuarantee.class)
+        void writeRecordsToPulsar(DeliveryGuarantee guarantee) throws Exception {
+            // A random topic with partition 1.
+            String topic = randomAlphabetic(8);
+            operator().createTopic(topic, 4);
+            int counts = ThreadLocalRandom.current().nextInt(100, 200);

Review comment:
       nit: Generally I would avoid introducing nondeterminism in test code (unless it is random testing), but I think it's OK to use random counts here.

##########
File path: flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
##########
@@ -361,8 +377,12 @@ public void deleteTopic(String topic) {
     public <T> Message<T> receiveMessage(String topic, Schema<T> schema, Duration timeout) {
         try {
             Consumer<T> consumer = createConsumer(topic, schema);
-            Message<T> message = consumer.receiveAsync().get(timeout.toMillis(), MILLISECONDS);
-            consumer.acknowledgeCumulative(message.getMessageId());
+            int millis = Math.toIntExact(timeout.toMillis());
+            Message<T> message = consumer.receive(millis, MILLISECONDS);

Review comment:
       Oh, this looks like the same bug as the previous data loss issue. Nice to see this fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org