You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2024/04/29 09:24:47 UTC

(pulsar) branch master updated: [fix] [test] Fix flaky test ReplicatorTest (#22594)

This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fdc0e31bff [fix] [test] Fix flaky test ReplicatorTest (#22594)
6fdc0e31bff is described below

commit 6fdc0e31bff906446e70965531671389d57e6cda
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Mon Apr 29 17:24:41 2024 +0800

    [fix] [test] Fix flaky test ReplicatorTest (#22594)
---
 .../broker/service/ReplicatorGlobalNSTest.java     | 129 ++++++++++++++++-----
 .../pulsar/broker/service/ReplicatorTest.java      | 121 +++++--------------
 2 files changed, 130 insertions(+), 120 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
index eed849ef1a0..514e0207fbf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java
@@ -18,18 +18,24 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
 import lombok.Cleanup;
-import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.awaitility.Awaitility;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -41,6 +47,11 @@ import org.testng.annotations.Test;
 import java.lang.reflect.Method;
 import java.util.concurrent.TimeUnit;
 
+/**
+ * The tests in this class should be denied in a production pulsar cluster. they are very dangerous, which leads to
+ * a lot of topic deletion and makes namespace policies being incorrect.
+ */
+@Slf4j
 @Test(groups = "broker-impl")
 public class ReplicatorGlobalNSTest extends ReplicatorTestBase {
 
@@ -81,7 +92,7 @@ public class ReplicatorGlobalNSTest extends ReplicatorTestBase {
      *
      * @throws Exception
      */
-    @Test
+    @Test(priority = Integer.MAX_VALUE)
     public void testRemoveLocalClusterOnGlobalNamespace() throws Exception {
         log.info("--- Starting ReplicatorTest::testRemoveLocalClusterOnGlobalNamespace ---");
 
@@ -115,32 +126,88 @@ public class ReplicatorGlobalNSTest extends ReplicatorTestBase {
         });
     }
 
-    @Test
-    public void testForcefullyTopicDeletion() throws Exception {
-        log.info("--- Starting ReplicatorTest::testForcefullyTopicDeletion ---");
-
-        final String namespace = "pulsar/removeClusterTest";
-        admin1.namespaces().createNamespace(namespace);
-        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1"));
-
-        final String topicName = "persistent://" + namespace + "/topic";
-
-        @Cleanup
-        PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
-                .build();
-
-        ProducerImpl<byte[]> producer1 = (ProducerImpl<byte[]>) client1.newProducer().topic(topicName)
-                .enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
-        producer1.close();
-
-        admin1.topics().delete(topicName, true);
-
-        MockedPulsarServiceBaseTest
-                .retryStrategically((test) -> !pulsar1.getBrokerService().getTopics().containsKey(topicName), 50, 150);
-
-        Assert.assertFalse(pulsar1.getBrokerService().getTopics().containsKey(topicName));
+    /**
+     * This is not a formal operation and can cause serious problems if call it in a production environment.
+     */
+    @Test(priority = Integer.MAX_VALUE - 1)
+    public void testConfigChange() throws Exception {
+        log.info("--- Starting ReplicatorTest::testConfigChange ---");
+        // This test is to verify that the config change on global namespace is successfully applied in broker during
+        // runtime.
+        // Run a set of producer tasks to create the topics
+        List<Future<Void>> results = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            final TopicName dest = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/topic-" + i));
+
+            results.add(executor.submit(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+
+                    @Cleanup
+                    MessageProducer producer = new MessageProducer(url1, dest);
+                    log.info("--- Starting producer --- " + url1);
+
+                    @Cleanup
+                    MessageConsumer consumer = new MessageConsumer(url1, dest);
+                    log.info("--- Starting Consumer --- " + url1);
+
+                    producer.produce(2);
+                    consumer.receive(2);
+                    return null;
+                }
+            }));
+        }
+
+        for (Future<Void> result : results) {
+            try {
+                result.get();
+            } catch (Exception e) {
+                log.error("exception in getting future result ", e);
+                fail(String.format("replication test failed with %s exception", e.getMessage()));
+            }
+        }
+
+        Thread.sleep(1000L);
+        // Make sure that the internal replicators map contains remote cluster info
+        ConcurrentOpenHashMap<String, PulsarClient> replicationClients1 = ns1.getReplicationClients();
+        ConcurrentOpenHashMap<String, PulsarClient> replicationClients2 = ns2.getReplicationClients();
+        ConcurrentOpenHashMap<String, PulsarClient> replicationClients3 = ns3.getReplicationClients();
+
+        Assert.assertNotNull(replicationClients1.get("r2"));
+        Assert.assertNotNull(replicationClients1.get("r3"));
+        Assert.assertNotNull(replicationClients2.get("r1"));
+        Assert.assertNotNull(replicationClients2.get("r3"));
+        Assert.assertNotNull(replicationClients3.get("r1"));
+        Assert.assertNotNull(replicationClients3.get("r2"));
+
+        // Case 1: Update the global namespace replication configuration to only contains the local cluster itself
+        admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1"));
+
+        // Wait for config changes to be updated.
+        Thread.sleep(1000L);
+
+        // Make sure that the internal replicators map still contains remote cluster info
+        Assert.assertNotNull(replicationClients1.get("r2"));
+        Assert.assertNotNull(replicationClients1.get("r3"));
+        Assert.assertNotNull(replicationClients2.get("r1"));
+        Assert.assertNotNull(replicationClients2.get("r3"));
+        Assert.assertNotNull(replicationClients3.get("r1"));
+        Assert.assertNotNull(replicationClients3.get("r2"));
+
+        // Case 2: Update the configuration back
+        admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1", "r2", "r3"));
+
+        // Wait for config changes to be updated.
+        Thread.sleep(1000L);
+
+        // Make sure that the internal replicators map still contains remote cluster info
+        Assert.assertNotNull(replicationClients1.get("r2"));
+        Assert.assertNotNull(replicationClients1.get("r3"));
+        Assert.assertNotNull(replicationClients2.get("r1"));
+        Assert.assertNotNull(replicationClients2.get("r3"));
+        Assert.assertNotNull(replicationClients3.get("r1"));
+        Assert.assertNotNull(replicationClients3.get("r2"));
+
+        // Case 3: TODO: Once automatic cleanup is implemented, add tests case to verify auto removal of clusters
     }
-
-    private static final Logger log = LoggerFactory.getLogger(ReplicatorGlobalNSTest.class);
-
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index fa12eba1c66..765727aeac3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -44,13 +44,11 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.UUID;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
@@ -68,6 +66,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -154,88 +153,6 @@ public class ReplicatorTest extends ReplicatorTestBase {
         return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
     }
 
-    @Test(priority = Integer.MAX_VALUE)
-    public void testConfigChange() throws Exception {
-        log.info("--- Starting ReplicatorTest::testConfigChange ---");
-        // This test is to verify that the config change on global namespace is successfully applied in broker during
-        // runtime.
-        // Run a set of producer tasks to create the topics
-        List<Future<Void>> results = new ArrayList<>();
-        for (int i = 0; i < 10; i++) {
-            final TopicName dest = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/topic-" + i));
-
-            results.add(executor.submit(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-
-                    @Cleanup
-                    MessageProducer producer = new MessageProducer(url1, dest);
-                    log.info("--- Starting producer --- " + url1);
-
-                    @Cleanup
-                    MessageConsumer consumer = new MessageConsumer(url1, dest);
-                    log.info("--- Starting Consumer --- " + url1);
-
-                    producer.produce(2);
-                    consumer.receive(2);
-                    return null;
-                }
-            }));
-        }
-
-        for (Future<Void> result : results) {
-            try {
-                result.get();
-            } catch (Exception e) {
-                log.error("exception in getting future result ", e);
-                fail(String.format("replication test failed with %s exception", e.getMessage()));
-            }
-        }
-
-        Thread.sleep(1000L);
-        // Make sure that the internal replicators map contains remote cluster info
-        ConcurrentOpenHashMap<String, PulsarClient> replicationClients1 = ns1.getReplicationClients();
-        ConcurrentOpenHashMap<String, PulsarClient> replicationClients2 = ns2.getReplicationClients();
-        ConcurrentOpenHashMap<String, PulsarClient> replicationClients3 = ns3.getReplicationClients();
-
-        Assert.assertNotNull(replicationClients1.get("r2"));
-        Assert.assertNotNull(replicationClients1.get("r3"));
-        Assert.assertNotNull(replicationClients2.get("r1"));
-        Assert.assertNotNull(replicationClients2.get("r3"));
-        Assert.assertNotNull(replicationClients3.get("r1"));
-        Assert.assertNotNull(replicationClients3.get("r2"));
-
-        // Case 1: Update the global namespace replication configuration to only contains the local cluster itself
-        admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1"));
-
-        // Wait for config changes to be updated.
-        Thread.sleep(1000L);
-
-        // Make sure that the internal replicators map still contains remote cluster info
-        Assert.assertNotNull(replicationClients1.get("r2"));
-        Assert.assertNotNull(replicationClients1.get("r3"));
-        Assert.assertNotNull(replicationClients2.get("r1"));
-        Assert.assertNotNull(replicationClients2.get("r3"));
-        Assert.assertNotNull(replicationClients3.get("r1"));
-        Assert.assertNotNull(replicationClients3.get("r2"));
-
-        // Case 2: Update the configuration back
-        admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1", "r2", "r3"));
-
-        // Wait for config changes to be updated.
-        Thread.sleep(1000L);
-
-        // Make sure that the internal replicators map still contains remote cluster info
-        Assert.assertNotNull(replicationClients1.get("r2"));
-        Assert.assertNotNull(replicationClients1.get("r3"));
-        Assert.assertNotNull(replicationClients2.get("r1"));
-        Assert.assertNotNull(replicationClients2.get("r3"));
-        Assert.assertNotNull(replicationClients3.get("r1"));
-        Assert.assertNotNull(replicationClients3.get("r2"));
-
-        // Case 3: TODO: Once automatic cleanup is implemented, add tests case to verify auto removal of clusters
-    }
-
     @Test(timeOut = 10000)
     public void activeBrokerParse() throws Exception {
         pulsar1.getConfiguration().setAuthorizationEnabled(true);
@@ -253,6 +170,32 @@ public class ReplicatorTest extends ReplicatorTestBase {
         pulsar1.getConfiguration().setAuthorizationEnabled(false);
     }
 
+    @Test
+    public void testForcefullyTopicDeletion() throws Exception {
+        log.info("--- Starting ReplicatorTest::testForcefullyTopicDeletion ---");
+
+        final String namespace = BrokerTestUtil.newUniqueName("pulsar/removeClusterTest");
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1"));
+
+        final String topicName = "persistent://" + namespace + "/topic";
+
+        @Cleanup
+        PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        ProducerImpl<byte[]> producer1 = (ProducerImpl<byte[]>) client1.newProducer().topic(topicName)
+                .enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
+        producer1.close();
+
+        admin1.topics().delete(topicName, true);
+
+        MockedPulsarServiceBaseTest
+                .retryStrategically((test) -> !pulsar1.getBrokerService().getTopics().containsKey(topicName), 50, 150);
+
+        Assert.assertFalse(pulsar1.getBrokerService().getTopics().containsKey(topicName));
+    }
+
     @SuppressWarnings("unchecked")
     @Test(timeOut = 30000)
     public void testConcurrentReplicator() throws Exception {
@@ -1270,7 +1213,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
 
         log.info("--- Starting ReplicatorTest::testReplicatedCluster ---");
 
-        final String namespace = "pulsar/global/repl";
+        final String namespace = BrokerTestUtil.newUniqueName("pulsar/global/repl");
         final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/topic1");
         admin1.namespaces().createNamespace(namespace);
         admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
@@ -1677,7 +1620,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
 
         log.info("--- Starting ReplicatorTest::testReplication ---");
 
-        String namespace = "pulsar/global/ns2";
+        String namespace = BrokerTestUtil.newUniqueName("pulsar/global/ns");
         admin1.namespaces().createNamespace(namespace, Sets.newHashSet("r1"));
         final TopicName dest = TopicName
                 .get(BrokerTestUtil.newUniqueName("persistent://" + namespace + "/ackFailedTopic"));
@@ -1749,7 +1692,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
     @Test
     public void testWhenUpdateReplicationCluster() throws Exception {
         log.info("--- testWhenUpdateReplicationCluster ---");
-        String namespace = "pulsar/ns2";
+        String namespace = BrokerTestUtil.newUniqueName("pulsar/ns");;
         admin1.namespaces().createNamespace(namespace);
         admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
         final TopicName dest = TopicName.get(
@@ -1778,12 +1721,12 @@ public class ReplicatorTest extends ReplicatorTestBase {
     @Test
     public void testReplicatorProducerNotExceed() throws Exception {
         log.info("--- testReplicatorProducerNotExceed ---");
-        String namespace1 = "pulsar/ns11";
+        String namespace1 = BrokerTestUtil.newUniqueName("pulsar/ns1");
         admin1.namespaces().createNamespace(namespace1);
         admin1.namespaces().setNamespaceReplicationClusters(namespace1, Sets.newHashSet("r1", "r2"));
         final TopicName dest1 = TopicName.get(
                 BrokerTestUtil.newUniqueName("persistent://" + namespace1 + "/testReplicatorProducerNotExceed1"));
-        String namespace2 = "pulsar/ns22";
+        String namespace2 = BrokerTestUtil.newUniqueName("pulsar/ns2");
         admin2.namespaces().createNamespace(namespace2);
         admin2.namespaces().setNamespaceReplicationClusters(namespace2, Sets.newHashSet("r1", "r2"));
         final TopicName dest2 = TopicName.get(