Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/05/06 12:40:46 UTC

[GitHub] [pulsar] liudezhi2098 opened a new issue #6892: topicPublishRateLimiter not effective after restart broker

liudezhi2098 opened a new issue #6892:
URL: https://github.com/apache/pulsar/issues/6892


   Setting a publish rate on a namespace with set-publish-rate limits the publish rate as expected, but after the broker is restarted the limit is no longer enforced.
   
   1. *Configure set-publish-rate*
   ```bash
   sh pulsar-admin namespaces set-publish-rate -b -1 -m 200 public/test_rate
   ```
   
   2. *Restart the broker (only 1 broker)*
   AbstractTopic throws an exception: CompletableFuture cannot be cast to Policies
   ```java
   try {
       policies = brokerService.pulsar().getConfigurationCache().policiesCache()
               .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()));
       if (policies == null) {
           policies = new Policies();
       }
   } catch (Exception e) {
       log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage());
   }
   updatePublishDispatcher(policies);
   ```
   The exception is caught, updatePublishDispatcher(policies) still runs, and this.topicPublishRateLimiter ends up as PublishRateLimiter.DISABLED_RATE_LIMITER:
   ```java
       private void updatePublishDispatcher(Policies policies) {
           final String clusterName = brokerService.pulsar().getConfiguration().getClusterName();
           final PublishRate publishRate = policies != null && policies.publishMaxMessageRate != null
                   ? policies.publishMaxMessageRate.get(clusterName)
                   : null;
           if (publishRate != null
                   && (publishRate.publishThrottlingRateInByte > 0 || publishRate.publishThrottlingRateInMsg > 0)) {
               log.info("Enabling publish rate limiting {} on topic {}", publishRate, this.topic);
               // lazy init Publish-rateLimiting monitoring if not initialized yet
               this.brokerService.setupTopicPublishRateLimiterMonitor();
               if (this.topicPublishRateLimiter == null
                       || this.topicPublishRateLimiter == PublishRateLimiter.DISABLED_RATE_LIMITER) {
                   // create new rateLimiter if rate-limiter is disabled
                   this.topicPublishRateLimiter = new PublishRateLimiterImpl(policies, clusterName);
               } else {
                   this.topicPublishRateLimiter.update(policies, clusterName);
               }
           } else {
               log.info("Disabling publish throttling for {}", this.topic);
               this.topicPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
               enableProducerRead();
           }
       }
   ```
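
The control flow described above can be reproduced in isolation. The following is only a minimal sketch, not the broker code: `PublishLimiterRestartSketch`, its simplified `Policies` class, `loadPolicies`, and `limiterFor` are hypothetical stand-ins. It assumes the cache lookup hands back a `CompletableFuture` where a `Policies` is expected (as the exception message suggests) and that `policies` starts out as null before the `try` block.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of the failure path; not Pulsar's real classes.
class PublishLimiterRestartSketch {

    /** Stand-in for the namespace policies; only a message rate is modelled. */
    static final class Policies {
        final int publishMaxMsgRate; // <= 0 means no limit is configured

        Policies(int publishMaxMsgRate) {
            this.publishMaxMsgRate = publishMaxMsgRate;
        }
    }

    /** Mirrors the try/catch in the excerpt; cachedValue stands in for the cache result. */
    static Policies loadPolicies(Object cachedValue) {
        Policies policies = null; // assumption: initialised to null before the try block
        try {
            // Throws ClassCastException when cachedValue is a CompletableFuture.
            policies = (Policies) cachedValue;
            if (policies == null) {
                policies = new Policies(0);
            }
        } catch (Exception e) {
            // Swallowed, as in the excerpt: policies stays null.
            System.out.println("Error getting policies: " + e.getClass().getSimpleName());
        }
        return policies;
    }

    /** Mirrors the enable/disable decision in updatePublishDispatcher. */
    static String limiterFor(Policies policies) {
        return (policies != null && policies.publishMaxMsgRate > 0) ? "ENABLED" : "DISABLED";
    }

    public static void main(String[] args) {
        Policies configured = new Policies(200);
        // Cache returns the policies directly: the limit is applied.
        System.out.println(limiterFor(loadPolicies(configured)));
        // Cache returns a future wrapping the same policies: the cast fails and the limit is lost.
        System.out.println(limiterFor(loadPolicies(CompletableFuture.completedFuture(configured))));
    }
}
```

In this sketch the configured rate of 200 is still present inside the future, yet the limiter comes out disabled, because the swallowed cast failure leaves `policies` null.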
   
   

