You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2021/08/31 22:04:09 UTC
[pulsar] branch master updated: [PIP-82] [pulsar-broker] Misc fixes: (#11821)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 60a5698 [PIP-82] [pulsar-broker] Misc fixes: (#11821)
60a5698 is described below
commit 60a5698232932df153c1f39045b3ca6c53298d76
Author: kaushik-develop <80...@users.noreply.github.com>
AuthorDate: Tue Aug 31 15:03:16 2021 -0700
[PIP-82] [pulsar-broker] Misc fixes: (#11821)
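- Fix updateLocalQuota so that it does not attempt Dispatch-side changes (only the Publish side is functional for now)
- Add checks in ResourceQuotaCalculatorImpl for a total usage of 0 and for a resource group (RG) whose limit is not configured
- Add unit tests for the above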
Co-authored-by: Kaushik Ghosh <ka...@splunk.com>
---
.../pulsar/broker/resourcegroup/ResourceGroup.java | 47 +++++++++++++++------
.../broker/resourcegroup/ResourceGroupService.java | 48 ++++++++++++++++------
.../resourcegroup/ResourceQuotaCalculatorImpl.java | 18 ++++++--
.../ResourceQuotaCalculatorImplTest.java | 12 +++++-
4 files changed, 96 insertions(+), 29 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
index 363594f..0435bd9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
@@ -179,7 +179,9 @@ public class ResourceGroup {
// If this is the first ref, register with the transport manager.
if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size() == 1) {
- log.debug("registerUsage for RG={}: registering with transport-mgr", this.resourceGroupName);
+ if (log.isDebugEnabled()) {
+ log.debug("registerUsage for RG={}: registering with transport-mgr", this.resourceGroupName);
+ }
transportManager.registerResourceUsagePublisher(this.ruPublisher);
transportManager.registerResourceUsageConsumer(this.ruConsumer);
}
@@ -192,7 +194,9 @@ public class ResourceGroup {
// If this was the last ref, unregister from the transport manager.
if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size() == 0) {
- log.debug("unRegisterUsage for RG={}: un-registering from transport-mgr", this.resourceGroupName);
+ if (log.isDebugEnabled()) {
+ log.debug("unRegisterUsage for RG={}: un-registering from transport-mgr", this.resourceGroupName);
+ }
transportManager.unregisterResourceUsageConsumer(this.ruConsumer);
transportManager.unregisterResourceUsagePublisher(this.ruPublisher);
}
@@ -321,6 +325,14 @@ public class ResourceGroup {
protected BytesAndMessagesCount updateLocalQuota(ResourceGroupMonitoringClass monClass,
BytesAndMessagesCount newQuota) throws PulsarAdminException {
+ // Only the Publish side is functional now; add the Dispatch side code when the consume side is ready.
+ if (!ResourceGroupMonitoringClass.Publish.equals(monClass)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Doing nothing for monClass={}; only Publish is functional", monClass);
+ }
+ return null;
+ }
+
this.checkMonitoringClass(monClass);
BytesAndMessagesCount oldBMCount;
@@ -333,8 +345,10 @@ public class ResourceGroup {
} finally {
monEntity.localUsageStatsLock.unlock();
}
- log.debug("updateLocalQuota for RG={}: set local {} quota to bytes={}, messages={}",
- this.resourceGroupName, monClass, newQuota.bytes, newQuota.messages);
+ if (log.isDebugEnabled()) {
+ log.debug("updateLocalQuota for RG={}: set local {} quota to bytes={}, messages={}",
+ this.resourceGroupName, monClass, newQuota.bytes, newQuota.messages);
+ }
return oldBMCount;
}
@@ -434,11 +448,16 @@ public class ResourceGroup {
double sentCount = sendReport ? 1 : 0;
rgLocalUsageReportCount.labels(rgName, monClass.name()).inc(sentCount);
if (sendReport) {
- log.debug("fillResourceUsage for RG={}: filled a {} update; bytes={}, messages={}",
- rgName, monClass, bytesUsed, messagesUsed);
+ if (log.isDebugEnabled()) {
+ log.debug("fillResourceUsage for RG={}: filled a {} update; bytes={}, messages={}",
+ rgName, monClass, bytesUsed, messagesUsed);
+ }
} else {
- log.debug("fillResourceUsage for RG={}: report for {} suppressed (suppressions={} since last sent report)",
- rgName, monClass, numSuppressions);
+ if (log.isDebugEnabled()) {
+ log.debug("fillResourceUsage for RG={}: report for {} suppressed "
+ + "(suppressions={} since last sent report)",
+ rgName, monClass, numSuppressions);
+ }
}
return sendReport;
@@ -479,11 +498,13 @@ public class ResourceGroup {
oldMessageCount = oldUsageStats.usedValues.messages;
}
- log.debug("resourceUsageListener for RG={}: updated {} stats for broker={} "
- + "with bytes={} (old ={}), messages={} (old={})",
- this.resourceGroupName, monClass, broker,
- newByteCount, oldByteCount,
- newMessageCount, oldMessageCount);
+ if (log.isDebugEnabled()) {
+ log.debug("resourceUsageListener for RG={}: updated {} stats for broker={} "
+ + "with bytes={} (old ={}), messages={} (old={})",
+ this.resourceGroupName, monClass, broker,
+ newByteCount, oldByteCount,
+ newMessageCount, oldMessageCount);
+ }
}
private void setResourceGroupMonitoringClassFields() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
index 86a11d0..4a40235 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
@@ -445,10 +445,12 @@ public class ResourceGroupService {
try {
boolean statsUpdated = this.incrementUsage(tenantString, nsString, monClass, bmDiff);
- log.debug("updateStatsWithDiff for topic={}: monclass={} statsUpdated={} for tenant={}, namespace={}; "
- + "by {} bytes, {} mesgs",
- topicName, monClass, statsUpdated, tenantString, nsString,
- bmDiff.bytes, bmDiff.messages);
+ if (log.isDebugEnabled()) {
+ log.debug("updateStatsWithDiff for topic={}: monclass={} statsUpdated={} for tenant={}, namespace={}; "
+ + "by {} bytes, {} mesgs",
+ topicName, monClass, statsUpdated, tenantString, nsString,
+ bmDiff.bytes, bmDiff.messages);
+ }
hm.put(topicName, bmNewCount);
} catch (Throwable t) {
log.error("updateStatsWithDiff: got ex={} while aggregating for {} side",
@@ -553,7 +555,9 @@ public class ResourceGroupService {
ResourceGroupMonitoringClass.Dispatch);
}
double diffTimeSeconds = aggrUsageTimer.observeDuration();
- log.debug("aggregateResourceGroupLocalUsages took {} milliseconds", diffTimeSeconds * 1000);
+ if (log.isDebugEnabled()) {
+ log.debug("aggregateResourceGroupLocalUsages took {} milliseconds", diffTimeSeconds * 1000);
+ }
// Check any re-scheduling requirements for next time.
// Use the same period as getResourceUsagePublishIntervalInSecs;
@@ -610,12 +614,30 @@ public class ResourceGroupService {
globUsageMessagesArray);
BytesAndMessagesCount oldBMCount = resourceGroup.updateLocalQuota(monClass, updatedQuota);
- rgCalculatedQuotaMessages.labels(rgName, monClass.name()).inc(updatedQuota.messages);
- rgCalculatedQuotaBytes.labels(rgName, monClass.name()).inc(updatedQuota.bytes);
- long messagesIncrement = updatedQuota.messages - oldBMCount.messages;
- long bytesIncrement = updatedQuota.bytes - oldBMCount.bytes;
- log.debug("calculateQuota for RG {} [class {}]: bytes incremented by {}, messages by {}",
- rgName, monClass, messagesIncrement, bytesIncrement);
+ // Guard against unconfigured quota settings, for which computeLocalQuota will return negative.
+ if (updatedQuota.messages >= 0) {
+ rgCalculatedQuotaMessages.labels(rgName, monClass.name()).inc(updatedQuota.messages);
+ }
+ if (updatedQuota.bytes >= 0) {
+ rgCalculatedQuotaBytes.labels(rgName, monClass.name()).inc(updatedQuota.bytes);
+ }
+ if (oldBMCount != null) {
+ long messagesIncrement = updatedQuota.messages - oldBMCount.messages;
+ long bytesIncrement = updatedQuota.bytes - oldBMCount.bytes;
+ if (log.isDebugEnabled()) {
+ log.debug("calculateQuota for RG={} [class {}]: "
+ + "updatedlocalBytes={}, updatedlocalMesgs={}; "
+ + "old bytes={}, old mesgs={}; incremented bytes by {}, messages by {}",
+ rgName, monClass, updatedQuota.bytes, updatedQuota.messages,
+ oldBMCount.bytes, oldBMCount.messages,
+ bytesIncrement, messagesIncrement);
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("calculateQuota for RG={} [class {}]: got back null from updateLocalQuota",
+ rgName, monClass);
+ }
+ }
} catch (Throwable t) {
log.error("Got exception={} while calculating new quota for monitoring-class={} of RG={}",
t.getMessage(), monClass, rgName);
@@ -623,7 +645,9 @@ public class ResourceGroupService {
}
});
double diffTimeSeconds = quotaCalcTimer.observeDuration();
- log.debug("calculateQuotaForAllResourceGroups took {} milliseconds", diffTimeSeconds * 1000);
+ if (log.isDebugEnabled()) {
+ log.debug("calculateQuotaForAllResourceGroups took {} milliseconds", diffTimeSeconds * 1000);
+ }
// Check any re-scheduling requirements for next time.
// Use the same period as getResourceUsagePublishIntervalInSecs;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImpl.java
index 773be27..27a3085 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImpl.java
@@ -39,9 +39,12 @@ public class ResourceQuotaCalculatorImpl implements ResourceQuotaCalculator {
if (confUsage < 0) {
// This can happen if the RG is not configured with this particular limit (message or byte count) yet.
- // It is safe to return a high value (so we don't limit) for the quota.
- log.debug("Configured usage {} is not set; returning a high calculated quota", confUsage);
- return Long.MAX_VALUE;
+ val retVal = -1;
+ if (log.isDebugEnabled()) {
+ log.debug("Configured usage ({}) is not set; returning a special value ({}) for calculated quota",
+ confUsage, retVal);
+ }
+ return retVal;
}
if (myUsage < 0 || totalUsage < 0) {
@@ -51,6 +54,15 @@ public class ResourceQuotaCalculatorImpl implements ResourceQuotaCalculator {
throw new PulsarAdminException(errMesg);
}
+ // If the total usage is zero (which may happen during initial transients), just return the configured value.
+ // The caller is expected to check the value returned, or not call here with a zero global usage.
+ // [This avoids a division by zero when calculating the local share.]
+ if (totalUsage == 0) {
+ log.warn("computeLocalQuota: totalUsage is zero; returning the configured usage ({}) as new local quota",
+ confUsage);
+ return confUsage;
+ }
+
if (myUsage > totalUsage) {
String errMesg = String.format("Local usage (%d) is greater than total usage (%d)",
myUsage, totalUsage);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImplTest.java
index 7d3653a..1a98838 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImplTest.java
@@ -46,7 +46,8 @@ public class ResourceQuotaCalculatorImplTest extends MockedPulsarServiceBaseTest
public void testRQCalcNegativeConfTest() throws PulsarAdminException {
final long[] allUsage = { 0 };
long calculatedQuota = this.rqCalc.computeLocalQuota(-1, 0, allUsage);
- Assert.assertEquals(calculatedQuota, Long.MAX_VALUE);
+ long expectedQuota = -1;
+ Assert.assertEquals(calculatedQuota, expectedQuota);
}
@Test
@@ -102,5 +103,14 @@ public class ResourceQuotaCalculatorImplTest extends MockedPulsarServiceBaseTest
Assert.assertEquals(initialUsageRatio, proposedUsageRatio);
}
+ @Test
+ public void testRQCalcGlobUsedZeroTest() throws PulsarAdminException {
+ final long config = 10; // don't care
+ final long localUsed = 0; // don't care
+ final long[] allUsage = { 0 };
+ final long newQuota = this.rqCalc.computeLocalQuota(config, localUsed, allUsage);
+ Assert.assertTrue(newQuota == config);
+ }
+
private ResourceQuotaCalculatorImpl rqCalc;
}
\ No newline at end of file