Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/02/14 11:45:20 UTC

[GitHub] [pulsar] Jason918 commented on a change in pull request #14267: [Issue 13756][Broker]Optimize topic policy with HierarchyTopicPolicies about publishRate

Jason918 commented on a change in pull request #14267:
URL: https://github.com/apache/pulsar/pull/14267#discussion_r805757458



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -2206,6 +2206,12 @@ private void updateBrokerDispatchThrottlingMaxRate() {
     }
 
     private void updateBrokerPublisherThrottlingMaxRate() {
+        forEachTopic(topic -> {

Review comment:
       This is not the right entry point.
   It seems we are missing a trigger to update `publishRate` when the broker-level settings change.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -186,11 +185,22 @@ protected void updateTopicPolicy(TopicPolicies data) {
                         data.getBackLogQuotaMap() == null ? null : data.getBackLogQuotaMap().get(type.toString())));
         topicPolicies.getTopicMaxMessageSize().updateTopicValue(data.getMaxMessageSize());
         topicPolicies.getMessageTTLInSeconds().updateTopicValue(data.getMessageTTLInSeconds());
+        topicPolicies.getPublishRate().updateTopicValue(normalize(data.getPublishRate()));
         topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
         topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis());
         topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold());
     }
 
+    private PublishRate normalize(PublishRate publishRate) {

Review comment:
       It would be better to move this into a static method in the `PublishRate` class.
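
A rough sketch of what such a static method could look like. The body of `normalize` is not visible in this hunk, so the rule below (treat `null`, or a rate with no positive limit, as "not configured") is an assumption, and `PublishRateSketch` is a hypothetical stand-in for the real `PublishRate` class rather than its actual definition.

```java
/** Hypothetical stand-in for PublishRate; only two rate fields are modeled. */
final class PublishRateSketch {
    final int publishThrottlingRateInMsg;
    final long publishThrottlingRateInByte;

    PublishRateSketch(int publishThrottlingRateInMsg, long publishThrottlingRateInByte) {
        this.publishThrottlingRateInMsg = publishThrottlingRateInMsg;
        this.publishThrottlingRateInByte = publishThrottlingRateInByte;
    }

    /**
     * Assumed normalization rule: a null rate, or a rate where neither limit
     * is positive, is treated as "not set" and mapped to null so that a
     * higher-level value can apply instead.
     */
    static PublishRateSketch normalize(PublishRateSketch rate) {
        if (rate == null) {
            return null;
        }
        boolean hasLimit = rate.publishThrottlingRateInMsg > 0
                || rate.publishThrottlingRateInByte > 0;
        return hasLimit ? rate : null;
    }
}
```

With the helper living next to the data it inspects, both the topic-level and namespace-level call sites can share it without going through `AbstractTopic`.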

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -230,6 +241,14 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
         updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies);
     }
 
+    private void updateNamespacePublishRate(Policies namespacePolicies, String cluster) {
+        topicPolicies.getPublishRate().updateNamespaceValue(
+            normalize(
+                namespacePolicies.publishMaxMessageRate != null
+                    ? namespacePolicies.publishMaxMessageRate.get(cluster)
+                    : null));

Review comment:
       ```suggestion
                   MapUtils.getObject(namespacePolicies.publishMaxMessageRate, cluster));
   ```
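
For context, the suggestion relies on a null-safe map lookup replacing the explicit ternary. A minimal sketch of the equivalent behavior, using a hypothetical local helper `getOrNull` so the example needs no external dependency (it is not the `MapUtils` implementation itself):

```java
import java.util.Map;

final class NullSafeLookup {
    /**
     * Returns the value mapped to key, or null when the map itself is null.
     * This mirrors the ternary in the diff:
     *   map != null ? map.get(key) : null
     */
    static <K, V> V getOrNull(Map<K, V> map, K key) {
        return map == null ? null : map.get(key);
    }
}
```

A missing key and a missing map both yield `null`, which is the same result the original ternary produces.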




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org