You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/06/14 02:29:38 UTC
[pulsar] branch master updated: Cleanup DispatchRateLimiter method isDispatchRateNeeded (#15953)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 39dbeb2d08d Cleanup DispatchRateLimiter method isDispatchRateNeeded (#15953)
39dbeb2d08d is described below
commit 39dbeb2d08d0f8e6495ef4f97e1c28fc72b6de87
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Tue Jun 14 10:29:31 2022 +0800
Cleanup DispatchRateLimiter method isDispatchRateNeeded (#15953)
---
.../service/persistent/DispatchRateLimiter.java | 75 ----------------------
...ntStickyKeyDispatcherMultipleConsumersTest.java | 19 +-----
...ntStickyKeyDispatcherMultipleConsumersTest.java | 73 ++++++++-------------
3 files changed, 29 insertions(+), 138 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 481a97ee15b..5f788c9ac14 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -24,12 +24,10 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.common.util.RateLimiter;
import org.slf4j.Logger;
@@ -203,79 +201,6 @@ public class DispatchRateLimiter {
updateDispatchRate(dispatchRate);
}
- public static boolean isDispatchRateNeeded(BrokerService brokerService, Optional<Policies> policies,
- String topicName, Type type) {
- final ServiceConfiguration serviceConfig = brokerService.pulsar().getConfiguration();
- if (type == Type.BROKER) {
- return brokerService.getBrokerDispatchRateLimiter().isDispatchRateLimitingEnabled();
- }
-
- Optional<DispatchRate> dispatchRate = getTopicPolicyDispatchRate(brokerService, topicName, type);
- if (dispatchRate.isPresent()) {
- return true;
- }
-
- policies = policies.isPresent() ? policies : getPolicies(brokerService, topicName);
- return isDispatchRateNeeded(serviceConfig, policies, topicName, type);
- }
-
- public static Optional<DispatchRate> getTopicPolicyDispatchRate(BrokerService brokerService,
- String topicName, Type type) {
- Optional<DispatchRate> dispatchRate = Optional.empty();
- final ServiceConfiguration serviceConfiguration = brokerService.pulsar().getConfiguration();
- if (serviceConfiguration.isSystemTopicEnabled() && serviceConfiguration.isTopicLevelPoliciesEnabled()) {
- try {
- switch (type) {
- case TOPIC:
- dispatchRate = Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService()
- .getTopicPolicies(TopicName.get(topicName)))
- .map(TopicPolicies::getDispatchRate);
- break;
- case SUBSCRIPTION:
- dispatchRate = Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService()
- .getTopicPolicies(TopicName.get(topicName)))
- .map(TopicPolicies::getSubscriptionDispatchRate);
- break;
- case REPLICATOR:
- dispatchRate = Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService()
- .getTopicPolicies(TopicName.get(topicName)))
- .map(TopicPolicies::getReplicatorDispatchRate);
- break;
- default:
- break;
- }
- } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
- log.debug("Topic {} policies have not been initialized yet.", topicName);
- } catch (Exception e) {
- log.debug("[{}] Failed to get topic dispatch rate. ", topicName, e);
- }
- }
-
- return dispatchRate;
- }
-
- public static boolean isDispatchRateNeeded(final ServiceConfiguration serviceConfig,
- final Optional<Policies> policies, final String topicName, final Type type) {
- DispatchRate dispatchRate = getPoliciesDispatchRate(serviceConfig.getClusterName(), policies, type);
- if (dispatchRate == null) {
- switch (type) {
- case TOPIC:
- return serviceConfig.getDispatchThrottlingRatePerTopicInMsg() > 0
- || serviceConfig.getDispatchThrottlingRatePerTopicInByte() > 0;
- case SUBSCRIPTION:
- return serviceConfig.getDispatchThrottlingRatePerSubscriptionInMsg() > 0
- || serviceConfig.getDispatchThrottlingRatePerSubscriptionInByte() > 0;
- case REPLICATOR:
- return serviceConfig.getDispatchThrottlingRatePerReplicatorInMsg() > 0
- || serviceConfig.getDispatchThrottlingRatePerReplicatorInByte() > 0;
- default:
- log.error("error DispatchRateLimiter type: {} ", type);
- return false;
- }
- }
- return true;
- }
-
@SuppressWarnings("deprecation")
public static DispatchRateImpl getPoliciesDispatchRate(final String cluster,
Optional<Policies> policies,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
index f319b7ce4a7..932c446669d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -23,12 +23,10 @@ import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPay
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -39,7 +37,6 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelPromise;
import java.util.ArrayList;
import java.util.List;
-import java.util.Optional;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.pulsar.broker.PulsarService;
@@ -50,11 +47,9 @@ import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.RedeliveryTracker;
-import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.protocol.Commands;
-import org.mockito.MockedStatic;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -94,17 +89,9 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumersTest {
subscriptionMock = mock(NonPersistentSubscription.class);
- try (MockedStatic<DispatchRateLimiter> rateLimiterMockedStatic = mockStatic(DispatchRateLimiter.class);) {
- rateLimiterMockedStatic.when(() -> DispatchRateLimiter.isDispatchRateNeeded(
- any(BrokerService.class),
- any(Optional.class),
- anyString(),
- any(DispatchRateLimiter.Type.class)))
- .thenReturn(false);
- nonpersistentDispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(
- topicMock, subscriptionMock,
- new HashRangeAutoSplitStickyKeyConsumerSelector());
- }
+ nonpersistentDispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(
+ topicMock, subscriptionMock,
+ new HashRangeAutoSplitStickyKeyConsumerSelector());
}
@Test(timeOut = 10000)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 99a66f44ac4..6a44e3dfcf7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -26,13 +26,11 @@ import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.anyList;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.anySet;
-import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -45,7 +43,6 @@ import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -72,7 +69,6 @@ import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.mockito.ArgumentCaptor;
-import org.mockito.MockedStatic;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -143,17 +139,9 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
);
subscriptionMock = mock(PersistentSubscription.class);
- try (MockedStatic<DispatchRateLimiter> rateLimiterMockedStatic = mockStatic(DispatchRateLimiter.class);) {
- rateLimiterMockedStatic.when(() -> DispatchRateLimiter.isDispatchRateNeeded(
- any(BrokerService.class),
- any(Optional.class),
- anyString(),
- any(DispatchRateLimiter.Type.class)))
- .thenReturn(false);
- persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
- topicMock, cursorMock, subscriptionMock, configMock,
- new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT));
- }
+ persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
+ topicMock, cursorMock, subscriptionMock, configMock,
+ new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT));
}
@Test
@@ -197,40 +185,31 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
@Test(timeOut = 10000)
public void testSendMessage() {
- try (MockedStatic<DispatchRateLimiter> rateLimiterMockedStatic = mockStatic(DispatchRateLimiter.class);) {
- rateLimiterMockedStatic.when(() -> DispatchRateLimiter.isDispatchRateNeeded(
- any(BrokerService.class),
- any(Optional.class),
- anyString(),
- any(DispatchRateLimiter.Type.class)))
- .thenReturn(false);
- KeySharedMeta keySharedMeta = new KeySharedMeta().setKeySharedMode(KeySharedMode.STICKY);
- DispatchRateLimiter.isDispatchRateNeeded(brokerMock, Optional.empty(), "hello", DispatchRateLimiter.Type.SUBSCRIPTION);
- PersistentStickyKeyDispatcherMultipleConsumers persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
- topicMock, cursorMock, subscriptionMock, configMock, keySharedMeta);
- try {
- keySharedMeta.addHashRange()
- .setStart(0)
- .setEnd(9);
-
- Consumer consumerMock = mock(Consumer.class);
- doReturn(keySharedMeta).when(consumerMock).getKeySharedMeta();
- persistentDispatcher.addConsumer(consumerMock);
- persistentDispatcher.consumerFlow(consumerMock, 1000);
- } catch (Exception e) {
- fail("Failed to add mock consumer", e);
- }
+ KeySharedMeta keySharedMeta = new KeySharedMeta().setKeySharedMode(KeySharedMode.STICKY);
+ PersistentStickyKeyDispatcherMultipleConsumers persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
+ topicMock, cursorMock, subscriptionMock, configMock, keySharedMeta);
+ try {
+ keySharedMeta.addHashRange()
+ .setStart(0)
+ .setEnd(9);
- List<Entry> entries = new ArrayList<>();
- entries.add(EntryImpl.create(1, 1, createMessage("message1", 1)));
- entries.add(EntryImpl.create(1, 2, createMessage("message2", 2)));
+ Consumer consumerMock = mock(Consumer.class);
+ doReturn(keySharedMeta).when(consumerMock).getKeySharedMeta();
+ persistentDispatcher.addConsumer(consumerMock);
+ persistentDispatcher.consumerFlow(consumerMock, 1000);
+ } catch (Exception e) {
+ fail("Failed to add mock consumer", e);
+ }
- try {
- //Should success,see issue #8960
- persistentDispatcher.readEntriesComplete(entries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal);
- } catch (Exception e) {
- fail("Failed to readEntriesComplete.", e);
- }
+ List<Entry> entries = new ArrayList<>();
+ entries.add(EntryImpl.create(1, 1, createMessage("message1", 1)));
+ entries.add(EntryImpl.create(1, 2, createMessage("message2", 2)));
+
+ try {
+ //Should success,see issue #8960
+ persistentDispatcher.readEntriesComplete(entries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal);
+ } catch (Exception e) {
+ fail("Failed to readEntriesComplete.", e);
}
}