You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/06/14 02:29:38 UTC

[pulsar] branch master updated: Cleanup DispatchRateLimiter method isDispatchRateNeeded (#15953)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 39dbeb2d08d Cleanup DispatchRateLimiter method isDispatchRateNeeded (#15953)
39dbeb2d08d is described below

commit 39dbeb2d08d0f8e6495ef4f97e1c28fc72b6de87
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Tue Jun 14 10:29:31 2022 +0800

    Cleanup DispatchRateLimiter method isDispatchRateNeeded (#15953)
---
 .../service/persistent/DispatchRateLimiter.java    | 75 ----------------------
 ...ntStickyKeyDispatcherMultipleConsumersTest.java | 19 +-----
 ...ntStickyKeyDispatcherMultipleConsumersTest.java | 73 ++++++++-------------
 3 files changed, 29 insertions(+), 138 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 481a97ee15b..5f788c9ac14 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -24,12 +24,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
 import org.apache.pulsar.common.util.RateLimiter;
 import org.slf4j.Logger;
@@ -203,79 +201,6 @@ public class DispatchRateLimiter {
         updateDispatchRate(dispatchRate);
     }
 
-    public static boolean isDispatchRateNeeded(BrokerService brokerService, Optional<Policies> policies,
-            String topicName, Type type) {
-        final ServiceConfiguration serviceConfig = brokerService.pulsar().getConfiguration();
-        if (type == Type.BROKER) {
-            return brokerService.getBrokerDispatchRateLimiter().isDispatchRateLimitingEnabled();
-        }
-
-        Optional<DispatchRate> dispatchRate = getTopicPolicyDispatchRate(brokerService, topicName, type);
-        if (dispatchRate.isPresent()) {
-            return true;
-        }
-
-        policies = policies.isPresent() ? policies : getPolicies(brokerService, topicName);
-        return isDispatchRateNeeded(serviceConfig, policies, topicName, type);
-    }
-
-    public static Optional<DispatchRate> getTopicPolicyDispatchRate(BrokerService brokerService,
-                                                                    String topicName, Type type) {
-        Optional<DispatchRate> dispatchRate = Optional.empty();
-        final ServiceConfiguration serviceConfiguration = brokerService.pulsar().getConfiguration();
-        if (serviceConfiguration.isSystemTopicEnabled() && serviceConfiguration.isTopicLevelPoliciesEnabled()) {
-            try {
-                switch (type) {
-                    case TOPIC:
-                        dispatchRate = Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService()
-                                .getTopicPolicies(TopicName.get(topicName)))
-                                .map(TopicPolicies::getDispatchRate);
-                        break;
-                    case SUBSCRIPTION:
-                        dispatchRate = Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService()
-                                .getTopicPolicies(TopicName.get(topicName)))
-                                .map(TopicPolicies::getSubscriptionDispatchRate);
-                        break;
-                    case REPLICATOR:
-                        dispatchRate = Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService()
-                                .getTopicPolicies(TopicName.get(topicName)))
-                                .map(TopicPolicies::getReplicatorDispatchRate);
-                        break;
-                    default:
-                        break;
-                }
-            } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
-                log.debug("Topic {} policies have not been initialized yet.", topicName);
-            } catch (Exception e) {
-                log.debug("[{}] Failed to get topic dispatch rate. ", topicName, e);
-            }
-        }
-
-        return dispatchRate;
-    }
-
-    public static boolean isDispatchRateNeeded(final ServiceConfiguration serviceConfig,
-            final Optional<Policies> policies, final String topicName, final Type type) {
-        DispatchRate dispatchRate = getPoliciesDispatchRate(serviceConfig.getClusterName(), policies, type);
-        if (dispatchRate == null) {
-            switch (type) {
-                case TOPIC:
-                    return serviceConfig.getDispatchThrottlingRatePerTopicInMsg() > 0
-                        || serviceConfig.getDispatchThrottlingRatePerTopicInByte() > 0;
-                case SUBSCRIPTION:
-                    return serviceConfig.getDispatchThrottlingRatePerSubscriptionInMsg() > 0
-                        || serviceConfig.getDispatchThrottlingRatePerSubscriptionInByte() > 0;
-                case REPLICATOR:
-                    return serviceConfig.getDispatchThrottlingRatePerReplicatorInMsg() > 0
-                        || serviceConfig.getDispatchThrottlingRatePerReplicatorInByte() > 0;
-                default:
-                    log.error("error DispatchRateLimiter type: {} ", type);
-                    return false;
-            }
-        }
-        return true;
-    }
-
     @SuppressWarnings("deprecation")
     public static DispatchRateImpl getPoliciesDispatchRate(final String cluster,
                                                            Optional<Policies> policies,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
index f319b7ce4a7..932c446669d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -23,12 +23,10 @@ import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPay
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -39,7 +37,6 @@ import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelPromise;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.impl.EntryImpl;
 import org.apache.pulsar.broker.PulsarService;
@@ -50,11 +47,9 @@ import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.EntryBatchSizes;
 import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.RedeliveryTracker;
-import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
 import org.apache.pulsar.common.protocol.Commands;
-import org.mockito.MockedStatic;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -94,17 +89,9 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumersTest {
 
         subscriptionMock = mock(NonPersistentSubscription.class);
 
-        try (MockedStatic<DispatchRateLimiter> rateLimiterMockedStatic = mockStatic(DispatchRateLimiter.class);) {
-            rateLimiterMockedStatic.when(() -> DispatchRateLimiter.isDispatchRateNeeded(
-                            any(BrokerService.class),
-                            any(Optional.class),
-                            anyString(),
-                            any(DispatchRateLimiter.Type.class)))
-                    .thenReturn(false);
-            nonpersistentDispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(
-                    topicMock, subscriptionMock,
-                    new HashRangeAutoSplitStickyKeyConsumerSelector());
-        }
+        nonpersistentDispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(
+            topicMock, subscriptionMock,
+            new HashRangeAutoSplitStickyKeyConsumerSelector());
     }
 
     @Test(timeOut = 10000)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 99a66f44ac4..6a44e3dfcf7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -26,13 +26,11 @@ import static org.mockito.Mockito.anyInt;
 import static org.mockito.Mockito.anyList;
 import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.anySet;
-import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.argThat;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -45,7 +43,6 @@ import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -72,7 +69,6 @@ import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
 import org.mockito.ArgumentCaptor;
-import org.mockito.MockedStatic;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -143,17 +139,9 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
         );
 
         subscriptionMock = mock(PersistentSubscription.class);
