You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/01 12:40:19 UTC

[GitHub] [flink] syhily commented on a change in pull request #19092: [FLINK-26645][Connector/Pulsar] Fix Pulsar source subscriber consume from all partitions when only subscribed to 1 partition

syhily commented on a change in pull request #19092:
URL: https://github.com/apache/flink/pull/19092#discussion_r840546124



##########
File path: flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriberTest.java
##########
@@ -30,49 +29,111 @@
 import java.util.Set;
 import java.util.regex.Pattern;
 
+import static java.util.Collections.singletonList;
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
 import static org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber.getTopicListSubscriber;
 import static org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber.getTopicPatternSubscriber;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
 import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
 import static org.apache.pulsar.client.api.RegexSubscriptionMode.AllTopics;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /** Unit tests for {@link PulsarSubscriber}. */
 class PulsarSubscriberTest extends PulsarTestSuiteBase {
 
-    private static final String TOPIC1 = TopicNameUtils.topicName("topic1");
-    private static final String TOPIC2 = TopicNameUtils.topicName("pattern-topic");
-    private static final String TOPIC3 = TopicNameUtils.topicName("topic2");
+    private final String topic1 = topicName("topic-" + randomAlphanumeric(4) + "-1");
+    private final String topic2 = topicName("pattern-topic-" + randomAlphanumeric(4));
+    private final String topic3 = topicName("topic2-" + randomAlphanumeric(4) + "-2");
 
     private static final int NUM_PARTITIONS_PER_TOPIC = 5;
     private static final int NUM_PARALLELISM = 10;
 
     @Test
     void topicListSubscriber() {
-        operator().createTopic(TOPIC1, NUM_PARTITIONS_PER_TOPIC);
-        operator().createTopic(TOPIC2, NUM_PARTITIONS_PER_TOPIC);
+        operator().createTopic(topic1, NUM_PARTITIONS_PER_TOPIC);
+        operator().createTopic(topic2, NUM_PARTITIONS_PER_TOPIC);
 
-        PulsarSubscriber subscriber = getTopicListSubscriber(Arrays.asList(TOPIC1, TOPIC2));
+        PulsarSubscriber subscriber = getTopicListSubscriber(Arrays.asList(topic1, topic2));
         Set<TopicPartition> topicPartitions =
                 subscriber.getSubscribedTopicPartitions(
                         operator().admin(), new FullRangeGenerator(), NUM_PARALLELISM);
         Set<TopicPartition> expectedPartitions = new HashSet<>();
 
         for (int i = 0; i < NUM_PARTITIONS_PER_TOPIC; i++) {
-            expectedPartitions.add(new TopicPartition(TOPIC1, i, createFullRange()));
-            expectedPartitions.add(new TopicPartition(TOPIC2, i, createFullRange()));
+            expectedPartitions.add(new TopicPartition(topic1, i, createFullRange()));
+            expectedPartitions.add(new TopicPartition(topic2, i, createFullRange()));
         }
 
         assertEquals(expectedPartitions, topicPartitions);
 
-        operator().deleteTopic(TOPIC1);
-        operator().deleteTopic(TOPIC2);
+        operator().deleteTopic(topic1);
+        operator().deleteTopic(topic2);

Review comment:
       Yep. You are right. I'll fix this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org