You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/03/20 15:37:58 UTC
[pulsar] branch branch-2.11 updated: [fix][broker] Fix delete system topic clean topic policy (#18823) (#19835)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 5eb0ce2ac5c [fix][broker] Fix delete system topic clean topic policy (#18823) (#19835)
5eb0ce2ac5c is described below
commit 5eb0ce2ac5c40ee13affebe9fc738e6b91ccc96e
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Mon Mar 20 23:37:23 2023 +0800
[fix][broker] Fix delete system topic clean topic policy (#18823) (#19835)
Co-authored-by: Jiwe Guo <te...@apache.org>
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 47 +++++++++++++++++-----
.../broker/service/persistent/PersistentTopic.java | 2 +-
.../apache/pulsar/broker/admin/NamespacesTest.java | 36 ++++++++++++++++-
3 files changed, 74 insertions(+), 11 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index c2ce36d49ff..f76e8a02827 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -73,6 +73,7 @@ import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
@@ -310,10 +311,21 @@ public abstract class NamespacesBase extends AdminResource {
// remove from owned namespace map and ephemeral node from ZK
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
// remove system topics first.
+ Set<String> noPartitionedTopicPolicySystemTopic = new HashSet<>();
+ Set<String> partitionedTopicPolicySystemTopic = new HashSet<>();
if (!topics.isEmpty()) {
for (String topic : topics) {
try {
- futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true));
+ if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
+ TopicName topicName = TopicName.get(topic);
+ if (topicName.isPartitioned()) {
+ partitionedTopicPolicySystemTopic.add(topicName.getPartitionedTopicName());
+ } else {
+ noPartitionedTopicPolicySystemTopic.add(topic);
+ }
+ } else {
+ futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true));
+ }
} catch (Exception ex) {
log.error("[{}] Failed to delete system topic {}", clientAppId(), topic, ex);
asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, ex));
@@ -321,11 +333,14 @@ public abstract class NamespacesBase extends AdminResource {
}
}
}
- FutureUtil.waitForAll(futures).thenCompose(__ -> {
- List<CompletableFuture<Void>> deleteBundleFutures = Lists.newArrayList();
- NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
- .getBundles(namespaceName);
- for (NamespaceBundle bundle : bundles.getBundles()) {
+ FutureUtil.waitForAll(futures)
+ .thenCompose(ignore -> internalDeleteTopicsAsync(noPartitionedTopicPolicySystemTopic))
+ .thenCompose(ignore -> internalDeletePartitionedTopicsAsync(partitionedTopicPolicySystemTopic))
+ .thenCompose(__ -> {
+ List<CompletableFuture<Void>> deleteBundleFutures = Lists.newArrayList();
+ NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
+ .getBundles(namespaceName);
+ for (NamespaceBundle bundle : bundles.getBundles()) {
// check if the bundle is owned by any broker, if not then we do not need to delete the bundle
deleteBundleFutures.add(pulsar().getNamespaceService().getOwnerAsync(bundle).thenCompose(ownership -> {
if (ownership.isPresent()) {
@@ -475,13 +490,19 @@ public abstract class NamespacesBase extends AdminResource {
Set<String> nonPartitionedTopics = new HashSet<>();
Set<String> allSystemTopics = new HashSet<>();
Set<String> allPartitionedSystemTopics = new HashSet<>();
+ Set<String> noPartitionedTopicPolicySystemTopic = new HashSet<>();
+ Set<String> partitionedTopicPolicySystemTopic = new HashSet<>();
for (String topic : topics) {
try {
TopicName topicName = TopicName.get(topic);
if (topicName.isPartitioned()) {
if (pulsar().getBrokerService().isSystemTopic(topicName)) {
- allPartitionedSystemTopics.add(topicName.getPartitionedTopicName());
+ if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
+ partitionedTopicPolicySystemTopic.add(topicName.getPartitionedTopicName());
+ } else {
+ allPartitionedSystemTopics.add(topicName.getPartitionedTopicName());
+ }
continue;
}
String partitionedTopic = topicName.getPartitionedTopicName();
@@ -490,12 +511,17 @@ public abstract class NamespacesBase extends AdminResource {
}
} else {
if (pulsar().getBrokerService().isSystemTopic(topicName)) {
- allSystemTopics.add(topic);
+ if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
+ noPartitionedTopicPolicySystemTopic.add(topic);
+ } else {
+ allSystemTopics.add(topic);
+ }
continue;
}
nonPartitionedTopics.add(topic);
}
- topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true));
+ topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(
+ topic, true, true));
} catch (Exception e) {
String errorMessage = String.format("Failed to force delete topic %s, "
+ "but the previous deletion command of partitioned-topics:%s "
@@ -524,6 +550,9 @@ public abstract class NamespacesBase extends AdminResource {
.thenCompose((ignore) -> internalDeleteTopicsAsync(allSystemTopics))
.thenCompose((ignore) ->
internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics))
+ .thenCompose(ignore ->
+ internalDeletePartitionedTopicsAsync(partitionedTopicPolicySystemTopic))
+ .thenCompose(ignore -> internalDeleteTopicsAsync(noPartitionedTopicPolicySystemTopic))
.handle((result, exception) -> {
if (exception != null) {
if (exception.getCause() instanceof PulsarAdminException) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 8e95f2c4313..d8f56d72960 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1186,7 +1186,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
deleteTopicAuthenticationFuture.thenCompose(ignore -> deleteSchema())
.thenCompose(ignore -> {
if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic)
- && brokerService.getPulsar().getConfiguration().isSystemTopicEnabled()) {
+ && brokerService.getPulsar().getConfiguration().isSystemTopicEnabled()) {
return deleteTopicPolicies();
} else {
return CompletableFuture.completedFuture(null);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 16dfe5bc9a3..eaa54c1fd34 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin;
+import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
@@ -57,11 +58,13 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
+import lombok.Cleanup;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.v1.Namespaces;
import org.apache.pulsar.broker.admin.v1.PersistentTopics;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -81,6 +84,7 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -103,6 +107,7 @@ import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
@@ -1938,7 +1943,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
}
@Test
- public void testNotClearTopicPolicesWhenDeleteSystemTopic() throws Exception {
+ public void testNotClearTopicPolicesWhenDeleteTopicPolicyTopic() throws Exception {
String namespace = this.testTenant + "/delete-systemTopic";
String topic = TopicName.get(TopicDomain.persistent.toString(), this.testTenant, "delete-systemTopic",
"testNotClearTopicPolicesWhenDeleteSystemTopic").toString();
@@ -1958,4 +1963,33 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
// 4. delete the policies topic and the topic wil not to clear topic polices
admin.topics().delete(namespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, true);
}
+ @Test
+ public void testDeleteTopicPolicyWhenDeleteSystemTopic() throws Exception {
+ conf.setTopicLevelPoliciesEnabled(true);
+ conf.setSystemTopicEnabled(true);
+ Field field = PulsarService.class.getDeclaredField("topicPoliciesService");
+ field.setAccessible(true);
+ field.set(pulsar, new SystemTopicBasedTopicPoliciesService(pulsar));
+
+ String systemTopic = SYSTEM_NAMESPACE.toString() + "/" + "testDeleteTopicPolicyWhenDeleteSystemTopic";
+ admin.tenants().createTenant(SYSTEM_NAMESPACE.getTenant(),
+ new TenantInfoImpl(Set.of("role1", "role2"), Set.of("use", "usc", "usw")));
+
+ admin.namespaces().createNamespace(SYSTEM_NAMESPACE.toString());
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(systemTopic).create();
+ admin.topicPolicies().setMaxConsumers(systemTopic, 5);
+
+ Integer maxConsumerPerTopic = pulsar
+ .getTopicPoliciesService()
+ .getTopicPoliciesBypassCacheAsync(TopicName.get(systemTopic)).get()
+ .getMaxConsumerPerTopic();
+
+ assertEquals(maxConsumerPerTopic, Integer.valueOf(5));
+ admin.topics().delete(systemTopic, true);
+ TopicPolicies topicPolicies = pulsar.getTopicPoliciesService()
+ .getTopicPoliciesBypassCacheAsync(TopicName.get(systemTopic)).get(5, TimeUnit.SECONDS);
+ assertNull(topicPolicies);
+ }
}