You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/21 05:59:59 UTC

[pulsar] branch master updated: Use Awaitility instead Thread.sleep in InactiveTopicDeleteTest.java (#8647)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b790ba  Use  Awaitility  instead Thread.sleep in InactiveTopicDeleteTest.java (#8647)
1b790ba is described below

commit 1b790ba4a64af36749c96a27f9b2e3baf5e3003a
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Sat Nov 21 13:59:28 2020 +0800

    Use  Awaitility  instead Thread.sleep in InactiveTopicDeleteTest.java (#8647)
---
 .../broker/service/InactiveTopicDeleteTest.java    | 168 +++++++++------------
 1 file changed, 74 insertions(+), 94 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
index b99b8a4..5638538 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Sets;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
@@ -36,6 +37,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -82,14 +84,12 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
         consumer.close();
         producer.close();
 
-        Thread.sleep(2000);
-        Assert.assertTrue(admin.topics().getList("prop/ns-abc")
-            .contains(topic));
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> Assert.assertTrue(admin.topics().getList("prop/ns-abc")
+                .contains(topic)));
 
         admin.topics().deleteSubscription(topic, "sub");
-        Thread.sleep(2000);
-        Assert.assertFalse(admin.topics().getList("prop/ns-abc")
-            .contains(topic));
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> Assert.assertFalse(admin.topics().getList("prop/ns-abc")
+                .contains(topic)));
     }
 
     @Test
@@ -104,14 +104,14 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
         pulsarClient.newProducer().topic(topic).create().close();
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub").subscribe().close();
 
-        Thread.sleep(2000);
-        Assert.assertTrue(admin.topics().getPartitionedTopicList("prop/ns-abc")
-            .contains(topic));
+        Awaitility.await().atMost(2, TimeUnit.SECONDS)
+                .untilAsserted(() -> Assert.assertTrue(admin.topics().getPartitionedTopicList("prop/ns-abc")
+                .contains(topic)));
 
         admin.topics().deleteSubscription(topic, "sub");
-        Thread.sleep(2000);
-        Assert.assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc")
-            .contains(topic));
+        Awaitility.await().atMost(2, TimeUnit.SECONDS)
+                .untilAsserted(() -> Assert.assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc")
+                .contains(topic)));
     }
 
     @Test
@@ -162,13 +162,14 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
         pulsarClient.newConsumer().topic(topic).subscriptionName("sub").subscribe().close();
         pulsarClient.newConsumer().topic(topic2).subscriptionName("sub2").subscribe().close();
 
-        Thread.sleep(2000);
+        Awaitility.await().atMost(2, TimeUnit.SECONDS)
+                .untilAsserted(() -> Assert.assertTrue(admin.topics().getList(namespace).contains(topic2)));
         Assert.assertTrue(admin.topics().getPartitionedTopicList(namespace).contains(topic));
-        Assert.assertTrue(admin.topics().getList(namespace).contains(topic2));
 
         admin.topics().deleteSubscription(topic, "sub");
         admin.topics().deleteSubscription(topic2, "sub2");
-        Thread.sleep(2000);
+        Awaitility.await().atMost(2, TimeUnit.SECONDS)
+                .untilAsserted(() -> Assert.assertFalse(admin.topics().getList(namespace).contains(topic2)));
         Assert.assertTrue(admin.topics().getPartitionedTopicList(namespace).contains(topic));
         // BrokerDeleteInactivePartitionedTopicMetaDataEnabled is not enabled, so only NonPartitionedTopic will be cleaned
         Assert.assertFalse(admin.topics().getList(namespace).contains(topic2));
@@ -213,13 +214,12 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
 
         InactiveTopicPolicies policies;
         //wait for zk
-        while (true) {
-            policies = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;
-            if (policies.isDeleteWhileInactive()) {
-                break;
-            }
-            Thread.sleep(1000);
-        }
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
+                -> {
+            InactiveTopicPolicies temp = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;
+            return temp.isDeleteWhileInactive();
+        });
+        policies = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;
 
         Assert.assertTrue(policies.isDeleteWhileInactive());
         assertEquals(policies.getInactiveTopicDeleteMode(), InactiveTopicDeleteMode.delete_when_no_subscriptions);
@@ -227,13 +227,11 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
         assertEquals(policies, admin.namespaces().getInactiveTopicPolicies(namespace));
 
         admin.namespaces().removeInactiveTopicPolicies(namespace);
-        while (true) {
-            Thread.sleep(500);
-            policies = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;
-            if (policies.getMaxInactiveDurationSeconds() == 1000) {
-                break;
-            }
-        }
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
+                -> {
+            InactiveTopicPolicies temp = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;
+            return temp.getMaxInactiveDurationSeconds() == 1000;
+        });
         assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies
                 , defaultPolicy);
 
