You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2024/01/31 10:11:20 UTC

(pulsar) branch branch-3.0 updated: [fix] [broker] [branch-3.0] Fast fix infinite HTTP call createSubscriptions caused by wrong topicName (#21997)

This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 43864019e3b [fix] [broker] [branch-3.0] Fast fix infinite HTTP call createSubscriptions caused by wrong topicName (#21997)
43864019e3b is described below

commit 43864019e3b8754c00960223cae3555a50774835
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Wed Jan 31 16:19:11 2024 +0800

    [fix] [broker] [branch-3.0] Fast fix infinite HTTP call createSubscriptions caused by wrong topicName (#21997)
    
    Similar to: #20131
    
    The master branch has fixed the issue in https://github.com/apache/pulsar/pull/19841. (Because that change would make users unable to receive the messages that were created by mistake, we did not cherry-pick https://github.com/apache/pulsar/pull/19841 into other branches; see https://github.com/apache/pulsar/pull/19841 for details.)
    
    ### Motivation
    
    #### Background of Admin API `PersistentTopics.createSubscription`
    It works like this:
    1. createSubscription( `tp1` )
    2. is partitioned topic?
      `no`: return subscriptions
      `yes`: createSubscription(`tp1-partition-0`)....createSubscription(`tp1-partition-n`)
    
    ---
    
    #### Background of the issue of `TopicName.getPartition(int index)`
    ```java
    String partitionedTopic = "tp1-partition-0-DLQ";
    
    TopicName partition0 = partitionedTopic.getPartition(0); // Highlight: partition0.toString() will be "tp1-partition-0-DLQ" (which is wrong). The correct value is "tp1-partition-0-DLQ-partition-0".
    ```
    
    
    #### Issue
    Therefore, if there is a partitioned topic named `tp1-partition-0-DLQ`, the method `PersistentTopics.createSubscription` will work like this:
    1. call Admin API `PersistentTopics.createSubscription("tp1-partition-0-DLQ")`
    2. is partitioned topic?
    3. yes: call `TopicName.getPartition(0)` to get partition 0, which returns `tp1-partition-0-DLQ` again, then loop back to step 1.
    
    The resulting infinite chain of HTTP calls to `PersistentTopics.createSubscription` makes the broker crash.
    
    ### Modifications
    
    #### Quick fix(this PR does it)
    If the request hits the issue that makes the topic name wrong, do not loop back to step 1.
    
    #### Long-term fix
    The PR https://github.com/apache/pulsar/pull/19841 fixes the issue that makes the topic name wrong, but that PR introduces a compatibility problem; PIP 263 (https://github.com/apache/pulsar/issues/20033) will restore good compatibility.
---
 .../apache/pulsar/broker/admin/impl/PersistentTopicsBase.java    | 2 +-
 .../api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java    | 9 ++++++---
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 2b9a34b382e..53c412bcf1d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2349,7 +2349,7 @@ public class PersistentTopicsBase extends AdminResource {
                         .thenCompose(allowAutoTopicCreation -> getPartitionedTopicMetadataAsync(topicName,
                                 authoritative, allowAutoTopicCreation).thenAccept(partitionMetadata -> {
                             final int numPartitions = partitionMetadata.partitions;
-                            if (numPartitions > 0) {
+                            if (partitionMetadata.partitions > 0 && !isUnexpectedTopicName(partitionMetadata)) {
                                 final CompletableFuture<Void> future = new CompletableFuture<>();
                                 final AtomicInteger count = new AtomicInteger(numPartitions);
                                 final AtomicInteger failureCount = new AtomicInteger(0);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java
index 2efc4f4e780..96f57a59bda 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java
@@ -53,7 +53,7 @@ public class TopicNameForInfiniteHttpCallGetSubscriptionsTest extends ProducerCo
     }
 
     @Test
-    public void testInfiniteHttpCallGetSubscriptions() throws Exception {
+    public void testInfiniteHttpCallGetOrCreateSubscriptions() throws Exception {
         final String randomStr = UUID.randomUUID().toString().replaceAll("-", "");
         final String partitionedTopicName = "persistent://my-property/my-ns/tp1_" + randomStr;
         final String topic_p0 = partitionedTopicName + TopicName.PARTITIONED_TOPIC_SUFFIX + "0";
@@ -65,6 +65,7 @@ public class TopicNameForInfiniteHttpCallGetSubscriptionsTest extends ProducerCo
         // Do test.
         ProducerAndConsumerEntry pcEntry = triggerDLQCreated(topic_p0, topicDLQ, subscriptionName);
         admin.topics().getSubscriptions(topicDLQ);
+        admin.topics().createSubscription(topicDLQ, "s1", MessageId.earliest);
 
         // cleanup.
         pcEntry.consumer.close();
@@ -73,7 +74,7 @@ public class TopicNameForInfiniteHttpCallGetSubscriptionsTest extends ProducerCo
     }
 
     @Test
-    public void testInfiniteHttpCallGetSubscriptions2() throws Exception {
+    public void testInfiniteHttpCallGetOrCreateSubscriptions2() throws Exception {
         final String randomStr = UUID.randomUUID().toString().replaceAll("-", "");
         final String topicName = "persistent://my-property/my-ns/tp1_" + randomStr + "-partition-0-abc";
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
@@ -82,13 +83,14 @@ public class TopicNameForInfiniteHttpCallGetSubscriptionsTest extends ProducerCo
 
         // Do test.
         admin.topics().getSubscriptions(topicName);
+        admin.topics().createSubscription(topicName, "s1", MessageId.earliest);
 
         // cleanup.
         producer.close();
     }
 
     @Test
-    public void testInfiniteHttpCallGetSubscriptions3() throws Exception {
+    public void testInfiniteHttpCallGetOrCreateSubscriptions3() throws Exception {
         final String randomStr = UUID.randomUUID().toString().replaceAll("-", "");
         final String topicName = "persistent://my-property/my-ns/tp1_" + randomStr + "-partition-0";
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
@@ -97,6 +99,7 @@ public class TopicNameForInfiniteHttpCallGetSubscriptionsTest extends ProducerCo
 
         // Do test.
         admin.topics().getSubscriptions(topicName);
+        admin.topics().createSubscription(topicName, "s1", MessageId.earliest);
 
         // cleanup.
         producer.close();