You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/04 12:33:17 UTC
[pulsar] 08/15: [fix][broker] Fix cannot delete namespace with system topic (#14730)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d32f99b6ef9213153af51efa0a485308ef4773ec
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Tue Mar 22 19:26:18 2022 +0800
[fix][broker] Fix cannot delete namespace with system topic (#14730)
(cherry picked from commit 7556c4e0165660e8dbd141c4e93bb9e31a67e6f9)
---
.../pulsar/broker/web/PulsarWebResource.java | 7 +++-
.../apache/pulsar/broker/admin/AdminApiTest2.java | 47 +++++++++++++++++++++-
2 files changed, 51 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 8fe63753b17..bc60b4ee326 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -733,7 +733,12 @@ public abstract class PulsarWebResource {
pulsarService.getConfigurationCache().policiesCache().getAsync(path).thenAccept(policiesResult -> {
if (policiesResult.isPresent()) {
Policies policies = policiesResult.get();
- if (policies.replication_clusters.isEmpty()) {
+ if (policies.deleted) {
+ String msg = String.format("Namespace %s is deleted", namespace.toString());
+ log.warn(msg);
+ validationFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED,
+ "Namespace is deleted"));
+ } else if (policies.replication_clusters.isEmpty()) {
String msg = String.format(
"Namespace does not have any clusters configured : local_cluster=%s ns=%s",
localCluster, namespace.toString());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index e43cf4fbd11..581bd33f0a4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -46,7 +46,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.core.Response.Status;
import lombok.Cleanup;
@@ -83,8 +82,9 @@ import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
-import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
+import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ConsumerStats;
@@ -98,6 +98,7 @@ import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -1269,6 +1270,48 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 9);
}
+ @Test
+ public void testDeleteNamespaceWithTopicPolicies() throws Exception {
+ stopBroker();
+ conf.setSystemTopicEnabled(true);
+ conf.setTopicLevelPoliciesEnabled(true);
+ setup();
+
+ String tenant = "test-tenant";
+ assertFalse(admin.tenants().getTenants().contains(tenant));
+
+ // create tenant
+ admin.tenants().createTenant(tenant,
+ new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")));
+ assertTrue(admin.tenants().getTenants().contains(tenant));
+
+ // create namespace2
+ String namespace = tenant + "/test-ns2";
+ admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+ // create topic
+ String topic = namespace + "/test-topic2";
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
+ producer.send("test".getBytes(StandardCharsets.UTF_8));
+ BacklogQuota backlogQuota = BacklogQuotaImpl
+ .builder()
+ .limitTime(1000)
+ .limitSize(1000)
+ .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
+ .build();
+ admin.topics().setBacklogQuota(topic, backlogQuota);
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertEquals(admin.topics()
+ .getBacklogQuotaMap(topic)
+ .get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota);
+ });
+ producer.close();
+ admin.topics().delete(topic);
+ admin.namespaces().deleteNamespace(namespace);
+ Awaitility.await().untilAsserted(() -> {
+ assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty());
+ });
+ }
+
@Test(timeOut = 30000)
public void testBacklogNoDelayed() throws PulsarClientException, PulsarAdminException, InterruptedException {
final String topic = "persistent://prop-xyz/ns1/precise-back-log-no-delayed-" + UUID.randomUUID().toString();