@@ -244,13 +242,11 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
         assertEquals(policies, admin.namespaces().getInactiveTopicPolicies(namespace2));
 
         admin.namespaces().removeInactiveTopicPolicies(namespace2);
-        while (true) {
-            Thread.sleep(500);
-            policies = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic2, false).get().get()).inactiveTopicPolicies;
-            if (policies.getMaxInactiveDurationSeconds() == 1000) {
-                break;
-            }
-        }
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
+                -> {
+            InactiveTopicPolicies temp = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;
+            return temp.getMaxInactiveDurationSeconds() == 1000;
+        });
         assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic2, false).get().get()).inactiveTopicPolicies
                 , defaultPolicy);
     }
@@ -297,14 +293,10 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
         admin.namespaces().setInactiveTopicPolicies(namespace2, inactiveTopicPolicies);
 
         //wait for zk
-        while (true) {
-            InactiveTopicPolicies policies = ((PersistentTopic) pulsar.getBrokerService()
-                    .getTopic(topic, false).get().get()).inactiveTopicPolicies;
-            if (policies.isDeleteWhileInactive()) {
-                break;
-            }
-            Thread.sleep(100);
-        }
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> {
+            InactiveTopicPolicies temp = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;
+            return temp.isDeleteWhileInactive();
+        });
 
         // topic should still exist
         Thread.sleep(2000);
@@ -314,14 +306,14 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
 
         // no backlog, trigger delete_when_subscriptions_caught_up
         admin.topics().skipAllMessages(topic2, topicToSub.remove(topic2));
-        Thread.sleep(2000);
-        Assert.assertFalse(admin.topics().getList(namespace2).contains(topic2));
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
+                -> Assert.assertFalse(admin.topics().getList(namespace2).contains(topic2)));
         // delete subscription, trigger delete_when_no_subscriptions
         for (Map.Entry<String, String> entry : topicToSub.entrySet()) {
             admin.topics().deleteSubscription(entry.getKey(), entry.getValue());
         }
-        Thread.sleep(2000);
-        Assert.assertFalse(admin.topics().getList(namespace).contains(topic));
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .untilAsserted(() -> Assert.assertFalse(admin.topics().getList(namespace).contains(topic)));
         Assert.assertFalse(admin.topics().getList(namespace3).contains(topic3));
     }
 
@@ -354,9 +346,8 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
             .contains(topic));
 
         admin.topics().skipAllMessages(topic, "sub");
-        Thread.sleep(2000);
-        Assert.assertFalse(admin.topics().getList("prop/ns-abc")
-            .contains(topic));
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .untilAsserted(() -> Assert.assertFalse(admin.topics().getList("prop/ns-abc").contains(topic)));
     }
 
     @Test
@@ -393,9 +384,8 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
         admin.topics().createPartitionedTopic(topicName, 3);
         pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe().close();
         TopicName topic = TopicName.get(topicName);
-        while (!pulsar.getTopicPoliciesService().cacheIsInitialized(topic)) {
-            Thread.sleep(500);
-        }
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
+                -> pulsar.getTopicPoliciesService().cacheIsInitialized(topic));
 
         InactiveTopicPolicies inactiveTopicPolicies = admin.topics().getInactiveTopicPolicies(topicName);
         assertNull(inactiveTopicPolicies);
@@ -406,21 +396,13 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
         policies.setMaxInactiveDurationSeconds(10);
         admin.topics().setInactiveTopicPolicies(topicName, policies);
 
-        for (int i = 0; i < 50; i++) {
-            if (admin.topics().getInactiveTopicPolicies(topicName) != null) {
-                break;
-            }
-            Thread.sleep(100);
-        }
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
+                -> admin.topics().getInactiveTopicPolicies(topicName) != null);
         assertEquals(admin.topics().getInactiveTopicPolicies(topicName), policies);
         admin.topics().removeInactiveTopicPolicies(topicName);
-        for (int i = 0; i < 50; i++) {
-            if (admin.topics().getInactiveTopicPolicies(topicName) == null) {
-                break;
-            }
-            Thread.sleep(100);
-        }
-        assertNull(admin.topics().getInactiveTopicPolicies(topicName));
+
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
+                -> assertNull(admin.topics().getInactiveTopicPolicies(topicName)));
     }
 
     @Test(timeOut = 30000)
@@ -461,12 +443,8 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
         admin.topics().setInactiveTopicPolicies(topic3, inactiveTopicPolicies);
 
         //wait for cache