-        try (MockedStatic<DispatchRateLimiter> rateLimiterMockedStatic = mockStatic(DispatchRateLimiter.class);) {
-            rateLimiterMockedStatic.when(() -> DispatchRateLimiter.isDispatchRateNeeded(
-                            any(BrokerService.class),
-                            any(Optional.class),
-                            anyString(),
-                            any(DispatchRateLimiter.Type.class)))
-                    .thenReturn(false);
-            persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
-                    topicMock, cursorMock, subscriptionMock, configMock,
-                    new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT));
-        }
+        persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
+            topicMock, cursorMock, subscriptionMock, configMock,
+            new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT));
     }
 
     @Test
@@ -197,40 +185,31 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
 
     @Test(timeOut = 10000)
     public void testSendMessage() {
-        try (MockedStatic<DispatchRateLimiter> rateLimiterMockedStatic = mockStatic(DispatchRateLimiter.class);) {
-            rateLimiterMockedStatic.when(() -> DispatchRateLimiter.isDispatchRateNeeded(
-                            any(BrokerService.class),
-                            any(Optional.class),
-                            anyString(),
-                            any(DispatchRateLimiter.Type.class)))
-                    .thenReturn(false);
-            KeySharedMeta keySharedMeta = new KeySharedMeta().setKeySharedMode(KeySharedMode.STICKY);
-            DispatchRateLimiter.isDispatchRateNeeded(brokerMock, Optional.empty(), "hello", DispatchRateLimiter.Type.SUBSCRIPTION);
-            PersistentStickyKeyDispatcherMultipleConsumers persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
-                    topicMock, cursorMock, subscriptionMock, configMock, keySharedMeta);
-            try {
-                keySharedMeta.addHashRange()
-                        .setStart(0)
-                        .setEnd(9);
-
-                Consumer consumerMock = mock(Consumer.class);
-                doReturn(keySharedMeta).when(consumerMock).getKeySharedMeta();
-                persistentDispatcher.addConsumer(consumerMock);
-                persistentDispatcher.consumerFlow(consumerMock, 1000);
-            } catch (Exception e) {
-                fail("Failed to add mock consumer", e);
-            }
+        KeySharedMeta keySharedMeta = new KeySharedMeta().setKeySharedMode(KeySharedMode.STICKY);
+        PersistentStickyKeyDispatcherMultipleConsumers persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
+            topicMock, cursorMock, subscriptionMock, configMock, keySharedMeta);
+        try {
+            keySharedMeta.addHashRange()
+                .setStart(0)
+                .setEnd(9);
 
-            List<Entry> entries = new ArrayList<>();
-            entries.add(EntryImpl.create(1, 1, createMessage("message1", 1)));
-            entries.add(EntryImpl.create(1, 2, createMessage("message2", 2)));
+            Consumer consumerMock = mock(Consumer.class);
+            doReturn(keySharedMeta).when(consumerMock).getKeySharedMeta();
+            persistentDispatcher.addConsumer(consumerMock);
+            persistentDispatcher.consumerFlow(consumerMock, 1000);
+        } catch (Exception e) {
+            fail("Failed to add mock consumer", e);
+        }
 
-            try {
-                //Should success,see issue #8960
-                persistentDispatcher.readEntriesComplete(entries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal);
-            } catch (Exception e) {
-                fail("Failed to readEntriesComplete.", e);
-            }
+        List<Entry> entries = new ArrayList<>();
+        entries.add(EntryImpl.create(1, 1, createMessage("message1", 1)));
+        entries.add(EntryImpl.create(1, 2, createMessage("message2", 2)));
+
+        try {
+            //Should success,see issue #8960
+            persistentDispatcher.readEntriesComplete(entries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal);
+        } catch (Exception e) {
+            fail("Failed to readEntriesComplete.", e);
         }
     }