You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/21 13:35:50 UTC

[pulsar] 03/05: [fix][flaky-test]AdminApi2Test.testDeleteNamespace (#17157)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 531888f0c17192a40c4bf4795c2c4fe16e30d775
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Fri Aug 19 16:53:56 2022 +0800

    [fix][flaky-test]AdminApi2Test.testDeleteNamespace (#17157)
---
 .../apache/pulsar/broker/admin/AdminApi2Test.java  | 49 ++++++++++++++++++++++
 1 file changed, 49 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 549eb9b6c0f..9db6821076a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.broker.admin;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.pulsar.common.naming.SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -45,6 +47,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import javax.ws.rs.NotAcceptableException;
 import javax.ws.rs.core.Response.Status;
@@ -102,6 +105,7 @@ import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicStats;
@@ -1409,6 +1413,9 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
         admin.topics().createPartitionedTopic(topic, 10);
         assertFalse(admin.topics().getList(namespace).isEmpty());
 
+        // Wait for change event topic and compaction create finish.
+        awaitChangeEventTopicAndCompactionCreateFinish(namespace, String.format("persistent://%s", topic));
+
         try {
             admin.namespaces().deleteNamespace(namespace, false);
             fail("should have failed due to namespace not empty");
@@ -1432,7 +1439,49 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
 
         final String bundleDataPath = "/loadbalance/bundle-data/" + namespace;
         assertFalse(pulsar.getLocalMetadataStore().exists(bundleDataPath).join());
+    }
 
+    private void awaitChangeEventTopicAndCompactionCreateFinish(String ns, String topic) throws Exception {
+        if (!pulsar.getConfiguration().isSystemTopicEnabled()){
+            return;
+        }
+        // Trigger change event topic create.
+        SubscribeRate subscribeRate = new SubscribeRate(-1, 60);
+        admin.topicPolicies().setSubscribeRate(topic, subscribeRate);
+        // Wait for change event topic and compaction create finish.
+        String allowAutoTopicCreationType = pulsar.getConfiguration().getAllowAutoTopicCreationType();
+        int defaultNumPartitions = pulsar.getConfiguration().getDefaultNumPartitions();
+        ArrayList<String> expectChangeEventTopics = new ArrayList<>();
+        if ("non-partitioned".equals(allowAutoTopicCreationType)){
+            String t = String.format("persistent://%s/%s", ns, NAMESPACE_EVENTS_LOCAL_NAME);
+            expectChangeEventTopics.add(t);
+        } else {
+            for (int i = 0; i < defaultNumPartitions; i++){
+                String t = String.format("persistent://%s/%s-partition-%s", ns, NAMESPACE_EVENTS_LOCAL_NAME, i);
+                expectChangeEventTopics.add(t);
+            }
+        }
+        Awaitility.await().until(() -> {
+            boolean finished = true;
+            for (String changeEventTopicName : expectChangeEventTopics){
+                CompletableFuture<Optional<Topic>> completableFuture = pulsar.getBrokerService().getTopic(changeEventTopicName, false);
+                if (completableFuture == null){
+                    finished = false;
+                }
+                Optional<Topic> optionalTopic = completableFuture.get();
+                if (!optionalTopic.isPresent()){
+                    finished = false;
+                }
+                PersistentTopic changeEventTopic = (PersistentTopic) optionalTopic.get();
+                if (!changeEventTopic.isCompactionEnabled()){
+                    continue;
+                }
+                if (!changeEventTopic.getSubscriptions().containsKey(COMPACTION_SUBSCRIPTION)){
+                    finished = false;
+                }
+            }
+            return finished;
+        });
     }
 
     @Test