You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/10/17 03:45:47 UTC
[pulsar] branch master updated: [improve][test] Improve SubscriptionMessageDispatchThrottlingTest to reduce the execution time (#18039)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 796afff2684 [improve][test] Improve SubscriptionMessageDispatchThrottlingTest to reduce the execution time (#18039)
796afff2684 is described below
commit 796afff2684b069228ad7d45b990af4838ee259e
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Mon Oct 17 11:45:37 2022 +0800
[improve][test] Improve SubscriptionMessageDispatchThrottlingTest to reduce the execution time (#18039)
---
.../client/api/MessageDispatchThrottlingTest.java | 41 ++++++++++++++++++----
.../SubscriptionMessageDispatchThrottlingTest.java | 22 ++++++++----
2 files changed, 50 insertions(+), 13 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
index d024d43eb73..d3e5067ed31 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
@@ -53,8 +53,9 @@ import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -62,7 +63,7 @@ import org.testng.annotations.Test;
public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(MessageDispatchThrottlingTest.class);
- @BeforeMethod
+ @BeforeClass
@Override
protected void setup() throws Exception {
this.conf.setClusterName("test");
@@ -70,12 +71,35 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
super.producerBaseSetup();
}
- @AfterMethod(alwaysRun = true)
+ @AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
+ @AfterMethod(alwaysRun = true)
+ protected void reset() throws Exception {
+ pulsar.getConfiguration().setForceDeleteTenantAllowed(true);
+ pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true);
+
+ for (String tenant : admin.tenants().getTenants()) {
+ for (String namespace : admin.namespaces().getNamespaces(tenant)) {
+ admin.namespaces().deleteNamespace(namespace, true);
+ }
+ admin.tenants().deleteTenant(tenant, true);
+ }
+
+ for (String cluster : admin.clusters().getClusters()) {
+ admin.clusters().deleteCluster(cluster);
+ }
+
+ pulsar.getConfiguration().setForceDeleteTenantAllowed(false);
+ pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);
+
+ super.producerBaseSetup();
+ }
+
+
@DataProvider(name = "subscriptions")
public Object[][] subscriptionsProvider() {
return new Object[][] { new Object[] { SubscriptionType.Shared }, { SubscriptionType.Exclusive } };
@@ -280,6 +304,7 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
final long byteRate = 1024 * 1024;// 1MB rate enough to let all msg to be delivered
int initValue = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg();
+ long initBytes = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte();
// (1) Update message-dispatch-rate limit
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInMsg",
Integer.toString(messageRate));
@@ -325,7 +350,9 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
consumer.close();
producer.close();
- pulsar.getConfiguration().setDispatchThrottlingRatePerTopicInMsg(initValue);
+ admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInMsg",
+ Integer.toString(initValue));
+ admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInByte", Long.toString(initBytes));
log.info("-- Exiting {} test --", methodName);
}
@@ -675,7 +702,8 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
consumer.close();
producer.close();
- pulsar.getConfiguration().setDispatchThrottlingRatePerTopicInMsg(initValue);
+ admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInMsg",
+ Integer.toString(initValue));
log.info("-- Exiting {} test --", methodName);
}
@@ -981,7 +1009,8 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
producer.close();
producer2.close();
-
+ admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInMsg",
+ Integer.toString(initValue));
log.info("-- Exiting {} test --", methodName);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
index 7881038cc7a..642bec7321e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
@@ -18,12 +18,11 @@
*/
package org.apache.pulsar.client.api;
+import static org.awaitility.Awaitility.await;
import com.google.common.collect.Sets;
-
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
@@ -38,8 +37,6 @@ import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
-import static org.awaitility.Awaitility.await;
-
@Test(groups = "flaky")
public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchThrottlingTest {
private static final Logger log = LoggerFactory.getLogger(SubscriptionMessageDispatchThrottlingTest.class);
@@ -243,6 +240,7 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
admin.namespaces().setSubscriptionDispatchRate(namespace, subscriptionDispatchRate);
admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
+ long initBytes = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte();
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate);
final int numProducedMessages = 30;
@@ -302,6 +300,9 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
consumer.close();
producer.close();
+
+ admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", Long.toString(initBytes));
+
admin.topics().delete(topicName, true);
admin.namespaces().deleteNamespace(namespace);
}
@@ -417,6 +418,7 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
admin.namespaces().setSubscriptionDispatchRate(namespace, subscriptionDispatchRate);
admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
+ long initBytes = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte();
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate);
final int numProducedMessages = 30;
@@ -480,6 +482,7 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
consumer.close();
producer.close();
+ admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", Long.toString(initBytes));
admin.topics().delete(topicName, true);
admin.namespaces().deleteNamespace(namespace);
}
@@ -532,6 +535,7 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
final String topicName2 = BrokerTestUtil.newUniqueName("persistent://" + namespace2 + "/throttlingAll");
final String subName = "my-subscriber-name-" + subscription;
+ long initBytes = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte();
final int byteRate = 1000;
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + byteRate);
admin.namespaces().createNamespace(namespace1, Sets.newHashSet("test"));
@@ -591,6 +595,7 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
consumer2.close();
producer1.close();
producer2.close();
+ admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", Long.toString(initBytes));
log.info("-- Exiting {} test --", methodName);
}
@@ -739,7 +744,9 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
consumer.close();
producer.close();
- pulsar.getConfiguration().setDispatchThrottlingRatePerSubscriptionInMsg(initValue);
+ admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerSubscriptionInMsg",
+ Integer.toString(initValue));
+ conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(false);
log.info("-- Exiting {} test --", methodName);
}
@@ -855,11 +862,12 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
producer.close();
producer2.close();
-
+ admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerSubscriptionInMsg",
+ Integer.toString(initValue));
log.info("-- Exiting {} test --", methodName);
}
- @Test(dataProvider = "subscriptions", timeOut = 10000)
+ @Test(dataProvider = "subscriptions", timeOut = 11000)
public void testClosingRateLimiter(SubscriptionType subscription) throws Exception {
log.info("-- Starting {} test --", methodName);