You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/24 08:11:39 UTC

[pulsar] branch master updated: [broker][fix]Fix update topic remove properties (#17231)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1de80e0684e [broker][fix]Fix update topic remove properties (#17231)
1de80e0684e is described below

commit 1de80e0684ec5c13b6edcd217af62d74d8677f04
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Wed Aug 24 16:11:30 2022 +0800

    [broker][fix]Fix update topic remove properties (#17231)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  6 ++--
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 42 ++++++++++++++++++++++
 2 files changed, 46 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index d9adc4ac20b..231168e35e9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -481,7 +481,8 @@ public class PersistentTopicsBase extends AdminResource {
                                     return updatePartitionInOtherCluster(numPartitions, clusters)
                                         .thenCompose(v -> namespaceResources().getPartitionedTopicResources()
                                                         .updatePartitionedTopicAsync(topicName, p ->
-                                                                new PartitionedTopicMetadata(numPartitions)
+                                                                new PartitionedTopicMetadata(numPartitions,
+                                                                    p.properties)
                                                         ));
                                 } else {
                                     return CompletableFuture.completedFuture(null);
@@ -4361,7 +4362,8 @@ public class PersistentTopicsBase extends AdminResource {
         CompletableFuture<Void> result = new CompletableFuture<>();
         createSubscriptions(topicName, numPartitions).thenCompose(__ -> {
             CompletableFuture<Void> future = namespaceResources().getPartitionedTopicResources()
-                    .updatePartitionedTopicAsync(topicName, p -> new PartitionedTopicMetadata(numPartitions));
+                    .updatePartitionedTopicAsync(topicName, p ->
+                        new PartitionedTopicMetadata(numPartitions, p.properties));
             future.exceptionally(ex -> {
                 // If the update operation fails, clean up the partitions that were created
                 getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index cabee91f36d..a49fb85479a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -616,6 +616,48 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.CONFLICT.getStatusCode());
     }
 
+    @Test
+    public void testUpdatePartitionedTopicHavingProperties() throws Exception {
+        final String tenant = "tenant-testUpdatePartitionedTopicHavingProperties";
+        final String namespace = "ns-testUpdatePartitionedTopicHavingProperties";
+        final String topic = "topic-testUpdatePartitionedTopicHavingProperties";
+        Map<String, String> topicMetadata = new HashMap<>();
+        topicMetadata.put("key1", "value1");
+
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
+        admin.tenants().createTenant(tenant, tenantInfo);
+        admin.namespaces().createNamespace(tenant + "/" + namespace, Set.of("test"));
+
+        // create a 2 partition topic with properties key1->value1
+        AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<PartitionedTopicMetadata> responseCaptor =
+            ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+        PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(2, topicMetadata);
+        persistentTopics.createPartitionedTopic(response, tenant, namespace, topic, metadata, true);
+        Awaitility.await().untilAsserted(() -> {
+            persistentTopics.getPartitionedMetadata(response,
+                tenant, namespace, topic, true, false);
+            verify(response, timeout(5000).atLeast(1)).resume(responseCaptor.capture());
+            Assert.assertEquals(responseCaptor.getValue().properties.size(), 1);
+            Assert.assertEquals(responseCaptor.getValue().properties, topicMetadata);
+        });
+
+        // update partition to 5
+        final int updatedPartition = 5;
+        AsyncResponse response2 = mock(AsyncResponse.class);
+        ArgumentCaptor<PartitionedTopicMetadata> responseCaptor2 =
+            ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+        persistentTopics.updatePartitionedTopic(response2, tenant, namespace, topic, false, false, false, updatedPartition);
+        Awaitility.await().untilAsserted(() -> {
+            persistentTopics.getPartitionedMetadata(response2,
+                tenant, namespace, topic, true, false);
+            verify(response2, timeout(5000).atLeast(1)).resume(responseCaptor2.capture());
+            Assert.assertEquals(responseCaptor2.getValue().partitions, updatedPartition);
+            Assert.assertEquals(responseCaptor2.getValue().properties.size(), 1);
+            Assert.assertEquals(responseCaptor2.getValue().properties, topicMetadata);
+        });
+    }
+
     @Test
     public void testUpdatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix() throws Exception {
         // Already have non partition topic special-topic-partition-10, shouldn't able to update number of