You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/08/25 07:53:25 UTC
[pulsar] 01/03: fix the bug,
can not update topic when the update topicName is contained by an
existed topic as a part (#11686)
This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2acefde47e61efd84bcd381efdfb5f4b95c76265
Author: Nicklee007 <84...@users.noreply.github.com>
AuthorDate: Wed Aug 25 14:02:48 2021 +0800
fix the bug, can not update topic when the update topicName is contained by an existed topic as a part (#11686)
Fixes #11685
validatePartitionTopicUpdate use contain to check if there has a exist topic will cause conflict, which will cause a failed when exist a topic which contain the new topic's prefix and we want to update the new topic partition;
we have a those topic
"persistent://public/default/113p-partition-0"
"persistent://public/default/113p-partition-1"
"persistent://public/default/113p-partition-2"
"persistent://public/default/3p-partition-0"
when we want to update topic 3p to more partitions ,we failed because "persistent://public/default/113p-partition-0" contain "3p-partition"
Modifications
use the startwith to check if exist the same topic.
* fix the bug, the old topic contain a same strSub cause couldn't add new partitions
* add update the partitioned topic which a part is coontained in old topic test
Co-authored-by: nicklixinyang <ni...@didiglobal.com>
(cherry picked from commit 241de4b8550237458e96bf4227185243d5c8a550)
---
.../broker/admin/impl/PersistentTopicsBase.java | 6 ++---
.../org/apache/pulsar/broker/admin/AdminTest.java | 31 ++++++++++++++++++++++
2 files changed, 34 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 9bbf018..f63d318 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3721,9 +3721,9 @@ public class PersistentTopicsBase extends AdminResource {
TopicName partitionTopicName = TopicName.get(domain(), namespaceName, topicName);
PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(partitionTopicName, false, false);
int oldPartition = metadata.partitions;
- String prefix = topicName + TopicName.PARTITIONED_TOPIC_SUFFIX;
+ String prefix = partitionTopicName.getPartitionedTopicName() + TopicName.PARTITIONED_TOPIC_SUFFIX;
for (String exsitingTopicName : existingTopicList) {
- if (exsitingTopicName.contains(prefix)) {
+ if (exsitingTopicName.startsWith(prefix)) {
try {
long suffix = Long.parseLong(exsitingTopicName.substring(
exsitingTopicName.indexOf(TopicName.PARTITIONED_TOPIC_SUFFIX)
@@ -3738,7 +3738,7 @@ public class PersistentTopicsBase extends AdminResource {
clientAppId(),
exsitingTopicName, topicName);
throw new RestException(Status.PRECONDITION_FAILED,
- "Already have non partition topic" + exsitingTopicName
+ "Already have non partition topic " + exsitingTopicName
+ " which contains partition suffix '-partition-' "
+ "and end with numeric value and end with numeric value smaller than the new "
+ "number of partition. Update of partitioned topic "
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 81ec0d5..8afb61b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -103,6 +103,7 @@ import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.ZooDefs;
import org.awaitility.Awaitility;
import org.mockito.ArgumentCaptor;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -821,6 +822,36 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
}
+
+ @Test
+ public void testUpdatePartitionedTopicCoontainedInOldTopic() throws Exception {
+
+ final String property = "prop-xyz";
+ final String cluster = "use";
+ final String namespace = "ns";
+ final String partitionedTopicName = "old-special-topic";
+ final String partitionedTopicName2 = "special-topic";
+
+ ZkUtils.createFullPathOptimistic(mockZooKeeperGlobal, PulsarWebResource.path(POLICIES, property, cluster, namespace),
+ ObjectMapperFactory.getThreadLocal().writeValueAsBytes(new Policies()), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ AsyncResponse response1 = mock(AsyncResponse.class);
+ ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.createPartitionedTopic(response1, property, cluster, namespace, partitionedTopicName, 5, false);
+ verify(response1, timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
+
+ AsyncResponse response2 = mock(AsyncResponse.class);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.createPartitionedTopic(response2, property, cluster, namespace, partitionedTopicName2, 2, false);
+ verify(response2, timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
+
+ persistentTopics.updatePartitionedTopic(property, cluster, namespace, partitionedTopicName2, false, false, 10);
+ }
+
+
static class TestAsyncResponse implements AsyncResponse {
Object response;