You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2021/08/31 22:04:09 UTC

[pulsar] branch master updated: [PIP-82] [pulsar-broker] Misc fixes: (#11821)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 60a5698  [PIP-82] [pulsar-broker] Misc fixes: (#11821)
60a5698 is described below

commit 60a5698232932df153c1f39045b3ca6c53298d76
Author: kaushik-develop <80...@users.noreply.github.com>
AuthorDate: Tue Aug 31 15:03:16 2021 -0700

    [PIP-82] [pulsar-broker] Misc fixes: (#11821)
    
    - fix updateLocalQuota to not attempt Dispatch changes
    - add checks for a totalUsage of 0 and for an unconfigured RG in ResourceQuotaCalculatorImpl
    - add unit tests for the above checks
    
    Co-authored-by: Kaushik Ghosh <ka...@splunk.com>
---
 .../pulsar/broker/resourcegroup/ResourceGroup.java | 47 +++++++++++++++------
 .../broker/resourcegroup/ResourceGroupService.java | 48 ++++++++++++++++------
 .../resourcegroup/ResourceQuotaCalculatorImpl.java | 18 ++++++--
 .../ResourceQuotaCalculatorImplTest.java           | 12 +++++-
 4 files changed, 96 insertions(+), 29 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
index 363594f..0435bd9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
@@ -179,7 +179,9 @@ public class ResourceGroup {
 
             // If this is the first ref, register with the transport manager.
             if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size() == 1) {
-                log.debug("registerUsage for RG={}: registering with transport-mgr", this.resourceGroupName);
+                if (log.isDebugEnabled()) {
+                    log.debug("registerUsage for RG={}: registering with transport-mgr", this.resourceGroupName);
+                }
                 transportManager.registerResourceUsagePublisher(this.ruPublisher);
                 transportManager.registerResourceUsageConsumer(this.ruConsumer);
             }
@@ -192,7 +194,9 @@ public class ResourceGroup {
 
             // If this was the last ref, unregister from the transport manager.
             if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size() == 0) {
-                log.debug("unRegisterUsage for RG={}: un-registering from transport-mgr", this.resourceGroupName);
+                if (log.isDebugEnabled()) {
+                    log.debug("unRegisterUsage for RG={}: un-registering from transport-mgr", this.resourceGroupName);
+                }
                 transportManager.unregisterResourceUsageConsumer(this.ruConsumer);
                 transportManager.unregisterResourceUsagePublisher(this.ruPublisher);
             }
@@ -321,6 +325,14 @@ public class ResourceGroup {
 
     protected BytesAndMessagesCount updateLocalQuota(ResourceGroupMonitoringClass monClass,
                                                      BytesAndMessagesCount newQuota) throws PulsarAdminException {
+        // Only the Publish side is functional now; add the Dispatch side code when the consume side is ready.
+        if (!ResourceGroupMonitoringClass.Publish.equals(monClass)) {
+            if (log.isDebugEnabled()) {
+                log.debug("Doing nothing for monClass={}; only Publish is functional", monClass);
+            }
+            return null;
+        }
+
         this.checkMonitoringClass(monClass);
         BytesAndMessagesCount oldBMCount;
 
@@ -333,8 +345,10 @@ public class ResourceGroup {
         } finally {
             monEntity.localUsageStatsLock.unlock();
         }
-        log.debug("updateLocalQuota for RG={}: set local {} quota to bytes={}, messages={}",
-                this.resourceGroupName, monClass, newQuota.bytes, newQuota.messages);
+        if (log.isDebugEnabled()) {
+            log.debug("updateLocalQuota for RG={}: set local {} quota to bytes={}, messages={}",
+                    this.resourceGroupName, monClass, newQuota.bytes, newQuota.messages);
+        }
 
         return oldBMCount;
     }
@@ -434,11 +448,16 @@ public class ResourceGroup {
         double sentCount = sendReport ? 1 : 0;
         rgLocalUsageReportCount.labels(rgName, monClass.name()).inc(sentCount);
         if (sendReport) {
-            log.debug("fillResourceUsage for RG={}: filled a {} update; bytes={}, messages={}",
-                    rgName, monClass, bytesUsed, messagesUsed);
+            if (log.isDebugEnabled()) {
+                log.debug("fillResourceUsage for RG={}: filled a {} update; bytes={}, messages={}",
+                        rgName, monClass, bytesUsed, messagesUsed);
+            }
         } else {
-            log.debug("fillResourceUsage for RG={}: report for {} suppressed (suppressions={} since last sent report)",
-                    rgName, monClass, numSuppressions);
+            if (log.isDebugEnabled()) {
+                log.debug("fillResourceUsage for RG={}: report for {} suppressed "
+                    + "(suppressions={} since last sent report)",
+                        rgName, monClass, numSuppressions);
+            }
         }
 
         return sendReport;
@@ -479,11 +498,13 @@ public class ResourceGroup {
             oldMessageCount = oldUsageStats.usedValues.messages;
         }
 
-        log.debug("resourceUsageListener for RG={}: updated {} stats for broker={} "
-                + "with bytes={} (old ={}), messages={} (old={})",
-                this.resourceGroupName, monClass, broker,
-                newByteCount, oldByteCount,
-                newMessageCount, oldMessageCount);
+        if (log.isDebugEnabled()) {
+            log.debug("resourceUsageListener for RG={}: updated {} stats for broker={} "
+                            + "with bytes={} (old ={}), messages={} (old={})",
+                    this.resourceGroupName, monClass, broker,
+                    newByteCount, oldByteCount,
+                    newMessageCount, oldMessageCount);
+        }
     }
 
     private void setResourceGroupMonitoringClassFields() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
index 86a11d0..4a40235 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
@@ -445,10 +445,12 @@ public class ResourceGroupService {
 
         try {
             boolean statsUpdated = this.incrementUsage(tenantString, nsString, monClass, bmDiff);
-            log.debug("updateStatsWithDiff for topic={}: monclass={} statsUpdated={} for tenant={}, namespace={}; "
-                            + "by {} bytes, {} mesgs",
-                    topicName, monClass, statsUpdated, tenantString, nsString,
-                    bmDiff.bytes, bmDiff.messages);
+            if (log.isDebugEnabled()) {
+                log.debug("updateStatsWithDiff for topic={}: monclass={} statsUpdated={} for tenant={}, namespace={}; "
+                                + "by {} bytes, {} mesgs",
+                        topicName, monClass, statsUpdated, tenantString, nsString,
+                        bmDiff.bytes, bmDiff.messages);
+            }
             hm.put(topicName, bmNewCount);
         } catch (Throwable t) {
             log.error("updateStatsWithDiff: got ex={} while aggregating for {} side",
@@ -553,7 +555,9 @@ public class ResourceGroupService {
                     ResourceGroupMonitoringClass.Dispatch);
         }
         double diffTimeSeconds = aggrUsageTimer.observeDuration();
-        log.debug("aggregateResourceGroupLocalUsages took {} milliseconds", diffTimeSeconds * 1000);
+        if (log.isDebugEnabled()) {
+            log.debug("aggregateResourceGroupLocalUsages took {} milliseconds", diffTimeSeconds * 1000);
+        }
 
         // Check any re-scheduling requirements for next time.
         // Use the same period as getResourceUsagePublishIntervalInSecs;
@@ -610,12 +614,30 @@ public class ResourceGroupService {
                             globUsageMessagesArray);
 
                     BytesAndMessagesCount oldBMCount = resourceGroup.updateLocalQuota(monClass, updatedQuota);
-                    rgCalculatedQuotaMessages.labels(rgName, monClass.name()).inc(updatedQuota.messages);
-                    rgCalculatedQuotaBytes.labels(rgName, monClass.name()).inc(updatedQuota.bytes);
-                    long messagesIncrement = updatedQuota.messages - oldBMCount.messages;
-                    long bytesIncrement = updatedQuota.bytes - oldBMCount.bytes;
-                    log.debug("calculateQuota for RG {} [class {}]: bytes incremented by {}, messages by {}",
-                            rgName, monClass, messagesIncrement, bytesIncrement);
+                    // Guard against unconfigured quota settings, for which computeLocalQuota will return negative.
+                    if (updatedQuota.messages >= 0) {
+                        rgCalculatedQuotaMessages.labels(rgName, monClass.name()).inc(updatedQuota.messages);
+                    }
+                    if (updatedQuota.bytes >= 0) {
+                        rgCalculatedQuotaBytes.labels(rgName, monClass.name()).inc(updatedQuota.bytes);
+                    }
+                    if (oldBMCount != null) {
+                        long messagesIncrement = updatedQuota.messages - oldBMCount.messages;
+                        long bytesIncrement = updatedQuota.bytes - oldBMCount.bytes;
+                        if (log.isDebugEnabled()) {
+                            log.debug("calculateQuota for RG={} [class {}]: "
+                                            + "updatedlocalBytes={}, updatedlocalMesgs={}; "
+                                            + "old bytes={}, old mesgs={};  incremented bytes by {}, messages by {}",
+                                    rgName, monClass, updatedQuota.bytes, updatedQuota.messages,
+                                    oldBMCount.bytes, oldBMCount.messages,
+                                    bytesIncrement, messagesIncrement);
+                        }
+                    } else {
+                        if (log.isDebugEnabled()) {
+                            log.debug("calculateQuota for RG={} [class {}]: got back null from updateLocalQuota",
+                                    rgName, monClass);
+                        }
+                    }
                 } catch (Throwable t) {
                     log.error("Got exception={} while calculating new quota for monitoring-class={} of RG={}",
                             t.getMessage(), monClass, rgName);
@@ -623,7 +645,9 @@ public class ResourceGroupService {
             }
         });
         double diffTimeSeconds = quotaCalcTimer.observeDuration();
-        log.debug("calculateQuotaForAllResourceGroups took {} milliseconds", diffTimeSeconds * 1000);
+        if (log.isDebugEnabled()) {
+            log.debug("calculateQuotaForAllResourceGroups took {} milliseconds", diffTimeSeconds * 1000);
+        }
 
         // Check any re-scheduling requirements for next time.
         // Use the same period as getResourceUsagePublishIntervalInSecs;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImpl.java
index 773be27..27a3085 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImpl.java
@@ -39,9 +39,12 @@ public class ResourceQuotaCalculatorImpl implements ResourceQuotaCalculator {
 
         if (confUsage < 0) {
             // This can happen if the RG is not configured with this particular limit (message or byte count) yet.
-            // It is safe to return a high value (so we don't limit) for the quota.
-            log.debug("Configured usage {} is not set; returning a high calculated quota", confUsage);
-            return Long.MAX_VALUE;
+            val retVal = -1;
+            if (log.isDebugEnabled()) {
+                log.debug("Configured usage ({}) is not set; returning a special value ({}) for calculated quota",
+                        confUsage, retVal);
+            }
+            return retVal;
         }
 
         if (myUsage < 0 || totalUsage < 0) {
@@ -51,6 +54,15 @@ public class ResourceQuotaCalculatorImpl implements ResourceQuotaCalculator {
             throw new PulsarAdminException(errMesg);
         }
 
+        // If the total usage is zero (which may happen during initial transients), just return the configured value.
+        // The caller is expected to check the value returned, or not call here with a zero global usage.
+        // [This avoids a division by zero when calculating the local share.]
+        if (totalUsage == 0) {
+            log.warn("computeLocalQuota: totalUsage is zero; returning the configured usage ({}) as new local quota",
+                    confUsage);
+            return confUsage;
+        }
+
         if (myUsage > totalUsage) {
             String errMesg = String.format("Local usage (%d) is greater than total usage (%d)",
                     myUsage, totalUsage);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImplTest.java
index 7d3653a..1a98838 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImplTest.java
@@ -46,7 +46,8 @@ public class ResourceQuotaCalculatorImplTest extends MockedPulsarServiceBaseTest
     public void testRQCalcNegativeConfTest() throws PulsarAdminException {
         final long[] allUsage = { 0 };
         long calculatedQuota = this.rqCalc.computeLocalQuota(-1, 0, allUsage);
-        Assert.assertEquals(calculatedQuota, Long.MAX_VALUE);
+        long expectedQuota = -1;
+        Assert.assertEquals(calculatedQuota, expectedQuota);
     }
 
     @Test
@@ -102,5 +103,14 @@ public class ResourceQuotaCalculatorImplTest extends MockedPulsarServiceBaseTest
         Assert.assertEquals(initialUsageRatio, proposedUsageRatio);
     }
 
+    @Test
+    public void testRQCalcGlobUsedZeroTest() throws PulsarAdminException {
+        final long config = 10;  // don't care
+        final long localUsed = 0;  // don't care
+        final long[] allUsage = { 0 };
+        final long newQuota = this.rqCalc.computeLocalQuota(config, localUsed, allUsage);
+        Assert.assertTrue(newQuota == config);
+    }
+
     private ResourceQuotaCalculatorImpl rqCalc;
 }
\ No newline at end of file