You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/21 05:59:59 UTC
[pulsar] branch master updated: Use Awaitility instead Thread.sleep
in InactiveTopicDeleteTest.java (#8647)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1b790ba Use Awaitility instead Thread.sleep in InactiveTopicDeleteTest.java (#8647)
1b790ba is described below
commit 1b790ba4a64af36749c96a27f9b2e3baf5e3003a
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Sat Nov 21 13:59:28 2020 +0800
Use Awaitility instead Thread.sleep in InactiveTopicDeleteTest.java (#8647)
---
.../broker/service/InactiveTopicDeleteTest.java | 168 +++++++++------------
1 file changed, 74 insertions(+), 94 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
index b99b8a4..5638538 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/InactiveTopicDeleteTest.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import com.google.common.collect.Sets;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
@@ -36,6 +37,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
+import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -82,14 +84,12 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
consumer.close();
producer.close();
- Thread.sleep(2000);
- Assert.assertTrue(admin.topics().getList("prop/ns-abc")
- .contains(topic));
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> Assert.assertTrue(admin.topics().getList("prop/ns-abc")
+ .contains(topic)));
admin.topics().deleteSubscription(topic, "sub");
- Thread.sleep(2000);
- Assert.assertFalse(admin.topics().getList("prop/ns-abc")
- .contains(topic));
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> Assert.assertFalse(admin.topics().getList("prop/ns-abc")
+ .contains(topic)));
}
@Test
@@ -104,14 +104,14 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
pulsarClient.newProducer().topic(topic).create().close();
pulsarClient.newConsumer().topic(topic).subscriptionName("sub").subscribe().close();
- Thread.sleep(2000);
- Assert.assertTrue(admin.topics().getPartitionedTopicList("prop/ns-abc")
- .contains(topic));
+ Awaitility.await().atMost(2, TimeUnit.SECONDS)
+ .untilAsserted(() -> Assert.assertTrue(admin.topics().getPartitionedTopicList("prop/ns-abc")
+ .contains(topic)));
admin.topics().deleteSubscription(topic, "sub");
- Thread.sleep(2000);
- Assert.assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc")
- .contains(topic));
+ Awaitility.await().atMost(2, TimeUnit.SECONDS)
+ .untilAsserted(() -> Assert.assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc")
+ .contains(topic)));
}
@Test
@@ -162,13 +162,14 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
pulsarClient.newConsumer().topic(topic).subscriptionName("sub").subscribe().close();
pulsarClient.newConsumer().topic(topic2).subscriptionName("sub2").subscribe().close();
- Thread.sleep(2000);
+ Awaitility.await().atMost(2, TimeUnit.SECONDS)
+ .untilAsserted(() -> Assert.assertTrue(admin.topics().getList(namespace).contains(topic2)));
Assert.assertTrue(admin.topics().getPartitionedTopicList(namespace).contains(topic));
- Assert.assertTrue(admin.topics().getList(namespace).contains(topic2));
admin.topics().deleteSubscription(topic, "sub");
admin.topics().deleteSubscription(topic2, "sub2");
- Thread.sleep(2000);
+ Awaitility.await().atMost(2, TimeUnit.SECONDS)
+ .untilAsserted(() -> Assert.assertFalse(admin.topics().getList(namespace).contains(topic2)));
Assert.assertTrue(admin.topics().getPartitionedTopicList(namespace).contains(topic));
// BrokerDeleteInactivePartitionedTopicMetaDataEnabled is not enabled, so only NonPartitionedTopic will be cleaned
Assert.assertFalse(admin.topics().getList(namespace).contains(topic2));
@@ -213,13 +214,12 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
InactiveTopicPolicies policies;
//wait for zk
- while (true) {
- policies = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;
- if (policies.isDeleteWhileInactive()) {
- break;
- }
- Thread.sleep(1000);
- }
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
+ -> {
+ InactiveTopicPolicies temp = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;
+ return temp.isDeleteWhileInactive();
+ });
+ policies = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;
Assert.assertTrue(policies.isDeleteWhileInactive());
assertEquals(policies.getInactiveTopicDeleteMode(), InactiveTopicDeleteMode.delete_when_no_subscriptions);
@@ -227,13 +227,11 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
assertEquals(policies, admin.namespaces().getInactiveTopicPolicies(namespace));
admin.namespaces().removeInactiveTopicPolicies(namespace);
- while (true) {
- Thread.sleep(500);
- policies = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;
- if (policies.getMaxInactiveDurationSeconds() == 1000) {
- break;
- }
- }
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
+ -> {
+ InactiveTopicPolicies temp = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;
+ return temp.getMaxInactiveDurationSeconds() == 1000;
+ });
assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies
, defaultPolicy);
@@ -244,13 +242,11 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
assertEquals(policies, admin.namespaces().getInactiveTopicPolicies(namespace2));
admin.namespaces().removeInactiveTopicPolicies(namespace2);
- while (true) {
- Thread.sleep(500);
- policies = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic2, false).get().get()).inactiveTopicPolicies;
- if (policies.getMaxInactiveDurationSeconds() == 1000) {
- break;
- }
- }
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
+ -> {
+ InactiveTopicPolicies temp = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic2, false).get().get()).inactiveTopicPolicies; // poll topic2, as the removed loop did; it is the topic asserted on below
+ return temp.getMaxInactiveDurationSeconds() == 1000;
+ });
assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic2, false).get().get()).inactiveTopicPolicies
, defaultPolicy);
}
@@ -297,14 +293,10 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
admin.namespaces().setInactiveTopicPolicies(namespace2, inactiveTopicPolicies);
//wait for zk
- while (true) {
- InactiveTopicPolicies policies = ((PersistentTopic) pulsar.getBrokerService()
- .getTopic(topic, false).get().get()).inactiveTopicPolicies;
- if (policies.isDeleteWhileInactive()) {
- break;
- }
- Thread.sleep(100);
- }
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> {
+ InactiveTopicPolicies temp = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies;
+ return temp.isDeleteWhileInactive();
+ });
// topic should still exist
Thread.sleep(2000);
@@ -314,14 +306,14 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
// no backlog, trigger delete_when_subscriptions_caught_up
admin.topics().skipAllMessages(topic2, topicToSub.remove(topic2));
- Thread.sleep(2000);
- Assert.assertFalse(admin.topics().getList(namespace2).contains(topic2));
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
+ -> Assert.assertFalse(admin.topics().getList(namespace2).contains(topic2)));
// delete subscription, trigger delete_when_no_subscriptions
for (Map.Entry<String, String> entry : topicToSub.entrySet()) {
admin.topics().deleteSubscription(entry.getKey(), entry.getValue());
}
- Thread.sleep(2000);
- Assert.assertFalse(admin.topics().getList(namespace).contains(topic));
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() -> Assert.assertFalse(admin.topics().getList(namespace).contains(topic)));
Assert.assertFalse(admin.topics().getList(namespace3).contains(topic3));
}
@@ -354,9 +346,8 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
.contains(topic));
admin.topics().skipAllMessages(topic, "sub");
- Thread.sleep(2000);
- Assert.assertFalse(admin.topics().getList("prop/ns-abc")
- .contains(topic));
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() -> Assert.assertFalse(admin.topics().getList("prop/ns-abc").contains(topic)));
}
@Test
@@ -393,9 +384,8 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
admin.topics().createPartitionedTopic(topicName, 3);
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe().close();
TopicName topic = TopicName.get(topicName);
- while (!pulsar.getTopicPoliciesService().cacheIsInitialized(topic)) {
- Thread.sleep(500);
- }
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
+ -> pulsar.getTopicPoliciesService().cacheIsInitialized(topic));
InactiveTopicPolicies inactiveTopicPolicies = admin.topics().getInactiveTopicPolicies(topicName);
assertNull(inactiveTopicPolicies);
@@ -406,21 +396,13 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
policies.setMaxInactiveDurationSeconds(10);
admin.topics().setInactiveTopicPolicies(topicName, policies);
- for (int i = 0; i < 50; i++) {
- if (admin.topics().getInactiveTopicPolicies(topicName) != null) {
- break;
- }
- Thread.sleep(100);
- }
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
+ -> admin.topics().getInactiveTopicPolicies(topicName) != null);
assertEquals(admin.topics().getInactiveTopicPolicies(topicName), policies);
admin.topics().removeInactiveTopicPolicies(topicName);
- for (int i = 0; i < 50; i++) {
- if (admin.topics().getInactiveTopicPolicies(topicName) == null) {
- break;
- }
- Thread.sleep(100);
- }
- assertNull(admin.topics().getInactiveTopicPolicies(topicName));
+
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
+ -> assertNull(admin.topics().getInactiveTopicPolicies(topicName)));
}
@Test(timeOut = 30000)
@@ -461,12 +443,8 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
admin.topics().setInactiveTopicPolicies(topic3, inactiveTopicPolicies);
//wait for cache
- for (int i = 0; i < 50; i++) {
- if (admin.topics().getInactiveTopicPolicies(topic) != null) {
- break;
- }
- Thread.sleep(100);
- }
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
+ -> admin.topics().getInactiveTopicPolicies(topic) != null);
InactiveTopicPolicies policies = ((PersistentTopic) pulsar.getBrokerService()
.getTopic(topic, false).get().get()).inactiveTopicPolicies;
Assert.assertTrue(policies.isDeleteWhileInactive());
@@ -477,9 +455,9 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
admin.topics().removeInactiveTopicPolicies(topic);
//Only the broker-level policies is set, so after removing the topic-level policies
// , the topic will use the broker-level policies
- Thread.sleep(1000);
- assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies
- , defaultPolicy);
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
+ -> assertEquals(((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get()).inactiveTopicPolicies
+ , defaultPolicy));
policies = ((PersistentTopic)pulsar.getBrokerService().getTopic(topic2,false).get().get()).inactiveTopicPolicies;
Assert.assertTrue(policies.isDeleteWhileInactive());
@@ -491,14 +469,20 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
// , the topic will use the namespace level policies
admin.namespaces().setInactiveTopicPolicies(namespace, inactiveTopicPolicies);
//wait for zk
- Thread.sleep(1000);
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> {
+ InactiveTopicPolicies tempPolicies = ((PersistentTopic) pulsar.getBrokerService().getTopic(topic, false)
+ .get().get()).inactiveTopicPolicies;
+ return inactiveTopicPolicies.equals(tempPolicies);
+ });
admin.topics().removeInactiveTopicPolicies(topic2);
// The cache has been updated, but the system-event may not be consumed yet
// ,so wait for topic-policies update event
- Thread.sleep(1000);
- InactiveTopicPolicies nsPolicies = ((PersistentTopic) pulsar.getBrokerService()
- .getTopic(topic2, false).get().get()).inactiveTopicPolicies;
- assertEquals(nsPolicies.getMaxInactiveDurationSeconds(), 999);
+ Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> {
+ InactiveTopicPolicies nsPolicies = ((PersistentTopic) pulsar.getBrokerService()
+ .getTopic(topic2, false).get().get()).inactiveTopicPolicies;
+ assertEquals(nsPolicies.getMaxInactiveDurationSeconds(), 999);
+ });
+
}
@Test(timeOut = 30000)
@@ -509,8 +493,6 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
conf.setBrokerDeleteInactiveTopicsEnabled(true);
conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
super.baseSetup();
- //wait for cache init
- Thread.sleep(2000);
final String topic = "persistent://prop/ns-abc/test-" + UUID.randomUUID();
final String topic2 = "persistent://prop/ns-abc/test-" + UUID.randomUUID();
final String topic3 = "persistent://prop/ns-abc/test-" + UUID.randomUUID();
@@ -529,6 +511,9 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
producer.close();
Thread.sleep(1);
}
+ //wait for cache init
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
+ -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic3)));
// "topic" use delete_when_no_subscriptions, "topic2" use delete_when_subscriptions_caught_up
// "topic3" use default:delete_when_no_subscriptions
InactiveTopicPolicies inactiveTopicPolicies =
@@ -538,13 +523,8 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
admin.topics().setInactiveTopicPolicies(topic2, inactiveTopicPolicies);
//wait for update
- for (int i = 0; i < 50; i++) {
- if (admin.topics().getInactiveTopicPolicies(topic2) != null) {
- break;
- }
- Thread.sleep(100);
- }
-
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(()
+ -> admin.topics().getInactiveTopicPolicies(topic2) != null);
// topic should still exist
Thread.sleep(2000);
Assert.assertTrue(admin.topics().getList(namespace).contains(topic));
@@ -553,14 +533,14 @@ public class InactiveTopicDeleteTest extends BrokerTestBase {
// no backlog, trigger delete_when_subscriptions_caught_up
admin.topics().skipAllMessages(topic2, topicToSub.remove(topic2));
- Thread.sleep(2000);
- Assert.assertFalse(admin.topics().getList(namespace).contains(topic2));
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
+ -> Assert.assertFalse(admin.topics().getList(namespace).contains(topic2)));
// delete subscription, trigger delete_when_no_subscriptions
for (Map.Entry<String, String> entry : topicToSub.entrySet()) {
admin.topics().deleteSubscription(entry.getKey(), entry.getValue());
}
- Thread.sleep(2000);
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
+ -> Assert.assertFalse(admin.topics().getList(namespace).contains(topic3)));
Assert.assertFalse(admin.topics().getList(namespace).contains(topic));
- Assert.assertFalse(admin.topics().getList(namespace).contains(topic3));
}
}