You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/10/03 18:56:58 UTC

[pulsar] branch master updated: Do not register each DispatchRateLimiter for policies notifications (#2699)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e9c35c  Do not register each DispatchRateLimiter for policies notifications (#2699)
5e9c35c is described below

commit 5e9c35c67db5ae2f47a72ebb15c9adbeecca3c90
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Oct 3 11:56:53 2018 -0700

    Do not register each DispatchRateLimiter for policies notifications (#2699)
    
    * Do not register each DispatchRateLimiter for policies notifications
    
    * Fixed updates on per-subscriptions limits
    
    * Fixed testClusterPolicyOverrideConfiguration
---
 .../apache/pulsar/broker/service/Dispatcher.java   | 10 ++--
 .../service/persistent/DispatchRateLimiter.java    | 58 +++++++++-------------
 .../PersistentDispatcherMultipleConsumers.java     |  5 ++
 .../PersistentDispatcherSingleActiveConsumer.java  |  5 ++
 .../broker/service/persistent/PersistentTopic.java | 10 +++-
 .../SubscriptionMessageDispatchThrottlingTest.java |  9 ++++
 6 files changed, 58 insertions(+), 39 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index 43f65cd..76bfdc3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.utils.CopyOnWriteArrayList;
 
@@ -45,14 +46,14 @@ public interface Dispatcher {
 
     /**
      * mark dispatcher closed to stop new incoming requests and disconnect all consumers
-     * 
+     *
      * @return
      */
     CompletableFuture<Void> close();
-    
+
     /**
      * disconnect all consumers
-     * 
+     *
      * @return
      */
     CompletableFuture<Void> disconnectAllConsumers();
@@ -72,4 +73,7 @@ public interface Dispatcher {
 
     RedeliveryTracker getRedeliveryTracker();
 
+    default DispatchRateLimiter getRateLimiter() {
+        return null;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index e511d40..36f81a0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -48,7 +48,6 @@ public class DispatchRateLimiter {
         this.subscriptionName = subscriptionName;
         this.brokerService = topic.getBrokerService();
         updateDispatchRate();
-        registerLocalPoliciesListener();
     }
 
     public DispatchRateLimiter(PersistentTopic topic) {
@@ -119,40 +118,31 @@ public class DispatchRateLimiter {
         log.info("[{}] [{}] configured message-dispatch rate at broker {}", this.topicName, this.subscriptionName, dispatchRate);
     }
 
-    /**
-     * Register listener on namespace policy change to update dispatch-rate if required
-     *
-     */
-    private void registerLocalPoliciesListener() {
-        brokerService.pulsar().getConfigurationCache().policiesCache().registerListener((path, data, stat) -> {
-            final NamespaceName namespace = TopicName.get(this.topicName).getNamespaceObject();
-            final String cluster = brokerService.pulsar().getConfiguration().getClusterName();
-            final String policiesPath = path(POLICIES, namespace.toString());
-            if (policiesPath.equals(path)) {
-                DispatchRate dispatchRate;
-                if (subscriptionName == null) {
-                    dispatchRate = data.clusterDispatchRate.get(cluster);
-                } else {
-                    dispatchRate = data.subscriptionDispatchRate.get(cluster);
-                }
-                // update dispatch-rate only if it's configured in policies else ignore
-                if (dispatchRate != null) {
-                    int inMsg = (subscriptionName == null) ?
-                        brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInMsg() :
-                        brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg();
-                    long inByte = (subscriptionName == null) ?
-                        brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInByte() :
-                        brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerSubscribeInByte();
-                    final DispatchRate newDispatchRate = new DispatchRate(inMsg, inByte, 1);
-                    // if policy-throttling rate is disabled and cluster-throttling is enabled then apply
-                    // cluster-throttling rate
-                    if (!isDispatchRateEnabled(dispatchRate) && isDispatchRateEnabled(newDispatchRate)) {
-                        dispatchRate = newDispatchRate;
-                    }
-                    updateDispatchRate(dispatchRate);
-                }
+    public void onPoliciesUpdate(Policies data) {
+        String cluster = brokerService.pulsar().getConfiguration().getClusterName();
+
+        DispatchRate dispatchRate;
+        if (subscriptionName == null) {
+            dispatchRate = data.clusterDispatchRate.get(cluster);
+        } else {
+            dispatchRate = data.subscriptionDispatchRate.get(cluster);
+        }
+        // update dispatch-rate only if it's configured in policies else ignore
+        if (dispatchRate != null) {
+            int inMsg = (subscriptionName == null) ?
+                brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInMsg() :
+                brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg();
+            long inByte = (subscriptionName == null) ?
+                brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInByte() :
+                brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerSubscribeInByte();
+            final DispatchRate newDispatchRate = new DispatchRate(inMsg, inByte, 1);
+            // if policy-throttling rate is disabled and cluster-throttling is enabled then apply
+            // cluster-throttling rate
+            if (!isDispatchRateEnabled(dispatchRate) && isDispatchRateEnabled(newDispatchRate)) {
+                dispatchRate = newDispatchRate;
             }
-        });
+            updateDispatchRate(dispatchRate);
+        }
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 5198a13..41b8ec7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -636,5 +636,10 @@ public class PersistentDispatcherMultipleConsumers  extends AbstractDispatcherMu
         return redeliveryTracker;
     }
 
+    @Override
+    public DispatchRateLimiter getRateLimiter() {
+        return dispatchRateLimiter;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 89dfb47..275b236 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -496,5 +496,10 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
         return redeliveryTracker;
     }
 
+    @Override
+    public DispatchRateLimiter getRateLimiter() {
+        return dispatchRateLimiter;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 6c12296..34703fa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -166,7 +166,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
     // Flag to signal that producer of this topic has published batch-message so, broker should not allow consumer which
     // doesn't support batch-message
     private volatile boolean hasBatchMessagePublished = false;
-    private DispatchRateLimiter dispatchRateLimiter;
+    private final DispatchRateLimiter dispatchRateLimiter;
     public static final int MESSAGE_RATE_BACKOFF_MS = 1000;
 
     private final MessageDeduplication messageDeduplication;
@@ -1559,11 +1559,17 @@ public class PersistentTopic implements Topic, AddEntryCallback {
             producer.checkPermissions();
             producer.checkEncryption();
         });
-        subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(Consumer::checkPermissions));
+        subscriptions.forEach((subName, sub) -> {
+            sub.getConsumers().forEach(Consumer::checkPermissions);
+            if (sub.getDispatcher().getRateLimiter() != null) {
+                sub.getDispatcher().getRateLimiter().onPoliciesUpdate(data);
+            }
+        });
         checkMessageExpiry();
         CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();
         CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
         CompletableFuture<Void> persistentPoliciesFuture = checkPersistencePolicies();
+        dispatchRateLimiter.onPoliciesUpdate(data);
         return CompletableFuture.allOf(replicationFuture, dedupFuture, persistentPoliciesFuture);
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
index b4b60af..5e3e0d2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
@@ -502,6 +502,15 @@ public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchTh
         int nsMessageRate = 500;
         DispatchRate dispatchRate = new DispatchRate(nsMessageRate, 0, 1);
         admin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate);
+
+        if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
+            subRateLimiter = ((PersistentDispatcherMultipleConsumers) subDispatcher).getDispatchRateLimiter();
+        } else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
+            subRateLimiter = ((PersistentDispatcherSingleActiveConsumer) subDispatcher).getDispatchRateLimiter();
+        } else {
+            Assert.fail("Should only have PersistentDispatcher in this test");
+        }
+
         for (int i = 0; i < 5; i++) {
             if (subRateLimiter.getDispatchRateOnMsg() != nsMessageRate) {
                 Thread.sleep(50 + (i * 10));