You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/08/19 08:54:05 UTC
[pulsar] branch master updated: [fix][flaky-test]AdminApi2Test.testDeleteNamespace (#17157)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new de6948ca7de [fix][flaky-test]AdminApi2Test.testDeleteNamespace (#17157)
de6948ca7de is described below
commit de6948ca7def90921ce7e9325a001bca2b21a1ad
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Fri Aug 19 16:53:56 2022 +0800
[fix][flaky-test]AdminApi2Test.testDeleteNamespace (#17157)
---
.../apache/pulsar/broker/admin/AdminApi2Test.java | 49 ++++++++++++++++++++++
1 file changed, 49 insertions(+)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 18c4d890aae..11289119117 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.broker.admin;
import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.pulsar.common.naming.SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -45,6 +47,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.NotAcceptableException;
import javax.ws.rs.core.Response.Status;
@@ -102,6 +105,7 @@ import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
@@ -1422,6 +1426,9 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
admin.topics().createPartitionedTopic(topic, 10);
assertFalse(admin.topics().getList(namespace).isEmpty());
+ // Wait for change event topic and compaction create finish.
+ awaitChangeEventTopicAndCompactionCreateFinish(namespace, String.format("persistent://%s", topic));
+
try {
admin.namespaces().deleteNamespace(namespace, false);
fail("should have failed due to namespace not empty");
@@ -1445,7 +1452,49 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
final String bundleDataPath = "/loadbalance/bundle-data/" + namespace;
assertFalse(pulsar.getLocalMetadataStore().exists(bundleDataPath).join());
+ }
+    /**
+     * Blocks until every expected change event topic of the namespace is loaded and, for topics
+     * that have compaction enabled, the compaction subscription exists. Returns immediately when
+     * system topics are disabled in the broker configuration.
+     *
+     * @param ns namespace name used to build the expected change event topic names
+     * @param topic topic whose subscribe-rate policy is set to trigger change event topic creation
+     * @throws Exception if setting the topic policy fails or the wait does not complete
+     */
+    private void awaitChangeEventTopicAndCompactionCreateFinish(String ns, String topic) throws Exception {
+        if (!pulsar.getConfiguration().isSystemTopicEnabled()) {
+            return;
+        }
+        // Trigger change event topic create.
+        admin.topicPolicies().setSubscribeRate(topic, new SubscribeRate(-1, 60));
+        // Work out which change event topics are expected, based on the auto-creation settings.
+        ArrayList<String> expectChangeEventTopics = new ArrayList<>();
+        if ("non-partitioned".equals(pulsar.getConfiguration().getAllowAutoTopicCreationType())) {
+            expectChangeEventTopics.add(String.format("persistent://%s/%s", ns, NAMESPACE_EVENTS_LOCAL_NAME));
+        } else {
+            int defaultNumPartitions = pulsar.getConfiguration().getDefaultNumPartitions();
+            for (int i = 0; i < defaultNumPartitions; i++) {
+                String t = String.format("persistent://%s/%s-partition-%s", ns, NAMESPACE_EVENTS_LOCAL_NAME, i);
+                expectChangeEventTopics.add(t);
+            }
+        }
+        // Wait for change event topic and compaction create finish.
+        Awaitility.await().until(() -> {
+            for (String changeEventTopicName : expectChangeEventTopics) {
+                CompletableFuture<Optional<Topic>> topicFuture =
+                        pulsar.getBrokerService().getTopic(changeEventTopicName, false);
+                // A topic that is not there yet means "not finished": return false so the condition
+                // is polled again, rather than dereferencing a missing value and throwing from
+                // inside the condition.
+                if (topicFuture == null) {
+                    return false;
+                }
+                Optional<Topic> optionalTopic = topicFuture.get();
+                if (!optionalTopic.isPresent()) {
+                    return false;
+                }
+                PersistentTopic changeEventTopic = (PersistentTopic) optionalTopic.get();
+                // Only topics with compaction enabled are required to have the compaction subscription.
+                if (changeEventTopic.isCompactionEnabled()
+                        && !changeEventTopic.getSubscriptions().containsKey(COMPACTION_SUBSCRIPTION)) {
+                    return false;
+                }
+            }
+            return true;
+        });
     }
@Test