You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ho...@apache.org on 2022/08/28 08:13:10 UTC
[pulsar] branch master updated: [fix][broker]fail to update partition meta of topic due to ConflictException: subscription already exists for topic (#17251)
This is an automated email from the ASF dual-hosted git repository.
houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3422ab43839 [fix][broker]fail to update partition meta of topic due to ConflictException: subscription already exists for topic (#17251)
3422ab43839 is described below
commit 3422ab438390c252cbafaa3eb944a1b6be154a8e
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Sun Aug 28 16:13:01 2022 +0800
[fix][broker]fail to update partition meta of topic due to ConflictException: subscription already exists for topic (#17251)
### Motivation
#10374 was intended to handle existing subscriptions when updating the partitions of a topic with the `force` command parameter. However, the partition metadata could not be updated when the `Subscription already exists for topic` exception occurred, because the metadata update was skipped while handling that exception.
---
.../broker/admin/impl/PersistentTopicsBase.java | 30 +++++++++++----
.../pulsar/broker/admin/PersistentTopicsTest.java | 43 ++++++++++++++++++++++
2 files changed, 65 insertions(+), 8 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 6c55ac8c5d3..3db8f3a302f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -473,8 +473,8 @@ public class PersistentTopicsBase extends AdminResource {
})
.thenCompose(clusters -> tryCreatePartitionsAsync(numPartitions).thenApply(ignore ->
clusters))
- .thenCompose(clusters -> createSubscriptions(topicName, numPartitions).thenApply(ignore ->
- clusters))
+ .thenCompose(clusters -> createSubscriptions(topicName, numPartitions, force).thenApply(
+ ignore -> clusters))
.thenCompose(clusters -> {
if (!updateLocalTopicOnly) {
return updatePartitionInOtherCluster(numPartitions, clusters)
@@ -4368,10 +4368,9 @@ public class PersistentTopicsBase extends AdminResource {
}
}
-
private CompletableFuture<Void> updatePartitionedTopic(TopicName topicName, int numPartitions, boolean force) {
CompletableFuture<Void> result = new CompletableFuture<>();
- createSubscriptions(topicName, numPartitions).thenCompose(__ -> {
+ createSubscriptions(topicName, numPartitions, force).thenCompose(__ -> {
CompletableFuture<Void> future = namespaceResources().getPartitionedTopicResources()
.updatePartitionedTopicAsync(topicName, p ->
new PartitionedTopicMetadata(numPartitions, p.properties));
@@ -4409,8 +4408,11 @@ public class PersistentTopicsBase extends AdminResource {
*
* @param topicName : topic-name: persistent://prop/cluster/ns/topic
* @param numPartitions : number partitions for the topics
+ * @param ignoreConflictException : If true, ignore ConflictException: subscription already exists for topic
+ *
*/
- private CompletableFuture<Void> createSubscriptions(TopicName topicName, int numPartitions) {
+ private CompletableFuture<Void> createSubscriptions(TopicName topicName, int numPartitions,
+ boolean ignoreConflictException) {
CompletableFuture<Void> result = new CompletableFuture<>();
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions < 1) {
@@ -4445,9 +4447,21 @@ public class PersistentTopicsBase extends AdminResource {
for (int i = partitionMetadata.partitions; i < numPartitions; i++) {
final String topicNamePartition = topicName.getPartition(i).toString();
-
- subscriptionFutures.add(admin.topics().createSubscriptionAsync(topicNamePartition,
- subscription, MessageId.latest));
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ admin.topics().createSubscriptionAsync(topicNamePartition,
+ subscription, MessageId.latest).whenComplete((__, ex) -> {
+ if (ex == null) {
+ future.complete(null);
+ } else {
+ if (ignoreConflictException
+ && ex.getCause() instanceof PulsarAdminException.ConflictException) {
+ future.complete(null);
+ } else {
+ future.completeExceptionally(ex);
+ }
+ }
+ });
+ subscriptionFutures.add(future);
}
});
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index a49fb85479a..ae2ed5a59ae 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -105,6 +105,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
private final String testTenant = "my-tenant";
private final String testLocalCluster = "use";
private final String testNamespace = "my-namespace";
+ private final String testNamespaceLocal = "my-namespace-local";
protected Field uriField;
protected UriInfo uriInfo;
private NonPersistentTopics nonPersistentTopic;
@@ -156,6 +157,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
new TenantInfoImpl(Set.of("role1", "role2"), Set.of(testLocalCluster, "test")));
admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Set.of(testLocalCluster, "test"));
admin.namespaces().createNamespace("pulsar/system", 4);
+ admin.namespaces().createNamespace(testTenant + "/" + testNamespaceLocal);
}
@Override
@@ -1558,4 +1560,45 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
}
}
+ @Test
+ public void testUpdatePartitionedTopic()
+ throws KeeperException, InterruptedException, PulsarAdminException {
+ String topicName = "testUpdatePartitionedTopic";
+ String groupName = "cg_testUpdatePartitionedTopic";
+ AsyncResponse response = mock(AsyncResponse.class);
+ ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.createPartitionedTopic(response, testTenant, testNamespaceLocal, topicName, 2, true);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
+
+ response = mock(AsyncResponse.class);
+ persistentTopics.createSubscription(response, testTenant, testNamespaceLocal, topicName, groupName, true,
+ new ResetCursorData(MessageId.latest), false);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
+
+ response = mock(AsyncResponse.class);
+ ArgumentCaptor<PartitionedTopicMetadata> metaCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+ persistentTopics.getPartitionedMetadata(response, testTenant, testNamespaceLocal, topicName, true, false);
+ verify(response, timeout(5000).times(1)).resume(metaCaptor.capture());
+ PartitionedTopicMetadata partitionedTopicMetadata = metaCaptor.getValue();
+ Assert.assertEquals(partitionedTopicMetadata.partitions, 2);
+
+ doNothing().when(persistentTopics).validatePartitionedTopicName(any(), any(), any());
+ doReturn(CompletableFuture.completedFuture(null)).when(persistentTopics)
+ .validatePartitionedTopicMetadataAsync();
+ response = mock(AsyncResponse.class);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.updatePartitionedTopic(response, testTenant, testNamespaceLocal, topicName, false, true,
+ true, 4);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+
+ response = mock(AsyncResponse.class);
+ metaCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+ persistentTopics.getPartitionedMetadata(response, testTenant, testNamespaceLocal, topicName, true, false);
+ verify(response, timeout(5000).times(1)).resume(metaCaptor.capture());
+ partitionedTopicMetadata = metaCaptor.getValue();
+ Assert.assertEquals(partitionedTopicMetadata.partitions, 4);
+ }
}