You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/21 23:53:30 UTC
[pulsar] branch branch-2.10 updated: [improve][loadbalance] added loadBalancerReportUpdateMinIntervalMillis and ignores memory usage in getMaxResourceUsage() (#17598)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new a6cb78694d5 [improve][loadbalance] added loadBalancerReportUpdateMinIntervalMillis and ignores memory usage in getMaxResourceUsage() (#17598)
a6cb78694d5 is described below
commit a6cb78694d54fae1089e0d1c2a91216cc8481e6b
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Tue Sep 13 19:24:40 2022 -0700
[improve][loadbalance] added loadBalancerReportUpdateMinIntervalMillis and ignores memory usage in getMaxResourceUsage() (#17598)
(cherry picked from commit de7c586b92fcf4506317b90e1d5eba482ca93626)
---
conf/broker.conf | 3 +++
conf/standalone.conf | 3 +++
.../apache/pulsar/broker/ServiceConfiguration.java | 6 +++++
.../org/apache/pulsar/broker/PulsarService.java | 3 +--
.../broker/loadbalance/impl/LoadManagerShared.java | 3 ---
.../loadbalance/impl/SimpleLoadManagerImpl.java | 9 ++++----
.../data/loadbalancer/LocalBrokerData.java | 3 ++-
.../data/loadbalancer/LocalBrokerDataTest.java | 26 ++++++++++++++++++++++
8 files changed, 46 insertions(+), 10 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 08884a4be57..b081f40b4b8 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1091,6 +1091,9 @@ loadBalancerEnabled=true
# Percentage of change to trigger load report update
loadBalancerReportUpdateThresholdPercentage=10
+# Minimum interval (in milliseconds) between load report updates
+loadBalancerReportUpdateMinIntervalMillis=5000
+
# maximum interval to update load report
loadBalancerReportUpdateMaxIntervalMinutes=15
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 8eb25a2aa65..90f23a7915f 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -774,6 +774,9 @@ loadBalancerEnabled=false
# Percentage of change to trigger load report update
loadBalancerReportUpdateThresholdPercentage=10
+# Minimum interval (in milliseconds) between load report updates
+loadBalancerReportUpdateMinIntervalMillis=5000
+
# maximum interval to update load report
loadBalancerReportUpdateMaxIntervalMinutes=15
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index ad1fa8ff574..85e53612bd3 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1929,6 +1929,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
category = CATEGORY_LOAD_BALANCER,
doc = "maximum interval to update load report"
)
+ private int loadBalancerReportUpdateMinIntervalMillis = 5000;
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ dynamic = true,
+ doc = "Min delay of load report to collect, in milli-seconds"
+ )
private int loadBalancerReportUpdateMaxIntervalMinutes = 15;
@FieldContext(
category = CATEGORY_LOAD_BALANCER,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 2a092b5b769..262ada116a6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -89,7 +89,6 @@ import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
-import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.protocol.ProtocolHandlers;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupService;
@@ -1091,7 +1090,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
if (config.isLoadBalancerEnabled()) {
LOG.info("Starting load balancer");
if (this.loadReportTask == null) {
- long loadReportMinInterval = LoadManagerShared.LOAD_REPORT_UPDATE_MINIMUM_INTERVAL;
+ long loadReportMinInterval = config.getLoadBalancerReportUpdateMinIntervalMillis();
this.loadReportTask = this.loadManagerExecutor.scheduleAtFixedRate(
new LoadReportUpdaterTask(loadManager), loadReportMinInterval, loadReportMinInterval,
TimeUnit.MILLISECONDS);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index c0ee0d2f986..e0127bd3864 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
@@ -75,9 +75,6 @@ public class LoadManagerShared {
}
};
- // update LoadReport at most every 5 seconds
- public static final long LOAD_REPORT_UPDATE_MINIMUM_INTERVAL = TimeUnit.SECONDS.toMillis(5);
-
private static final String DEFAULT_DOMAIN = "default";
// Don't allow construction: static method namespace only.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index 4f7e37ad344..06c87cba627 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;
-import static org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.LOAD_REPORT_UPDATE_MINIMUM_INTERVAL;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
@@ -1160,10 +1159,12 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
public void writeLoadReportOnZookeeper() throws Exception {
// update average JVM heap usage to average value of the last 120 seconds
long realtimeJvmHeapUsage = getRealtimeJvmHeapUsageMBytes();
+ int minInterval = pulsar.getConfiguration().getLoadBalancerReportUpdateMinIntervalMillis();
if (this.avgJvmHeapUsageMBytes <= 0) {
this.avgJvmHeapUsageMBytes = realtimeJvmHeapUsage;
} else {
- long weight = Math.max(1, TimeUnit.SECONDS.toMillis(120) / LOAD_REPORT_UPDATE_MINIMUM_INTERVAL);
+
+ long weight = Math.max(1, TimeUnit.SECONDS.toMillis(120) / minInterval);
this.avgJvmHeapUsageMBytes = ((weight - 1) * this.avgJvmHeapUsageMBytes + realtimeJvmHeapUsage) / weight;
}
@@ -1182,7 +1183,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
int maxUpdateIntervalInMinutes = pulsar.getConfiguration().getLoadBalancerReportUpdateMaxIntervalMinutes();
if (timeElapsedSinceLastReport > TimeUnit.MINUTES.toMillis(maxUpdateIntervalInMinutes)) {
needUpdate = true;
- } else if (timeElapsedSinceLastReport > LOAD_REPORT_UPDATE_MINIMUM_INTERVAL) {
+ } else if (timeElapsedSinceLastReport > minInterval) {
// check number of bundles assigned, comparing with last LoadReport
long oldBundleCount = lastLoadReport.getNumBundles();
long newBundleCount = pulsar.getBrokerService().getNumberOfNamespaceBundles();
@@ -1251,7 +1252,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
*/
private boolean isLoadReportGenerationIntervalPassed() {
long timeSinceLastGenMillis = System.currentTimeMillis() - lastLoadReport.getTimestamp();
- return timeSinceLastGenMillis > LOAD_REPORT_UPDATE_MINIMUM_INTERVAL;
+ return timeSinceLastGenMillis > pulsar.getConfiguration().getLoadBalancerReportUpdateMinIntervalMillis();
}
// todo: changeme: this can be optimized, we don't have to iterate through everytime
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
index 8736b675272..3c97439f814 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
@@ -235,7 +235,8 @@ public class LocalBrokerData implements LoadManagerReport {
}
public double getMaxResourceUsage() {
- return max(cpu.percentUsage(), memory.percentUsage(), directMemory.percentUsage(), bandwidthIn.percentUsage(),
+ // Memory usage is deliberately excluded here because GC makes it too noisy.
+ return max(cpu.percentUsage(), directMemory.percentUsage(), bandwidthIn.percentUsage(),
bandwidthOut.percentUsage()) / 100;
}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java b/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java
index e1f7222133a..69d4a7f4cd1 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java
@@ -22,6 +22,8 @@ import com.google.gson.Gson;
import org.testng.Assert;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+
public class LocalBrokerDataTest {
@Test
@@ -33,4 +35,28 @@ public class LocalBrokerDataTest {
Assert.assertEquals(localBrokerData.getMemory().usage, 614.0d, 0.0001f);
Assert.assertEquals(localBrokerData.getMemory().percentUsage(), ((float) localBrokerData.getMemory().usage) / ((float) localBrokerData.getMemory().limit) * 100, 0.0001f);
}
+
+ @Test
+ public void testMaxResourceUsage() {
+ LocalBrokerData data = new LocalBrokerData();
+ data.setCpu(new ResourceUsage(1.0, 100.0));
+ data.setMemory(new ResourceUsage(800.0, 200.0));
+ data.setDirectMemory(new ResourceUsage(2.0, 100.0));
+ data.setBandwidthIn(new ResourceUsage(3.0, 100.0));
+ data.setBandwidthOut(new ResourceUsage(4.0, 100.0));
+
+ double epsilon = 0.00001;
+ double weight = 0.5;
+ // skips memory usage
+ assertEquals(data.getMaxResourceUsage(), 0.04, epsilon);
+
+ assertEquals(
+ data.getMaxResourceUsageWithWeight(
+ weight, weight, weight, weight, weight), 2.0, epsilon);
+
+ assertEquals(
+ data.getMaxResourceUsageWithWeightWithinLimit(
+ weight, weight, weight, weight, weight), 0.02, epsilon);
+
+ }
}