You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/08/25 07:53:25 UTC

[pulsar] 01/03: fix the bug, can not update topic when the update topicName is contained by an existed topic as a part (#11686)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2acefde47e61efd84bcd381efdfb5f4b95c76265
Author: Nicklee007 <84...@users.noreply.github.com>
AuthorDate: Wed Aug 25 14:02:48 2021 +0800

    fix the bug,  can not update topic when  the update  topicName is contained by an existed topic as a part (#11686)
    
    Fixes #11685
    
    validatePartitionTopicUpdate use contain to check if there has a exist topic will cause conflict, which will cause a failed when exist a topic which contain the new topic's prefix and we want to update the new topic partition;
    
    we have a those topic
    "persistent://public/default/113p-partition-0"
    "persistent://public/default/113p-partition-1"
    "persistent://public/default/113p-partition-2"
    "persistent://public/default/3p-partition-0"
    
    when we want to update topic 3p to more partitions ,we failed because "persistent://public/default/113p-partition-0" contain "3p-partition"
    
    Modifications
    use the startwith to check if exist the same topic.
    
    * fix the bug,  the old topic contain a same strSub cause couldn't add new partitions
    
    * add update the partitioned topic which a part is coontained in old topic test
    
    Co-authored-by: nicklixinyang <ni...@didiglobal.com>
    (cherry picked from commit 241de4b8550237458e96bf4227185243d5c8a550)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  6 ++---
 .../org/apache/pulsar/broker/admin/AdminTest.java  | 31 ++++++++++++++++++++++
 2 files changed, 34 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 9bbf018..f63d318 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3721,9 +3721,9 @@ public class PersistentTopicsBase extends AdminResource {
         TopicName partitionTopicName = TopicName.get(domain(), namespaceName, topicName);
         PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(partitionTopicName, false, false);
         int oldPartition = metadata.partitions;
-        String prefix = topicName + TopicName.PARTITIONED_TOPIC_SUFFIX;
+        String prefix = partitionTopicName.getPartitionedTopicName() + TopicName.PARTITIONED_TOPIC_SUFFIX;
         for (String exsitingTopicName : existingTopicList) {
-            if (exsitingTopicName.contains(prefix)) {
+            if (exsitingTopicName.startsWith(prefix)) {
                 try {
                     long suffix = Long.parseLong(exsitingTopicName.substring(
                             exsitingTopicName.indexOf(TopicName.PARTITIONED_TOPIC_SUFFIX)
@@ -3738,7 +3738,7 @@ public class PersistentTopicsBase extends AdminResource {
                                 clientAppId(),
                                 exsitingTopicName, topicName);
                         throw new RestException(Status.PRECONDITION_FAILED,
-                                "Already have non partition topic" + exsitingTopicName
+                                "Already have non partition topic " + exsitingTopicName
                                         + " which contains partition suffix '-partition-' "
                                         + "and end with numeric value and end with numeric value smaller than the new "
                                         + "number of partition. Update of partitioned topic "
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 81ec0d5..8afb61b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -103,6 +103,7 @@ import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.ZooDefs;
 import org.awaitility.Awaitility;
 import org.mockito.ArgumentCaptor;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -821,6 +822,36 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
 
     }
 
+
+    @Test
+    public void testUpdatePartitionedTopicCoontainedInOldTopic() throws Exception {
+
+        final String property = "prop-xyz";
+        final String cluster = "use";
+        final String namespace = "ns";
+        final String partitionedTopicName = "old-special-topic";
+        final String partitionedTopicName2 = "special-topic";
+
+        ZkUtils.createFullPathOptimistic(mockZooKeeperGlobal, PulsarWebResource.path(POLICIES, property, cluster, namespace),
+                ObjectMapperFactory.getThreadLocal().writeValueAsBytes(new Policies()), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+
+        AsyncResponse response1 = mock(AsyncResponse.class);
+        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.createPartitionedTopic(response1, property, cluster, namespace, partitionedTopicName, 5, false);
+        verify(response1, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
+
+        AsyncResponse response2 = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.createPartitionedTopic(response2, property, cluster, namespace, partitionedTopicName2, 2, false);
+        verify(response2, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
+
+        persistentTopics.updatePartitionedTopic(property, cluster, namespace, partitionedTopicName2, false, false, 10);
+    }
+
+
     static class TestAsyncResponse implements AsyncResponse {
 
         Object response;