You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/24 08:11:39 UTC
[pulsar] branch master updated: [broker][fix]Fix update topic remove properties (#17231)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1de80e0684e [broker][fix]Fix update topic remove properties (#17231)
1de80e0684e is described below
commit 1de80e0684ec5c13b6edcd217af62d74d8677f04
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Wed Aug 24 16:11:30 2022 +0800
[broker][fix]Fix update topic remove properties (#17231)
---
.../broker/admin/impl/PersistentTopicsBase.java | 6 ++--
.../pulsar/broker/admin/PersistentTopicsTest.java | 42 ++++++++++++++++++++++
2 files changed, 46 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index d9adc4ac20b..231168e35e9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -481,7 +481,8 @@ public class PersistentTopicsBase extends AdminResource {
return updatePartitionInOtherCluster(numPartitions, clusters)
.thenCompose(v -> namespaceResources().getPartitionedTopicResources()
.updatePartitionedTopicAsync(topicName, p ->
- new PartitionedTopicMetadata(numPartitions)
+ new PartitionedTopicMetadata(numPartitions,
+ p.properties)
));
} else {
return CompletableFuture.completedFuture(null);
@@ -4361,7 +4362,8 @@ public class PersistentTopicsBase extends AdminResource {
CompletableFuture<Void> result = new CompletableFuture<>();
createSubscriptions(topicName, numPartitions).thenCompose(__ -> {
CompletableFuture<Void> future = namespaceResources().getPartitionedTopicResources()
- .updatePartitionedTopicAsync(topicName, p -> new PartitionedTopicMetadata(numPartitions));
+ .updatePartitionedTopicAsync(topicName, p ->
+ new PartitionedTopicMetadata(numPartitions, p.properties));
future.exceptionally(ex -> {
// If the update operation fails, clean up the partitions that were created
getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index cabee91f36d..a49fb85479a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -616,6 +616,48 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.CONFLICT.getStatusCode());
}
+ @Test
+ public void testUpdatePartitionedTopicHavingProperties() throws Exception {
+ final String tenant = "tenant-testUpdatePartitionedTopicHavingProperties";
+ final String namespace = "ns-testUpdatePartitionedTopicHavingProperties";
+ final String topic = "topic-testUpdatePartitionedTopicHavingProperties";
+ Map<String, String> topicMetadata = new HashMap<>();
+ topicMetadata.put("key1", "value1");
+
+ TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
+ admin.tenants().createTenant(tenant, tenantInfo);
+ admin.namespaces().createNamespace(tenant + "/" + namespace, Set.of("test"));
+
+ // create a 2-partition topic with properties key1->value1
+ AsyncResponse response = mock(AsyncResponse.class);
+ ArgumentCaptor<PartitionedTopicMetadata> responseCaptor =
+ ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+ PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(2, topicMetadata);
+ persistentTopics.createPartitionedTopic(response, tenant, namespace, topic, metadata, true);
+ Awaitility.await().untilAsserted(() -> {
+ persistentTopics.getPartitionedMetadata(response,
+ tenant, namespace, topic, true, false);
+ verify(response, timeout(5000).atLeast(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().properties.size(), 1);
+ Assert.assertEquals(responseCaptor.getValue().properties, topicMetadata);
+ });
+
+ // update the partition count to 5
+ final int updatedPartition = 5;
+ AsyncResponse response2 = mock(AsyncResponse.class);
+ ArgumentCaptor<PartitionedTopicMetadata> responseCaptor2 =
+ ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+ persistentTopics.updatePartitionedTopic(response2, tenant, namespace, topic, false, false, false, updatedPartition);
+ Awaitility.await().untilAsserted(() -> {
+ persistentTopics.getPartitionedMetadata(response2,
+ tenant, namespace, topic, true, false);
+ verify(response2, timeout(5000).atLeast(1)).resume(responseCaptor2.capture());
+ Assert.assertEquals(responseCaptor2.getValue().partitions, updatedPartition);
+ Assert.assertEquals(responseCaptor2.getValue().properties.size(), 1);
+ Assert.assertEquals(responseCaptor2.getValue().properties, topicMetadata);
+ });
+ }
+
@Test
public void testUpdatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix() throws Exception {
// Already have non partition topic special-topic-partition-10, shouldn't able to update number of