You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/10/18 15:01:09 UTC

[GitHub] [pulsar] AnonHxy commented on a diff in pull request #17946: [improve][broker]add the logic for creating missed subscriptions for createMissedPartitions

AnonHxy commented on code in PR #17946:
URL: https://github.com/apache/pulsar/pull/17946#discussion_r998318791


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -4552,6 +4555,60 @@ private CompletableFuture<Void> createSubscriptions(TopicName topicName, int num
         return result;
     }
 
+    /**
+     * It creates subscriptions for new partitions of existing partitioned-topics.
+     *
+     * @param topicName     : topic-name: persistent://prop/cluster/ns/topic
+     * @param numPartitions : number partitions for the topics
+     *
+     */
+    private CompletableFuture<Void> createMissedSubscriptionsAsync(TopicName topicName, List<String> subscriptions, int numPartitions) {
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        if (CollectionUtils.isEmpty(subscriptions)) {
+            result.complete(null);
+            return result;
+        }
+        PulsarAdmin admin;
+        try {
+            admin = pulsar().getAdminClient();
+        } catch (PulsarServerException e1) {

Review Comment:
   why named `e1`?  How about `e` or `ex`



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -4552,6 +4555,60 @@ private CompletableFuture<Void> createSubscriptions(TopicName topicName, int num
         return result;
     }
 
+    /**
+     * It creates subscriptions for new partitions of existing partitioned-topics.
+     *
+     * @param topicName     : topic-name: persistent://prop/cluster/ns/topic
+     * @param numPartitions : number partitions for the topics
+     *
+     */
+    private CompletableFuture<Void> createMissedSubscriptionsAsync(TopicName topicName, List<String> subscriptions, int numPartitions) {
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        if (CollectionUtils.isEmpty(subscriptions)) {
+            result.complete(null);
+            return result;
+        }
+        PulsarAdmin admin;
+        try {
+            admin = pulsar().getAdminClient();
+        } catch (PulsarServerException e1) {
+            result.completeExceptionally(e1);
+            return result;
+        }
+
+        List<CompletableFuture<Void>> subscriptionFutures = new ArrayList<>();
+
+        subscriptions.forEach(subscription -> {
+            for (int i = 0; i < numPartitions; i++) {
+                final String topicNamePartition = topicName.getPartition(i).toString();

Review Comment:
   It looks that we don't need this variable because it only occurs once.



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java:
##########
@@ -121,30 +124,68 @@ public Object[] restCreateMissedPartitions() {
     public void testCreateMissedPartitions(boolean useRestApi) throws PulsarAdminException, PulsarClientException, MetadataStoreException {
         conf.setAllowAutoTopicCreation(false);
         final String topic = "testCreateMissedPartitions-useRestApi-" + useRestApi;
+        final String group1 = "cg_testCreateMissedPartitions-1";

Review Comment:
   `group1`-> `sub1`,  `cg_` -> `sub_`  is better in pulsar



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java:
##########
@@ -121,30 +124,68 @@ public Object[] restCreateMissedPartitions() {
     public void testCreateMissedPartitions(boolean useRestApi) throws PulsarAdminException, PulsarClientException, MetadataStoreException {
         conf.setAllowAutoTopicCreation(false);
         final String topic = "testCreateMissedPartitions-useRestApi-" + useRestApi;
+        final String group1 = "cg_testCreateMissedPartitions-1";
+        final String group2 = "cg_testCreateMissedPartitions-2";
+        final TopicName topicName = TopicName.get(topic);
         int numPartitions = 3;
         // simulate partitioned topic without partitions
         pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
                 .createPartitionedTopicAsync(TopicName.get(topic),
                 new PartitionedTopicMetadata(numPartitions));
         Consumer<byte[]> consumer = null;
         try {
-            consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribeAsync().get(3, TimeUnit.SECONDS);
+            consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(group1).subscribeAsync().get(3, TimeUnit.SECONDS);
         } catch (Exception e) {
             //ok here, consumer will create failed with 'Topic does not exist'
         }
         Assert.assertNull(consumer);
         if (useRestApi) {
             admin.topics().createMissedPartitions(topic);
         } else {
-            final TopicName topicName = TopicName.get(topic);
             for (int i = 0; i < numPartitions; i++) {
                 admin.topics().createNonPartitionedTopic(topicName.getPartition(i).toString());
             }
         }
-        consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe();
+        consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(group1).subscribe();
         Assert.assertNotNull(consumer);
         Assert.assertTrue(consumer instanceof MultiTopicsConsumerImpl);
-        Assert.assertEquals(((MultiTopicsConsumerImpl)consumer).getConsumers().size(), 3);
+        Assert.assertEquals(((MultiTopicsConsumerImpl)consumer).getConsumers().size(), numPartitions);
+
+
+        int newNumPartitions = 5;
+        pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                .updatePartitionedTopicAsync(TopicName.get(topic),p ->
+                        new PartitionedTopicMetadata(newNumPartitions, p.properties));

Review Comment:
   Should we sync this method?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -4552,6 +4555,60 @@ private CompletableFuture<Void> createSubscriptions(TopicName topicName, int num
         return result;
     }
 
+    /**
+     * It creates subscriptions for new partitions of existing partitioned-topics.
+     *
+     * @param topicName     : topic-name: persistent://prop/cluster/ns/topic
+     * @param numPartitions : number partitions for the topics
+     *
+     */
+    private CompletableFuture<Void> createMissedSubscriptionsAsync(TopicName topicName, List<String> subscriptions, int numPartitions) {
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        if (CollectionUtils.isEmpty(subscriptions)) {
+            result.complete(null);
+            return result;
+        }
+        PulsarAdmin admin;
+        try {
+            admin = pulsar().getAdminClient();
+        } catch (PulsarServerException e1) {
+            result.completeExceptionally(e1);
+            return result;
+        }
+
+        List<CompletableFuture<Void>> subscriptionFutures = new ArrayList<>();

Review Comment:
   ```suggestion
           List<CompletableFuture<Void>> subscriptionFutures = new ArrayList<>(subscriptions.size());
   ```



##########
pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java:
##########
@@ -1607,7 +1607,7 @@ public void topics() throws Exception {
         verify(mockTopics).createPartitionedTopic("persistent://myprop/clust/ns1/ds1", 32, null);
 
         cmdTopics.run(split("create-missed-partitions persistent://myprop/clust/ns1/ds1"));
-        verify(mockTopics).createMissedPartitions("persistent://myprop/clust/ns1/ds1");
+        verify(mockTopics).createMissedPartitions("persistent://myprop/clust/ns1/ds1", null);

Review Comment:
   Here we need also add a verify to test `create-missed-partitions` with `-s` param



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org