You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/10/03 18:56:56 UTC

[GitHub] merlimat closed pull request #2699: Do not register each DispatchRateLimiter for policies notifications

merlimat closed pull request #2699: Do not register each DispatchRateLimiter for policies notifications
URL: https://github.com/apache/pulsar/pull/2699
 
 
   

This pull request was merged from a forked repository (a "foreign" pull
request). Because GitHub hides the original diff of such a pull request
once it has been merged, the diff is reproduced below for the sake of
provenance:

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index 43f65cd73b..76bfdc3d55 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -22,6 +22,7 @@
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.utils.CopyOnWriteArrayList;
 
@@ -45,14 +46,14 @@
 
     /**
      * mark dispatcher closed to stop new incoming requests and disconnect all consumers
-     * 
+     *
      * @return
      */
     CompletableFuture<Void> close();
-    
+
     /**
      * disconnect all consumers
-     * 
+     *
      * @return
      */
     CompletableFuture<Void> disconnectAllConsumers();
@@ -72,4 +73,7 @@
 
     RedeliveryTracker getRedeliveryTracker();
 
+    default DispatchRateLimiter getRateLimiter() {
+        return null;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index e511d40f3a..36f81a057b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -48,7 +48,6 @@ public DispatchRateLimiter(PersistentTopic topic, String subscriptionName) {
         this.subscriptionName = subscriptionName;
         this.brokerService = topic.getBrokerService();
         updateDispatchRate();
-        registerLocalPoliciesListener();
     }
 
     public DispatchRateLimiter(PersistentTopic topic) {
@@ -119,40 +118,31 @@ public void updateDispatchRate() {
         log.info("[{}] [{}] configured message-dispatch rate at broker {}", this.topicName, this.subscriptionName, dispatchRate);
     }
 
-    /**
-     * Register listener on namespace policy change to update dispatch-rate if required
-     *
-     */
-    private void registerLocalPoliciesListener() {
-        brokerService.pulsar().getConfigurationCache().policiesCache().registerListener((path, data, stat) -> {
-            final NamespaceName namespace = TopicName.get(this.topicName).getNamespaceObject();
-            final String cluster = brokerService.pulsar().getConfiguration().getClusterName();
-            final String policiesPath = path(POLICIES, namespace.toString());
-            if (policiesPath.equals(path)) {
-                DispatchRate dispatchRate;
-                if (subscriptionName == null) {
-                    dispatchRate = data.clusterDispatchRate.get(cluster);
-                } else {
-                    dispatchRate = data.subscriptionDispatchRate.get(cluster);
-                }
-                // update dispatch-rate only if it's configured in policies else ignore
-                if (dispatchRate != null) {
-                    int inMsg = (subscriptionName == null) ?
-                        brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInMsg() :
-                        brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg();
-                    long inByte = (subscriptionName == null) ?
-                        brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInByte() :
-                        brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerSubscribeInByte();
-                    final DispatchRate newDispatchRate = new DispatchRate(inMsg, inByte, 1);
-                    // if policy-throttling rate is disabled and cluster-throttling is enabled then apply
-                    // cluster-throttling rate
-                    if (!isDispatchRateEnabled(dispatchRate) && isDispatchRateEnabled(newDispatchRate)) {
-                        dispatchRate = newDispatchRate;
-                    }
-                    updateDispatchRate(dispatchRate);
-                }
+    public void onPoliciesUpdate(Policies data) {
+        String cluster = brokerService.pulsar().getConfiguration().getClusterName();
+
+        DispatchRate dispatchRate;
+        if (subscriptionName == null) {
+            dispatchRate = data.clusterDispatchRate.get(cluster);
+        } else {
+            dispatchRate = data.subscriptionDispatchRate.get(cluster);
+        }
+        // update dispatch-rate only if it's configured in policies else ignore
+        if (dispatchRate != null) {
+            int inMsg = (subscriptionName == null) ?
+                brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInMsg() :
+                brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg();
+            long inByte = (subscriptionName == null) ?
+                brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerTopicInByte() :
+                brokerService.pulsar().getConfiguration().getDispatchThrottlingRatePerSubscribeInByte();
+            final DispatchRate newDispatchRate = new DispatchRate(inMsg, inByte, 1);
+            // if policy-throttling rate is disabled and cluster-throttling is enabled then apply
+            // cluster-throttling rate
+            if (!isDispatchRateEnabled(dispatchRate) && isDispatchRateEnabled(newDispatchRate)) {
+                dispatchRate = newDispatchRate;
             }
-        });
+            updateDispatchRate(dispatchRate);
+        }
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 5198a139ec..41b8ec7656 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -636,5 +636,10 @@ public RedeliveryTracker getRedeliveryTracker() {
         return redeliveryTracker;
     }
 
+    @Override
+    public DispatchRateLimiter getRateLimiter() {
+        return dispatchRateLimiter;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 89dfb47331..275b236d88 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -496,5 +496,10 @@ public RedeliveryTracker getRedeliveryTracker() {
         return redeliveryTracker;
     }
 
+    @Override
+    public DispatchRateLimiter getRateLimiter() {
+        return dispatchRateLimiter;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 6c12296a6f..34703faf84 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -166,7 +166,7 @@
     // Flag to signal that producer of this topic has published batch-message so, broker should not allow consumer which
     // doesn't support batch-message
     private volatile boolean hasBatchMessagePublished = false;
-    private DispatchRateLimiter dispatchRateLimiter;
+    private final DispatchRateLimiter dispatchRateLimiter;
     public static final int MESSAGE_RATE_BACKOFF_MS = 1000;
 
     private final MessageDeduplication messageDeduplication;
@@ -1559,11 +1559,17 @@ private boolean shouldTopicBeRetained() {
             producer.checkPermissions();
             producer.checkEncryption();
         });
-        subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(Consumer::checkPermissions));
+        subscriptions.forEach((subName, sub) -> {
+            sub.getConsumers().forEach(Consumer::checkPermissions);
+            if (sub.getDispatcher().getRateLimiter() != null) {
+                sub.getDispatcher().getRateLimiter().onPoliciesUpdate(data);
+            }
+        });
         checkMessageExpiry();
         CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();
         CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
         CompletableFuture<Void> persistentPoliciesFuture = checkPersistencePolicies();
+        dispatchRateLimiter.onPoliciesUpdate(data);
         return CompletableFuture.allOf(replicationFuture, dedupFuture, persistentPoliciesFuture);
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
index b4b60afb0e..5e3e0d2506 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
@@ -502,6 +502,15 @@ public void testClusterPolicyOverrideConfiguration() throws Exception {
         int nsMessageRate = 500;
         DispatchRate dispatchRate = new DispatchRate(nsMessageRate, 0, 1);
         admin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate);
+
+        if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
+            subRateLimiter = ((PersistentDispatcherMultipleConsumers) subDispatcher).getDispatchRateLimiter();
+        } else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
+            subRateLimiter = ((PersistentDispatcherSingleActiveConsumer) subDispatcher).getDispatchRateLimiter();
+        } else {
+            Assert.fail("Should only have PersistentDispatcher in this test");
+        }
+
         for (int i = 0; i < 5; i++) {
             if (subRateLimiter.getDispatchRateOnMsg() != nsMessageRate) {
                 Thread.sleep(50 + (i * 10));


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services