You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/03/02 18:06:27 UTC

[pulsar] branch branch-2.8 updated: [branch-2.8] [flaky tests] Fix flaky test ReplicatorTest.testRemoveClusterFromNamespace (#14535)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 8edfdb5  [branch-2.8] [flaky tests] Fix flaky test ReplicatorTest.testRemoveClusterFromNamespace (#14535)
8edfdb5 is described below

commit 8edfdb5388ebe8787fe476f638a1e5533ce2c75b
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Wed Mar 2 19:03:24 2022 +0100

    [branch-2.8] [flaky tests] Fix flaky test ReplicatorTest.testRemoveClusterFromNamespace (#14535)
---
 .../service/ReplicatorRemoveClusterTest.java       | 110 +++++++++++++++++++++
 .../pulsar/broker/service/ReplicatorTest.java      |  50 ----------
 2 files changed, 110 insertions(+), 50 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRemoveClusterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRemoveClusterTest.java
new file mode 100644
index 0000000..701ab47
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRemoveClusterTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.google.common.collect.Sets;
+import java.lang.reflect.Method;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Starts 3 brokers that are in 3 different clusters
+ */
+@Test(groups = "broker")
+public class ReplicatorRemoveClusterTest extends ReplicatorTestBase {
+
+    protected String methodName;
+
+    @BeforeMethod(alwaysRun = true)
+    public void beforeMethod(Method m) throws Exception {
+        methodName = m.getName();
+        admin1.namespaces().removeBacklogQuota("pulsar/ns");
+        admin1.namespaces().removeBacklogQuota("pulsar/ns1");
+        admin1.namespaces().removeBacklogQuota("pulsar/global/ns");
+    }
+
+    @Override
+    @BeforeClass(alwaysRun = true, timeOut = 300000)
+    public void setup() throws Exception {
+        super.setup();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true, timeOut = 300000)
+    public void cleanup() throws Exception {
+        super.cleanup();
+    }
+
+    @DataProvider(name = "partitionedTopic")
+    public Object[][] partitionedTopicProvider() {
+        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+    }
+
+
+    @Test
+    public void testRemoveClusterFromNamespace() throws Exception {
+        admin1.tenants().createTenant("pulsar1",
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"),
+                        Sets.newHashSet("r1", "r2", "r3")));
+
+        admin1.namespaces().createNamespace("pulsar1/ns1", Sets.newHashSet("r1", "r2", "r3"));
+
+        PulsarClient repClient1 = pulsar1.getBrokerService().getReplicationClient("r3");
+        Assert.assertNotNull(repClient1);
+        Assert.assertFalse(repClient1.isClosed());
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        final String topicName = "persistent://pulsar1/ns1/testRemoveClusterFromNamespace-" + UUID.randomUUID();
+
+        Producer<byte[]> producer = client.newProducer()
+                .topic(topicName)
+                .create();
+
+        producer.send("Pulsar".getBytes());
+
+        producer.close();
+        client.close();
+
+        Replicator replicator = pulsar1.getBrokerService().getTopicReference(topicName)
+                .get().getReplicators().get("r3");
+
+        Awaitility.await().untilAsserted(() -> Assert.assertTrue(replicator.isConnected()));
+
+        admin1.clusters().deleteCluster("r3");
+
+        Awaitility.await().untilAsserted(() -> Assert.assertFalse(replicator.isConnected()));
+        Awaitility.await().untilAsserted(() -> Assert.assertTrue(repClient1.isClosed()));
+
+        Awaitility.await().untilAsserted(() -> Assert.assertNull(
+                pulsar1.getBrokerService().getReplicationClients().get("r3")));
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 29d86d6..e71b0aa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -1179,56 +1179,6 @@ public class ReplicatorTest extends ReplicatorTestBase {
     }
 
     @Test
-    public void testRemoveClusterFromNamespace() throws Exception {
-        final String cluster4 = "r4";
-        admin1.clusters().createCluster(cluster4, ClusterData.builder()
-                .serviceUrl(url3.toString())
-                .serviceUrlTls(urlTls3.toString())
-                .brokerServiceUrl(pulsar3.getSafeBrokerServiceUrl())
-                .brokerServiceUrlTls(pulsar3.getBrokerServiceUrlTls())
-                .build());
-
-        admin1.tenants().createTenant("pulsar1",
-                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"),
-                        Sets.newHashSet("r1", "r3", cluster4)));
-
-        admin1.namespaces().createNamespace("pulsar1/ns1", Sets.newHashSet("r1", "r3", cluster4));
-
-        PulsarClient repClient1 = pulsar1.getBrokerService().getReplicationClient(cluster4);
-        Assert.assertNotNull(repClient1);
-        Assert.assertFalse(repClient1.isClosed());
-
-        PulsarClient client = PulsarClient.builder()
-                .serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
-                .build();
-
-        final String topicName = "persistent://pulsar1/ns1/testRemoveClusterFromNamespace-" + UUID.randomUUID();
-
-        Producer<byte[]> producer = client.newProducer()
-                .topic(topicName)
-                .create();
-
-        producer.send("Pulsar".getBytes());
-
-        producer.close();
-        client.close();
-
-        Replicator replicator = pulsar1.getBrokerService().getTopicReference(topicName)
-                .get().getReplicators().get(cluster4);
-
-        Awaitility.waitAtMost(30, TimeUnit.SECONDS)
-            .untilAsserted(() -> Assert.assertTrue(replicator.isConnected()));
-
-        admin1.clusters().deleteCluster(cluster4);
-
-        Awaitility.await().untilAsserted(() -> Assert.assertFalse(replicator.isConnected()));
-        Awaitility.await().untilAsserted(() -> Assert.assertTrue(repClient1.isClosed()));
-
-        Awaitility.await().untilAsserted(() -> Assert.assertNull(
-                pulsar1.getBrokerService().getReplicationClients().get(cluster4)));
-    }
-
-    @Test
     public void testDoNotReplicateSystemTopic() throws Exception {
         final String namespace = newUniqueName("pulsar/ns");
         admin1.namespaces().createNamespace(namespace, Sets.newHashSet("r1", "r2", "r3"));