You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/13 05:13:11 UTC
[pulsar] branch master updated: [cleanup][broker] Cleanup Unused code in DispatchRateLimiter (#16499)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ad6213828c1 [cleanup][broker] Cleanup Unused code in DispatchRateLimiter (#16499)
ad6213828c1 is described below
commit ad6213828c1f2e6b736657d3745522ac1e2b1bb1
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Wed Jul 13 13:13:06 2022 +0800
[cleanup][broker] Cleanup Unused code in DispatchRateLimiter (#16499)
---
.../service/persistent/DispatchRateLimiter.java | 29 -------------------
.../client/api/MessageDispatchThrottlingTest.java | 33 ----------------------
2 files changed, 62 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 5f788c9ac14..d71dc4edade 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -28,7 +28,6 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.common.util.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -201,34 +200,6 @@ public class DispatchRateLimiter {
updateDispatchRate(dispatchRate);
}
- @SuppressWarnings("deprecation")
- public static DispatchRateImpl getPoliciesDispatchRate(final String cluster,
- Optional<Policies> policies,
- Type type) {
- // return policy-dispatch rate only if it's enabled in policies
- return policies.map(p -> {
- DispatchRateImpl dispatchRate;
- switch (type) {
- case TOPIC:
- dispatchRate = p.topicDispatchRate.get(cluster);
- if (dispatchRate == null) {
- dispatchRate = p.clusterDispatchRate.get(cluster);
- }
- break;
- case SUBSCRIPTION:
- dispatchRate = p.subscriptionDispatchRate.get(cluster);
- break;
- case REPLICATOR:
- dispatchRate = p.replicatorDispatchRate.get(cluster);
- break;
- default:
- log.error("error DispatchRateLimiter type: {} ", type);
- return null;
- }
- return isDispatchRateEnabled(dispatchRate) ? dispatchRate : null;
- }).orElse(null);
- }
-
public static CompletableFuture<Optional<Policies>> getPoliciesAsync(BrokerService brokerService,
String topicName) {
final NamespaceName namespace = TopicName.get(topicName).getNamespaceObject();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
index 561f4a4a159..dfa5a2c70a9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
@@ -1029,39 +1029,6 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
- @SuppressWarnings("deprecation")
- @Test
- public void testDispatchRateCompatibility1() throws Exception {
- final String cluster = "test";
-
- Optional<Policies> policies = Optional.of(new Policies());
- DispatchRateImpl clusterDispatchRate = DispatchRateImpl.builder()
- .dispatchThrottlingRateInMsg(10)
- .dispatchThrottlingRateInByte(512)
- .ratePeriodInSecond(1)
- .build();
- DispatchRateImpl topicDispatchRate = DispatchRateImpl.builder()
- .dispatchThrottlingRateInMsg(200)
- .dispatchThrottlingRateInByte(1024)
- .ratePeriodInSecond(1)
- .build();
-
- // (1) If both clusterDispatchRate and topicDispatchRate are empty, dispatch throttling is disabled
- DispatchRateImpl dispatchRate = DispatchRateLimiter.getPoliciesDispatchRate(cluster, policies,
- DispatchRateLimiter.Type.TOPIC);
- Assert.assertNull(dispatchRate);
-
- // (2) If topicDispatchRate is empty, clusterDispatchRate is effective
- policies.get().clusterDispatchRate.put(cluster, clusterDispatchRate);
- dispatchRate = DispatchRateLimiter.getPoliciesDispatchRate(cluster, policies, DispatchRateLimiter.Type.TOPIC);
- Assert.assertEquals(dispatchRate, clusterDispatchRate);
-
- // (3) If topicDispatchRate is not empty, topicDispatchRate is effective
- policies.get().topicDispatchRate.put(cluster, topicDispatchRate);
- dispatchRate = DispatchRateLimiter.getPoliciesDispatchRate(cluster, policies, DispatchRateLimiter.Type.TOPIC);
- Assert.assertEquals(dispatchRate, topicDispatchRate);
- }
-
@SuppressWarnings("deprecation")
@Test
public void testDispatchRateCompatibility2() throws Exception {