-        for (int i = 0; i < 50; i++) {
-            if (admin.topics().getInactiveTopicPolicies(topic) != null) {
-                break;
-            }
-            Thread.sleep(100);
-        }
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
+                -> admin.topics().getInactiveTopicPolicies(topic) != null);
         InactiveTopicPolicies policies = ((PersistentTopic) pulsar.getBrokerService()
                 .getTopic(topic, false).get().get()).inactiveTopicPolicies;
         Assert.assertTrue(policies.isDeleteWhileInactive());
@@ -477,9 +455,9 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
         admin.topics().removeInactiveTopicPolicies(topic);
         //Only the broker-level policies is set, so after removing the topic-level policies
         // , the topic will use the broker-level policies
-        Thread.sleep(1000);
-        assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies
-                , defaultPolicy);
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
+                -> assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies
+                , defaultPolicy));
 
         policies = ((PersistentTopic)pulsar.getBrokerService().getTopic(topic2,false).get().get()).inactiveTopicPolicies;
         Assert.assertTrue(policies.isDeleteWhileInactive());
@@ -491,14 +469,20 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
         // , the topic will use the namespace level policies
         admin.namespaces().setInactiveTopicPolicies(namespace, inactiveTopicPolicies);
         //wait for zk
-        Thread.sleep(1000);
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> {
+            InactiveTopicPolicies tempPolicies = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false)
+                    .get().get()).inactiveTopicPolicies;
+            return inactiveTopicPolicies.equals(tempPolicies);
+        });
         admin.topics().removeInactiveTopicPolicies(topic2);
         // The cache has been updated, but the system-event may not be consumed yet
         // ,so wait for topic-policies update event
-        Thread.sleep(1000);
-        InactiveTopicPolicies nsPolicies = ((PersistentTopic) pulsar.getBrokerService()
-                .getTopic(topic2, false).get().get()).inactiveTopicPolicies;
-        assertEquals(nsPolicies.getMaxInactiveDurationSeconds(), 999);
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> {
+            InactiveTopicPolicies nsPolicies = ((PersistentTopic) pulsar.getBrokerService()
+                    .getTopic(topic2, false).get().get()).inactiveTopicPolicies;
+            assertEquals(nsPolicies.getMaxInactiveDurationSeconds(), 999);
+        });
+
     }
 
     @Test(timeOut = 30000)
@@ -509,8 +493,6 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
         conf.setBrokerDeleteInactiveTopicsEnabled(true);
         conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
         super.baseSetup();
-        //wait for cache init
-        Thread.sleep(2000);
         final String topic = "persistent://prop/ns-abc/test-" + UUID.randomUUID();
         final String topic2 = "persistent://prop/ns-abc/test-" + UUID.randomUUID();
         final String topic3 = "persistent://prop/ns-abc/test-" + UUID.randomUUID();
@@ -529,6 +511,9 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
             producer.close();
             Thread.sleep(1);
         }
+        //wait for cache init
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
+                -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic3)));
         // "topic" use delete_when_no_subscriptions, "topic2" use delete_when_subscriptions_caught_up
         // "topic3" use default:delete_when_no_subscriptions
         InactiveTopicPolicies inactiveTopicPolicies =
@@ -538,13 +523,8 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
         admin.topics().setInactiveTopicPolicies(topic2, inactiveTopicPolicies);
 
         //wait for update
-        for (int i = 0; i < 50; i++) {
-            if (admin.topics().getInactiveTopicPolicies(topic2) != null) {
-                break;
-            }
-            Thread.sleep(100);
-        }
-
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
+                -> admin.topics().getInactiveTopicPolicies(topic2) != null);
         // topic should still exist
         Thread.sleep(2000);
         Assert.assertTrue(admin.topics().getList(namespace).contains(topic));
@@ -553,14 +533,14 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
 
         // no backlog, trigger delete_when_subscriptions_caught_up
         admin.topics().skipAllMessages(topic2, topicToSub.remove(topic2));
-        Thread.sleep(2000);
-        Assert.assertFalse(admin.topics().getList(namespace).contains(topic2));
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
+                -> Assert.assertFalse(admin.topics().getList(namespace).contains(topic2)));
         // delete subscription, trigger delete_when_no_subscriptions
         for (Map.Entry<String, String> entry : topicToSub.entrySet()) {
             admin.topics().deleteSubscription(entry.getKey(), entry.getValue());
         }
-        Thread.sleep(2000);
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
+                -> Assert.assertFalse(admin.topics().getList(namespace).contains(topic3)));
         Assert.assertFalse(admin.topics().getList(namespace).contains(topic));
-        Assert.assertFalse(admin.topics().getList(namespace).contains(topic3));
     }
 }