You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/05/06 12:40:46 UTC
[GitHub] [pulsar] liudezhi2098 opened a new issue #6892: topicPublishRateLimiter not effective after restart broker
liudezhi2098 opened a new issue #6892:
URL: https://github.com/apache/pulsar/issues/6892
when config set-publish-rate on namespaces,then can limit publish rate, but when restart broker the limit has expired.
1、*config set-publish-rate *
```bash
sh pulsar-admin namespaces set-publish-rate -b -1 -m 200 public/test_rate
```
2、*restart broker (only 1 broker)*
AbstractTopic throw exception : CompletableFuture cannot cast Policies
```
try {
policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()));
if (policies == null) {
policies = new Policies();
}
} catch (Exception e) {
log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage());
}
updatePublishDispatcher(policies);
```
then this.topicPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER
```java
private void updatePublishDispatcher(Policies policies) {
final String clusterName = brokerService.pulsar().getConfiguration().getClusterName();
final PublishRate publishRate = policies != null && policies.publishMaxMessageRate != null
? policies.publishMaxMessageRate.get(clusterName)
: null;
if (publishRate != null
&& (publishRate.publishThrottlingRateInByte > 0 || publishRate.publishThrottlingRateInMsg > 0)) {
log.info("Enabling publish rate limiting {} on topic {}", publishRate, this.topic);
// lazy init Publish-rateLimiting monitoring if not initialized yet
this.brokerService.setupTopicPublishRateLimiterMonitor();
if (this.topicPublishRateLimiter == null
|| this.topicPublishRateLimiter == PublishRateLimiter.DISABLED_RATE_LIMITER) {
// create new rateLimiter if rate-limiter is disabled
this.topicPublishRateLimiter = new PublishRateLimiterImpl(policies, clusterName);
} else {
this.topicPublishRateLimiter.update(policies, clusterName);
}
} else {
log.info("Disabling publish throttling for {}", this.topic);
this.topicPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
enableProducerRead();
}
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org