Posted to commits@pulsar.apache.org by yu...@apache.org on 2024/01/31 10:11:20 UTC
(pulsar) branch branch-3.0 updated: [fix] [broker] [branch-3.0] Fast fix infinite HTTP call createSubscriptions caused by wrong topicName (#21997)
This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 43864019e3b [fix] [broker] [branch-3.0] Fast fix infinite HTTP call createSubscriptions caused by wrong topicName (#21997)
43864019e3b is described below
commit 43864019e3b8754c00960223cae3555a50774835
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Wed Jan 31 16:19:11 2024 +0800
[fix] [broker] [branch-3.0] Fast fix infinite HTTP call createSubscriptions caused by wrong topicName (#21997)
Similar to: #20131
The master branch fixed the issue in https://github.com/apache/pulsar/pull/19841. Because that change would prevent users from receiving the messages that were created by mistake, we did not cherry-pick it into other branches; see https://github.com/apache/pulsar/pull/19841 for details.
### Motivation
#### Background of Admin API `PersistentTopics.createSubscription`
It works like this:
1. createSubscription( `tp1` )
2. is partitioned topic?
`no`: return subscriptions
`yes`: createSubscription(`tp1-partition-0`)....createSubscription(`tp1-partition-n`)
---
#### Background of the issue of `TopicName.getPartition(int index)`
```java
// import org.apache.pulsar.common.naming.TopicName;
TopicName partitionedTopic = TopicName.get("tp1-partition-0-DLQ");
// Highlight: partition0.toString() will be "tp1-partition-0-DLQ" (wrong). The correct value is "tp1-partition-0-DLQ-partition-0".
TopicName partition0 = partitionedTopic.getPartition(0);
```
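To make the behaviour above concrete, here is a minimal standalone sketch. It is not the real `TopicName` class: `TopicNameSketch` is a hypothetical name, and the `contains` check is an assumption about why a name that already includes `-partition-` is returned unchanged.

```java
// Simplified model of the behaviour described above.
// NOT the real org.apache.pulsar.common.naming.TopicName implementation.
final class TopicNameSketch {
    static final String PARTITIONED_TOPIC_SUFFIX = "-partition-";

    // Assumption: a name that already contains the suffix anywhere is treated
    // as if it were already a partition name and is returned as-is.
    static String getPartition(String topic, int index) {
        if (index == -1 || topic.contains(PARTITIONED_TOPIC_SUFFIX)) {
            return topic;
        }
        return topic + PARTITIONED_TOPIC_SUFFIX + index;
    }

    public static void main(String[] args) {
        System.out.println(getPartition("tp1", 0));                 // tp1-partition-0
        System.out.println(getPartition("tp1-partition-0-DLQ", 0)); // tp1-partition-0-DLQ (unchanged)
    }
}
```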
#### Issue
Therefore, if a partitioned topic named `tp1-partition-0-DLQ` exists, the method `PersistentTopics.createSubscription` works like this:
1. call the Admin API `PersistentTopics.createSubscription("tp1-partition-0-DLQ")`
2. is partitioned topic?
3. yes: call `TopicName.getPartition(0)` to get partition 0, which returns `tp1-partition-0-DLQ` again, then loop back to step 1.
The resulting infinite chain of `PersistentTopics.createSubscription` HTTP calls makes the broker crash.
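The loop can be reproduced in a small standalone simulation. This is only a sketch under stated assumptions: the class name, the in-memory `PARTITIONS` map standing in for partitioned-topic metadata, and the `maxCalls` cap (added so the demo terminates) are all hypothetical, and recursion stands in for the per-partition HTTP calls.

```java
import java.util.Map;

// Simulation of the createSubscription fan-out; not broker code.
final class CreateSubscriptionLoopSketch {
    static final String SUFFIX = "-partition-";

    // Hypothetical metadata store: topic name -> number of partitions.
    static final Map<String, Integer> PARTITIONS =
            Map.of("tp1", 2, "tp1-partition-0-DLQ", 1);

    // Assumed model of the buggy partition-name derivation.
    static String getPartition(String topic, int index) {
        return topic.contains(SUFFIX) ? topic : topic + SUFFIX + index;
    }

    // Returns how many createSubscription calls were made, stopping at maxCalls.
    static int countCalls(String topic, int maxCalls) {
        int[] calls = {0};
        createSubscription(topic, calls, maxCalls);
        return calls[0];
    }

    private static void createSubscription(String topic, int[] calls, int maxCalls) {
        if (calls[0] >= maxCalls) {
            return; // artificial cap; the real call chain has no such limit
        }
        calls[0]++;
        int numPartitions = PARTITIONS.getOrDefault(topic, 0);
        for (int i = 0; i < numPartitions; i++) {
            // For the bad name this passes the same topic again: step 3 loops to step 1.
            createSubscription(getPartition(topic, i), calls, maxCalls);
        }
    }

    public static void main(String[] args) {
        System.out.println(countCalls("tp1", 100));                 // 3: the topic plus two partitions
        System.out.println(countCalls("tp1-partition-0-DLQ", 100)); // 100: only the cap stops it
    }
}
```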
### Modifications
#### Quick fix (this PR)
If the request hits the issue that produces the wrong topic name, do not loop back to step 1.
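A rough sketch of how such a guard stops the loop. The method name `isUnexpectedTopicName` is borrowed from the diff in this commit, but its body here is an assumption (the real helper takes the partition metadata and is not shown in this email); `QuickFixSketch`, the `PARTITIONS` map, and the `maxCalls` cap are hypothetical.

```java
import java.util.Map;

// Simulation of the guarded fan-out; not broker code.
final class QuickFixSketch {
    static final String SUFFIX = "-partition-";

    // Hypothetical metadata store: topic name -> number of partitions.
    static final Map<String, Integer> PARTITIONS =
            Map.of("tp1", 2, "tp1-partition-0-DLQ", 1);

    // Assumed model of the buggy partition-name derivation.
    static String getPartition(String topic, int index) {
        return topic.contains(SUFFIX) ? topic : topic + SUFFIX + index;
    }

    // Assumed check: the topic reports partitions, yet "partition 0" resolves
    // to the topic itself, so fanning out would call the same name again.
    static boolean isUnexpectedTopicName(String topic, int numPartitions) {
        return topic.contains(SUFFIX)
                && numPartitions > 0
                && getPartition(topic, 0).equals(topic);
    }

    static int countCalls(String topic, int maxCalls) {
        int[] calls = {0};
        createSubscription(topic, calls, maxCalls);
        return calls[0];
    }

    private static void createSubscription(String topic, int[] calls, int maxCalls) {
        if (calls[0] >= maxCalls) {
            return; // safety cap for the demo only
        }
        calls[0]++;
        int numPartitions = PARTITIONS.getOrDefault(topic, 0);
        if (numPartitions > 0 && !isUnexpectedTopicName(topic, numPartitions)) {
            for (int i = 0; i < numPartitions; i++) {
                createSubscription(getPartition(topic, i), calls, maxCalls);
            }
        }
        // Otherwise the topic is handled as a single topic; no further fan-out.
    }

    public static void main(String[] args) {
        System.out.println(countCalls("tp1", 100));                 // 3
        System.out.println(countCalls("tp1-partition-0-DLQ", 100)); // 1: the guard breaks the loop
    }
}
```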
#### Long-term fix
The PR https://github.com/apache/pulsar/pull/19841 fixes the root cause that produces the wrong topic name, but it introduces a compatibility break. PIP 263 (https://github.com/apache/pulsar/issues/20033) is intended to restore compatibility.
---
.../apache/pulsar/broker/admin/impl/PersistentTopicsBase.java | 2 +-
.../api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java | 9 ++++++---
2 files changed, 7 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 2b9a34b382e..53c412bcf1d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2349,7 +2349,7 @@ public class PersistentTopicsBase extends AdminResource {
.thenCompose(allowAutoTopicCreation -> getPartitionedTopicMetadataAsync(topicName,
authoritative, allowAutoTopicCreation).thenAccept(partitionMetadata -> {
final int numPartitions = partitionMetadata.partitions;
- if (numPartitions > 0) {
+ if (partitionMetadata.partitions > 0 && !isUnexpectedTopicName(partitionMetadata)) {
final CompletableFuture<Void> future = new CompletableFuture<>();
final AtomicInteger count = new AtomicInteger(numPartitions);
final AtomicInteger failureCount = new AtomicInteger(0);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java
index 2efc4f4e780..96f57a59bda 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicNameForInfiniteHttpCallGetSubscriptionsTest.java
@@ -53,7 +53,7 @@ public class TopicNameForInfiniteHttpCallGetSubscriptionsTest extends ProducerCo
}
@Test
- public void testInfiniteHttpCallGetSubscriptions() throws Exception {
+ public void testInfiniteHttpCallGetOrCreateSubscriptions() throws Exception {
final String randomStr = UUID.randomUUID().toString().replaceAll("-", "");
final String partitionedTopicName = "persistent://my-property/my-ns/tp1_" + randomStr;
final String topic_p0 = partitionedTopicName + TopicName.PARTITIONED_TOPIC_SUFFIX + "0";
@@ -65,6 +65,7 @@ public class TopicNameForInfiniteHttpCallGetSubscriptionsTest extends ProducerCo
// Do test.
ProducerAndConsumerEntry pcEntry = triggerDLQCreated(topic_p0, topicDLQ, subscriptionName);
admin.topics().getSubscriptions(topicDLQ);
+ admin.topics().createSubscription(topicDLQ, "s1", MessageId.earliest);
// cleanup.
pcEntry.consumer.close();
@@ -73,7 +74,7 @@ public class TopicNameForInfiniteHttpCallGetSubscriptionsTest extends ProducerCo
}
@Test
- public void testInfiniteHttpCallGetSubscriptions2() throws Exception {
+ public void testInfiniteHttpCallGetOrCreateSubscriptions2() throws Exception {
final String randomStr = UUID.randomUUID().toString().replaceAll("-", "");
final String topicName = "persistent://my-property/my-ns/tp1_" + randomStr + "-partition-0-abc";
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
@@ -82,13 +83,14 @@ public class TopicNameForInfiniteHttpCallGetSubscriptionsTest extends ProducerCo
// Do test.
admin.topics().getSubscriptions(topicName);
+ admin.topics().createSubscription(topicName, "s1", MessageId.earliest);
// cleanup.
producer.close();
}
@Test
- public void testInfiniteHttpCallGetSubscriptions3() throws Exception {
+ public void testInfiniteHttpCallGetOrCreateSubscriptions3() throws Exception {
final String randomStr = UUID.randomUUID().toString().replaceAll("-", "");
final String topicName = "persistent://my-property/my-ns/tp1_" + randomStr + "-partition-0";
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
@@ -97,6 +99,7 @@ public class TopicNameForInfiniteHttpCallGetSubscriptionsTest extends ProducerCo
// Do test.
admin.topics().getSubscriptions(topicName);
+ admin.topics().createSubscription(topicName, "s1", MessageId.earliest);
// cleanup.
producer.close();