You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/10/17 03:45:47 UTC

[pulsar] branch master updated: [improve][test] Improve SubscriptionMessageDispatchThrottlingTest to reduce the execution time (#18039)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 796afff2684 [improve][test] Improve SubscriptionMessageDispatchThrottlingTest to reduce the execution time (#18039)
796afff2684 is described below

commit 796afff2684b069228ad7d45b990af4838ee259e
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Mon Oct 17 11:45:37 2022 +0800

    [improve][test] Improve SubscriptionMessageDispatchThrottlingTest to reduce the execution time (#18039)
---
 .../client/api/MessageDispatchThrottlingTest.java  | 41 ++++++++++++++++++----
 .../SubscriptionMessageDispatchThrottlingTest.java | 22 ++++++++----
 2 files changed, 50 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
index d024d43eb73..d3e5067ed31 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
@@ -53,8 +53,9 @@ import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -62,7 +63,7 @@ import org.testng.annotations.Test;
 public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
     private static final Logger log = LoggerFactory.getLogger(MessageDispatchThrottlingTest.class);
 
-    @BeforeMethod
+    @BeforeClass
     @Override
     protected void setup() throws Exception {
         this.conf.setClusterName("test");
@@ -70,12 +71,35 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
         super.producerBaseSetup();
     }
 
-    @AfterMethod(alwaysRun = true)
+    @AfterClass(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
         super.internalCleanup();
     }
 
+    @AfterMethod(alwaysRun = true)
+    protected void reset() throws Exception {
+        pulsar.getConfiguration().setForceDeleteTenantAllowed(true);
+        pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true);
+
+        for (String tenant : admin.tenants().getTenants()) {
+            for (String namespace : admin.namespaces().getNamespaces(tenant)) {
+                admin.namespaces().deleteNamespace(namespace, true);
+            }
+            admin.tenants().deleteTenant(tenant, true);
+        }
+
+        for (String cluster : admin.clusters().getClusters()) {
+            admin.clusters().deleteCluster(cluster);
+        }
+
+        pulsar.getConfiguration().setForceDeleteTenantAllowed(false);
+        pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);
+
+        super.producerBaseSetup();
+    }
+
+
     @DataProvider(name = "subscriptions")
     public Object[][] subscriptionsProvider() {
         return new Object[][] { new Object[] { SubscriptionType.Shared }, { SubscriptionType.Exclusive } };
@@ -280,6 +304,7 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
         final long byteRate = 1024 * 1024;// 1MB rate enough to let all msg to be delivered
 
         int initValue = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg();
+        long initBytes = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte();
         // (1) Update message-dispatch-rate limit
         admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInMsg",
                 Integer.toString(messageRate));
@@ -325,7 +350,9 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
 
         consumer.close();
         producer.close();
-        pulsar.getConfiguration().setDispatchThrottlingRatePerTopicInMsg(initValue);
+        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInMsg",
+                Integer.toString(initValue));
+        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInByte", Long.toString(initBytes));
         log.info("-- Exiting {} test --", methodName);
     }
 
@@ -675,7 +702,8 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
 
         consumer.close();
         producer.close();
-        pulsar.getConfiguration().setDispatchThrottlingRatePerTopicInMsg(initValue);
+        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInMsg",
+                Integer.toString(initValue));
         log.info("-- Exiting {} test --", methodName);
     }
 
@@ -981,7 +1009,8 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
 
         producer.close();
         producer2.close();
-
+        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInMsg",
+                Integer.toString(initValue));
         log.info("-- Exiting {} test --", methodName);
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
index 7881038cc7a..642bec7321e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
@@ -18,12 +18,11 @@
  */
 package org.apache.pulsar.client.api;
 
+import static org.awaitility.Awaitility.await;
 import com.google.common.collect.Sets;
-
 import java.time.Duration;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
@@ -38,8 +37,6 @@ import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import static org.awaitility.Awaitility.await;
-
 @Test(groups = "flaky")
 public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchThrottlingTest {
     private static final Logger log = LoggerFactory.getLogger(SubscriptionMessageDispatchThrottlingTest.class);
@@ -243,6 +240,7 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
         admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
         admin.namespaces().setSubscriptionDispatchRate(namespace, subscriptionDispatchRate);
         admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
+        long initBytes = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte();
         admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate);
 
         final int numProducedMessages = 30;
@@ -302,6 +300,9 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
 
         consumer.close();
         producer.close();
+
+        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", Long.toString(initBytes));
+
         admin.topics().delete(topicName, true);
         admin.namespaces().deleteNamespace(namespace);
     }
@@ -417,6 +418,7 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
         admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
         admin.namespaces().setSubscriptionDispatchRate(namespace, subscriptionDispatchRate);
         admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
+        long initBytes = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte();
         admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate);
 
         final int numProducedMessages = 30;
@@ -480,6 +482,7 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
 
         consumer.close();
         producer.close();
+        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", Long.toString(initBytes));
         admin.topics().delete(topicName, true);
         admin.namespaces().deleteNamespace(namespace);
     }
@@ -532,6 +535,7 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
         final String topicName2 = BrokerTestUtil.newUniqueName("persistent://" + namespace2 + "/throttlingAll");
         final String subName = "my-subscriber-name-" + subscription;
 
+        long initBytes = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte();
         final int byteRate = 1000;
         admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + byteRate);
         admin.namespaces().createNamespace(namespace1, Sets.newHashSet("test"));
@@ -591,6 +595,7 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
         consumer2.close();
         producer1.close();
         producer2.close();
+        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", Long.toString(initBytes));
         log.info("-- Exiting {} test --", methodName);
     }
 
@@ -739,7 +744,9 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
 
         consumer.close();
         producer.close();
-        pulsar.getConfiguration().setDispatchThrottlingRatePerSubscriptionInMsg(initValue);
+        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerSubscriptionInMsg",
+                Integer.toString(initValue));
+        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(false);
         log.info("-- Exiting {} test --", methodName);
     }
 
@@ -855,11 +862,12 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
 
         producer.close();
         producer2.close();
-
+        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerSubscriptionInMsg",
+                Integer.toString(initValue));
         log.info("-- Exiting {} test --", methodName);
     }
 
-    @Test(dataProvider = "subscriptions", timeOut = 10000)
+    @Test(dataProvider = "subscriptions", timeOut = 11000)
     public void testClosingRateLimiter(SubscriptionType subscription) throws Exception {
         log.info("-- Starting {} test --", methodName);