Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/05/28 03:14:34 UTC

[GitHub] [pulsar] Jason918 commented on a diff in pull request #15789: [broker]tidy update subscriptions dispatcher rate-limiter

Jason918 commented on code in PR #15789:
URL: https://github.com/apache/pulsar/pull/15789#discussion_r884060963


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3066,6 +3036,19 @@ public void onUpdate(TopicPolicies policies) {
         });
     }
 
+    private CompletableFuture<Void> updateSubscriptionsDispatcherRateLimiter() {
+        List<CompletableFuture<Void>> consumerCheckFutures = new ArrayList<>();
+        subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(consumer -> {
+            consumerCheckFutures.add(consumer.checkPermissionsAsync().thenRun(() -> {
+                Dispatcher dispatcher = sub.getDispatcher();
+                if (dispatcher != null) {
+                    dispatcher.updateRateLimiter();
+                }
+            }));
+        }));
+        return CompletableFuture.allOf(consumerCheckFutures.toArray(new CompletableFuture[0]));

Review Comment:
   Is this any different from `FutureUtil.waitForAll(consumerCheckFutures)`?
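
   For context on the question above, here is a minimal sketch of what a `waitForAll`-style helper could look like if it is only a thin wrapper over `CompletableFuture.allOf`. The `waitForAll` method below is a hypothetical stand-in written for illustration, not Pulsar's `FutureUtil` source; whether the real helper behaves identically (for example, in how it reports failures) would need to be checked against the codebase.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class WaitForAllSketch {
    // Hypothetical stand-in for a waitForAll-style helper (assumption, not Pulsar's code):
    // it only converts the collection to an array and delegates to allOf.
    public static CompletableFuture<Void> waitForAll(Collection<? extends CompletableFuture<?>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }

    public static void main(String[] args) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        CompletableFuture<Void> pending = new CompletableFuture<>();
        futures.add(CompletableFuture.completedFuture(null));
        futures.add(pending);

        // Both forms aggregate the same futures.
        CompletableFuture<Void> viaHelper = waitForAll(futures);
        CompletableFuture<Void> viaAllOf =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

        System.out.println("before: " + viaHelper.isDone() + " " + viaAllOf.isDone());
        pending.complete(null);
        System.out.println("after:  " + viaHelper.isDone() + " " + viaAllOf.isDone());
    }
}
```

   Under that assumption the two calls are interchangeable, and the helper mainly saves the `toArray` boilerplate at the call site.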



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3066,6 +3036,19 @@ public void onUpdate(TopicPolicies policies) {
         });
     }
 
+    private CompletableFuture<Void> updateSubscriptionsDispatcherRateLimiter() {
+        List<CompletableFuture<Void>> consumerCheckFutures = new ArrayList<>();
+        subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(consumer -> {
+            consumerCheckFutures.add(consumer.checkPermissionsAsync().thenRun(() -> {
+                Dispatcher dispatcher = sub.getDispatcher();

Review Comment:
   It seems we don't need to check `dispatcher` for each consumer. The previous code at L2416 makes more sense.
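
   One possible reading of this suggestion, sketched under stated assumptions: run the permission checks per consumer, but look up and update the dispatcher once per subscription. The `Dispatcher`, `Consumer`, and `Subscription` interfaces and the `updateAll` method below are simplified, hypothetical stand-ins for the broker types named in the diff; this does not reproduce the previous code at L2416.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class PerSubscriptionUpdateSketch {
    // Hypothetical stand-ins for the broker types (assumptions for illustration only).
    public interface Dispatcher { void updateRateLimiter(); }
    public interface Consumer { CompletableFuture<Void> checkPermissionsAsync(); }
    public interface Subscription {
        List<Consumer> getConsumers();
        Dispatcher getDispatcher();
    }

    public static CompletableFuture<Void> updateAll(Map<String, Subscription> subscriptions) {
        List<CompletableFuture<Void>> perSubscription = new ArrayList<>();
        subscriptions.forEach((subName, sub) -> {
            // Collect one permission check per consumer.
            List<CompletableFuture<Void>> checks = new ArrayList<>();
            sub.getConsumers().forEach(c -> checks.add(c.checkPermissionsAsync()));
            // Update the dispatcher once, after all checks for this subscription succeed.
            perSubscription.add(
                    CompletableFuture.allOf(checks.toArray(new CompletableFuture[0]))
                            .thenRun(() -> {
                                Dispatcher dispatcher = sub.getDispatcher();
                                if (dispatcher != null) {
                                    dispatcher.updateRateLimiter();
                                }
                            }));
        });
        return CompletableFuture.allOf(perSubscription.toArray(new CompletableFuture[0]));
    }

    public static void main(String[] args) {
        AtomicInteger updates = new AtomicInteger();
        Consumer ok = () -> CompletableFuture.completedFuture(null);
        Subscription sub = new Subscription() {
            public List<Consumer> getConsumers() { return List.of(ok, ok, ok); }
            public Dispatcher getDispatcher() { return updates::incrementAndGet; }
        };
        updateAll(Collections.singletonMap("sub-1", sub)).join();
        System.out.println("dispatcher updates: " + updates.get());
    }
}
```

   Note that this changes behavior relative to the per-consumer version in the diff: a subscription with no consumers still gets its dispatcher updated, and a single failed permission check skips the update for that subscription. Whether either difference is acceptable is a question for the PR author.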


