Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/01 09:57:18 UTC

[GitHub] [flink] AHeise commented on a change in pull request #19092: [FLINK-26645][Connector/Pulsar] Fix Pulsar source subscriber consume from all partitions when only subscribed to 1 partition

AHeise commented on a change in pull request #19092:
URL: https://github.com/apache/flink/pull/19092#discussion_r840424399



##########
File path: flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriberTest.java
##########
@@ -30,49 +29,111 @@
 import java.util.Set;
 import java.util.regex.Pattern;
 
+import static java.util.Collections.singletonList;
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
 import static org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber.getTopicListSubscriber;
 import static org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber.getTopicPatternSubscriber;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
 import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
 import static org.apache.pulsar.client.api.RegexSubscriptionMode.AllTopics;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /** Unit tests for {@link PulsarSubscriber}. */
 class PulsarSubscriberTest extends PulsarTestSuiteBase {
 
-    private static final String TOPIC1 = TopicNameUtils.topicName("topic1");
-    private static final String TOPIC2 = TopicNameUtils.topicName("pattern-topic");
-    private static final String TOPIC3 = TopicNameUtils.topicName("topic2");
+    private final String topic1 = topicName("topic-" + randomAlphanumeric(4) + "-1");
+    private final String topic2 = topicName("pattern-topic-" + randomAlphanumeric(4));
+    private final String topic3 = topicName("topic2-" + randomAlphanumeric(4) + "-2");
 
     private static final int NUM_PARTITIONS_PER_TOPIC = 5;
     private static final int NUM_PARALLELISM = 10;
 
     @Test
     void topicListSubscriber() {
-        operator().createTopic(TOPIC1, NUM_PARTITIONS_PER_TOPIC);
-        operator().createTopic(TOPIC2, NUM_PARTITIONS_PER_TOPIC);
+        operator().createTopic(topic1, NUM_PARTITIONS_PER_TOPIC);
+        operator().createTopic(topic2, NUM_PARTITIONS_PER_TOPIC);
 
-        PulsarSubscriber subscriber = getTopicListSubscriber(Arrays.asList(TOPIC1, TOPIC2));
+        PulsarSubscriber subscriber = getTopicListSubscriber(Arrays.asList(topic1, topic2));
         Set<TopicPartition> topicPartitions =
                 subscriber.getSubscribedTopicPartitions(
                         operator().admin(), new FullRangeGenerator(), NUM_PARALLELISM);
         Set<TopicPartition> expectedPartitions = new HashSet<>();
 
         for (int i = 0; i < NUM_PARTITIONS_PER_TOPIC; i++) {
-            expectedPartitions.add(new TopicPartition(TOPIC1, i, createFullRange()));
-            expectedPartitions.add(new TopicPartition(TOPIC2, i, createFullRange()));
+            expectedPartitions.add(new TopicPartition(topic1, i, createFullRange()));
+            expectedPartitions.add(new TopicPartition(topic2, i, createFullRange()));
         }
 
         assertEquals(expectedPartitions, topicPartitions);
 
-        operator().deleteTopic(TOPIC1);
-        operator().deleteTopic(TOPIC2);
+        operator().deleteTopic(topic1);
+        operator().deleteTopic(topic2);

Review comment:
   These deletions should run in a finally block; otherwise a failing test will leave the topics behind.
   Alternatively, if topic creation does not take too long, just create and delete all topics in @BeforeEach and @AfterEach. Do we even need to create them for each test? Maybe @BeforeAll is sufficient?
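
   To illustrate the finally suggestion, here is a minimal, self-contained sketch. It does not use the Flink or Pulsar test classes: `FakeTopicOperator` and `withTopics` are hypothetical stand-ins invented for this example, with an in-memory map playing the role of the broker behind operator(). The point is only that deletion runs even when the test body throws.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class TopicCleanupSketch {

    /** Hypothetical in-memory stand-in for the suite's operator(); not the real Flink API. */
    static final class FakeTopicOperator {
        private final Map<String, Integer> topics = new LinkedHashMap<>();

        void createTopic(String topic, int partitions) {
            if (topics.putIfAbsent(topic, partitions) != null) {
                throw new IllegalStateException("Topic already exists: " + topic);
            }
        }

        void deleteTopic(String topic) {
            topics.remove(topic);
        }

        Set<String> existingTopics() {
            return topics.keySet();
        }
    }

    /** Creates the topics, runs the test body, and always deletes what was created. */
    static void withTopics(
            FakeTopicOperator operator, List<String> names, int partitions, Runnable testBody) {
        // Track only the topics that were actually created, so a failure
        // halfway through creation still cleans up the earlier ones.
        List<String> created = new ArrayList<>();
        try {
            for (String name : names) {
                operator.createTopic(name, partitions);
                created.add(name);
            }
            testBody.run();
        } finally {
            for (String name : created) {
                operator.deleteTopic(name);
            }
        }
    }

    public static void main(String[] args) {
        FakeTopicOperator operator = new FakeTopicOperator();
        try {
            withTopics(
                    operator,
                    List.of("topic-1", "pattern-topic"),
                    5,
                    () -> {
                        throw new AssertionError("simulated assertion failure");
                    });
        } catch (AssertionError expected) {
            // The failure still propagates to the caller, as it would in a real test.
        }
        System.out.println("Topics left behind: " + operator.existingTopics());
    }
}
```

   In the actual test class the same effect could presumably be had by moving the createTopic calls into a @BeforeEach method and the deleteTopic calls into an @AfterEach method, since JUnit 5 runs @AfterEach even when the test method fails.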




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
