You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/10/03 18:56:58 UTC
[pulsar] branch master updated: Do not register each
DispatchRateLimiter for policies notifications (#2699)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5e9c35c Do not register each DispatchRateLimiter for policies notifications (#2699)
5e9c35c is described below
commit 5e9c35c67db5ae2f47a72ebb15c9adbeecca3c90
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Oct 3 11:56:53 2018 -0700
Do not register each DispatchRateLimiter for policies notifications (#2699)
* Do not register each DispatchRateLimiter for policies notifications
* Fixed updates on per-subscriptions limits
* Fixed testClusterPolicyOverrideConfiguration
---
.../apache/pulsar/broker/service/Dispatcher.java | 10 ++--
.../service/persistent/DispatchRateLimiter.java | 58 +++++++++-------------
.../PersistentDispatcherMultipleConsumers.java | 5 ++
.../PersistentDispatcherSingleActiveConsumer.java | 5 ++
.../broker/service/persistent/PersistentTopic.java | 10 +++-
.../SubscriptionMessageDispatchThrottlingTest.java | 9 ++++
6 files changed, 58 insertions(+), 39 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index 43f65cd..76bfdc3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.utils.CopyOnWriteArrayList;
@@ -45,14 +46,14 @@ public interface Dispatcher {
/**
* mark dispatcher closed to stop new incoming requests and disconnect all consumers
- *
+ *
* @return
*/
CompletableFuture<Void> close();
-
+
/**
* disconnect all consumers
- *
+ *
* @return
*/
CompletableFuture<Void> disconnectAllConsumers();
@@ -72,4 +73,7 @@ public interface Dispatcher {
RedeliveryTracker getRedeliveryTracker();
+ default DispatchRateLimiter getRateLimiter() {
+ return null;
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index e511d40..36f81a0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -48,7 +48,6 @@ public class DispatchRateLimiter {
this.subscriptionName = subscriptionName;
this.brokerService = topic.getBrokerService();
updateDispatchRate();
- registerLocalPoliciesListener();
}
public DispatchRateLimiter(PersistentTopic topic) {
@@ -119,40 +118,31 @@ public class DispatchRateLimiter {
log.info("[{}] [{}] configured message-dispatch rate at broker {}", this.topicName, this.subscriptionName, dispatchRate);
}
- /**
- * Register listener on namespace policy change to update dispatch-rate if required
- *
- */
- private void registerLocalPoliciesListener() {
- brokerService.pulsar().getConfigurationCache().policiesCache().registerListener((path, data, stat) -> {
- final NamespaceName namespace = TopicName.get(this.topicName).getNamespaceObject();
- final String cluster = brokerService.pulsar().getConfiguration().getClusterName();
- final String policiesPath = path(POLICIES, namespace.toString());
- if (policiesPath.equals(path)) {
- DispatchRate dispatchRate;
- if (subscriptionName == null) {
- dispatchRate = data.clusterDispatchRate.get(cluster);
- } else {
- dispatchRate = data.subscriptionDispatchRate.get(cluster);
- }
- // update dispatch-rate only if it's configured in policies else ignore
- if (dispatchRate != null) {
- int inMsg = (subscriptionName == null) ?
- brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInMsg() :
- brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg();
- long inByte = (subscriptionName == null) ?
- brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInByte() :
- brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerSubscribeInByte();
- final DispatchRate newDispatchRate = new DispatchRate(inMsg, inByte, 1);
- // if policy-throttling rate is disabled and cluster-throttling is enabled then apply
- // cluster-throttling rate
- if (!isDispatchRateEnabled(dispatchRate) && isDispatchRateEnabled(newDispatchRate)) {
- dispatchRate = newDispatchRate;
- }
- updateDispatchRate(dispatchRate);
- }
+ public void onPoliciesUpdate(Policies data) {
+ String cluster = brokerService.pulsar().getConfiguration().getClusterName();
+
+ DispatchRate dispatchRate;
+ if (subscriptionName == null) {
+ dispatchRate = data.clusterDispatchRate.get(cluster);
+ } else {
+ dispatchRate = data.subscriptionDispatchRate.get(cluster);
+ }
+ // update dispatch-rate only if it's configured in policies else ignore
+ if (dispatchRate != null) {
+ int inMsg = (subscriptionName == null) ?
+ brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInMsg() :
+ brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg();
+ long inByte = (subscriptionName == null) ?
+ brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInByte() :
+ brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerSubscribeInByte();
+ final DispatchRate newDispatchRate = new DispatchRate(inMsg, inByte, 1);
+ // if policy-throttling rate is disabled and cluster-throttling is enabled then apply
+ // cluster-throttling rate
+ if (!isDispatchRateEnabled(dispatchRate) && isDispatchRateEnabled(newDispatchRate)) {
+ dispatchRate = newDispatchRate;
}
- });
+ updateDispatchRate(dispatchRate);
+ }
}
/**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 5198a13..41b8ec7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -636,5 +636,10 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMu
return redeliveryTracker;
}
+ @Override
+ public DispatchRateLimiter getRateLimiter() {
+ return dispatchRateLimiter;
+ }
+
private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 89dfb47..275b236 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -496,5 +496,10 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
return redeliveryTracker;
}
+ @Override
+ public DispatchRateLimiter getRateLimiter() {
+ return dispatchRateLimiter;
+ }
+
private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 6c12296..34703fa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -166,7 +166,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
// Flag to signal that producer of this topic has published batch-message so, broker should not allow consumer which
// doesn't support batch-message
private volatile boolean hasBatchMessagePublished = false;
- private DispatchRateLimiter dispatchRateLimiter;
+ private final DispatchRateLimiter dispatchRateLimiter;
public static final int MESSAGE_RATE_BACKOFF_MS = 1000;
private final MessageDeduplication messageDeduplication;
@@ -1559,11 +1559,17 @@ public class PersistentTopic implements Topic, AddEntryCallback {
producer.checkPermissions();
producer.checkEncryption();
});
- subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(Consumer::checkPermissions));
+ subscriptions.forEach((subName, sub) -> {
+ sub.getConsumers().forEach(Consumer::checkPermissions);
+ if (sub.getDispatcher().getRateLimiter() != null) {
+ sub.getDispatcher().getRateLimiter().onPoliciesUpdate(data);
+ }
+ });
checkMessageExpiry();
CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();
CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
CompletableFuture<Void> persistentPoliciesFuture = checkPersistencePolicies();
+ dispatchRateLimiter.onPoliciesUpdate(data);
return CompletableFuture.allOf(replicationFuture, dedupFuture, persistentPoliciesFuture);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
index b4b60af..5e3e0d2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
@@ -502,6 +502,15 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
int nsMessageRate = 500;
DispatchRate dispatchRate = new DispatchRate(nsMessageRate, 0, 1);
admin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate);
+
+ if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
+ subRateLimiter = ((PersistentDispatcherMultipleConsumers) subDispatcher).getDispatchRateLimiter();
+ } else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
+ subRateLimiter = ((PersistentDispatcherSingleActiveConsumer) subDispatcher).getDispatchRateLimiter();
+ } else {
+ Assert.fail("Should only have PersistentDispatcher in this test");
+ }
+
for (int i = 0; i < 5; i++) {
if (subRateLimiter.getDispatchRateOnMsg() != nsMessageRate) {
Thread.sleep(50 + (i * 10));