You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/05/09 06:33:02 UTC

[pulsar] branch branch-2.11 updated: [fix][test] Fix flaky testCreateTopicWithZombieReplicatorCursor (#20037)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 2239523abed [fix][test] Fix flaky testCreateTopicWithZombieReplicatorCursor (#20037)
2239523abed is described below

commit 2239523abedadc955f7e70b7a4c6bd59ceb03aff
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Apr 11 22:50:04 2023 +0800

    [fix][test] Fix flaky testCreateTopicWithZombieReplicatorCursor (#20037)
---
 .../service/persistent/PersistentTopicTest.java    | 29 ++++++++++++++++------
 1 file changed, 22 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 186078e9b1b..50a54aaa0b7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -35,7 +35,9 @@ import static org.testng.Assert.assertNull;
 import java.io.ByteArrayOutputStream;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -51,6 +53,7 @@ import com.google.common.collect.Sets;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -81,6 +84,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+@Slf4j
 @Test(groups = "broker")
 public class PersistentTopicTest extends BrokerTestBase {
 
@@ -443,10 +447,10 @@ public class PersistentTopicTest extends BrokerTestBase {
         admin.tenants().updateTenant("prop", tenantInfo);
 
         if (topicLevelPolicy) {
-            admin.topics().setReplicationClusters(topicName, Collections.singletonList(remoteCluster));
+            admin.topics().setReplicationClusters(topicName, Arrays.asList("test", remoteCluster));
         } else {
             admin.namespaces().setNamespaceReplicationClustersAsync(
-                    namespace, Collections.singleton(remoteCluster)).get();
+                    namespace, Sets.newHashSet("test", remoteCluster)).get();
         }
 
         final PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false)
@@ -461,16 +465,27 @@ public class PersistentTopicTest extends BrokerTestBase {
         };
         assertEquals(getCursors.get(), Collections.singleton(conf.getReplicatorPrefix() + "." + remoteCluster));
 
+        // PersistentTopics#onPoliciesUpdate might happen in different threads, so there might be a race between two
+        // updates of the replication clusters. So here we sleep for a while to reduce the flakiness.
+        Thread.sleep(100);
+
+        // Configure the local cluster to avoid the topic being deleted in PersistentTopics#checkReplication
         if (topicLevelPolicy) {
-            admin.topics().setReplicationClusters(topicName, Collections.emptyList());
+            admin.topics().setReplicationClusters(topicName, Collections.singletonList("test"));
         } else {
-            admin.namespaces().setNamespaceReplicationClustersAsync(namespace, Collections.emptySet()).get();
+            admin.namespaces().setNamespaceReplicationClustersAsync(namespace, Collections.singleton("test")).get();
         }
         admin.clusters().deleteCluster(remoteCluster);
         // Now the cluster and its related policy has been removed but the replicator cursor still exists
 
-        topic.initialize().get(3, TimeUnit.SECONDS);
-        Awaitility.await().atMost(3, TimeUnit.SECONDS)
-                .until(() -> !topic.getManagedLedger().getCursors().iterator().hasNext());
+        Awaitility.await().atMost(Duration.ofSeconds(10)).until(() -> {
+            log.info("Before initialize...");
+            try {
+                topic.initialize().get(3, TimeUnit.SECONDS);
+            } catch (ExecutionException e) {
+                log.warn("Failed to initialize: {}", e.getCause().getMessage());
+            }
+            return !topic.getManagedLedger().getCursors().iterator().hasNext();
+        });
     }
 }