You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/21 23:53:30 UTC

[pulsar] branch branch-2.10 updated: [improve][loadbalance] added loadBalancerReportUpdateMinIntervalMillis and ignores memory usage in getMaxResourceUsage() (#17598)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new a6cb78694d5 [improve][loadbalance] added loadBalancerReportUpdateMinIntervalMillis and ignores memory usage in getMaxResourceUsage() (#17598)
a6cb78694d5 is described below

commit a6cb78694d54fae1089e0d1c2a91216cc8481e6b
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Tue Sep 13 19:24:40 2022 -0700

    [improve][loadbalance] added loadBalancerReportUpdateMinIntervalMillis and ignores memory usage in getMaxResourceUsage() (#17598)
    
    (cherry picked from commit de7c586b92fcf4506317b90e1d5eba482ca93626)
---
 conf/broker.conf                                   |  3 +++
 conf/standalone.conf                               |  3 +++
 .../apache/pulsar/broker/ServiceConfiguration.java |  6 +++++
 .../org/apache/pulsar/broker/PulsarService.java    |  3 +--
 .../broker/loadbalance/impl/LoadManagerShared.java |  3 ---
 .../loadbalance/impl/SimpleLoadManagerImpl.java    |  9 ++++----
 .../data/loadbalancer/LocalBrokerData.java         |  3 ++-
 .../data/loadbalancer/LocalBrokerDataTest.java     | 26 ++++++++++++++++++++++
 8 files changed, 46 insertions(+), 10 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 08884a4be57..b081f40b4b8 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1091,6 +1091,9 @@ loadBalancerEnabled=true
 # Percentage of change to trigger load report update
 loadBalancerReportUpdateThresholdPercentage=10
 
+# Minimum interval between load report updates, in milliseconds
+loadBalancerReportUpdateMinIntervalMillis=5000
+
 # maximum interval to update load report
 loadBalancerReportUpdateMaxIntervalMinutes=15
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 8eb25a2aa65..90f23a7915f 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -774,6 +774,9 @@ loadBalancerEnabled=false
 # Percentage of change to trigger load report update
 loadBalancerReportUpdateThresholdPercentage=10
 
+# Minimum interval between load report updates, in milliseconds
+loadBalancerReportUpdateMinIntervalMillis=5000
+
 # maximum interval to update load report
 loadBalancerReportUpdateMaxIntervalMinutes=15
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index ad1fa8ff574..85e53612bd3 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1929,6 +1929,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
         category = CATEGORY_LOAD_BALANCER,
         doc = "maximum interval to update load report"
     )
+    private int loadBalancerReportUpdateMinIntervalMillis = 5000;
+    @FieldContext(
+            category = CATEGORY_LOAD_BALANCER,
+            dynamic = true,
+            doc = "Min delay of load report to collect, in milli-seconds"
+    )
     private int loadBalancerReportUpdateMaxIntervalMinutes = 15;
     @FieldContext(
         category = CATEGORY_LOAD_BALANCER,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 2a092b5b769..262ada116a6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -89,7 +89,6 @@ import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask;
 import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
 import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
-import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.protocol.ProtocolHandlers;
 import org.apache.pulsar.broker.resourcegroup.ResourceGroupService;
@@ -1091,7 +1090,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
         if (config.isLoadBalancerEnabled()) {
             LOG.info("Starting load balancer");
             if (this.loadReportTask == null) {
-                long loadReportMinInterval = LoadManagerShared.LOAD_REPORT_UPDATE_MINIMUM_INTERVAL;
+                long loadReportMinInterval = config.getLoadBalancerReportUpdateMinIntervalMillis();
                 this.loadReportTask = this.loadManagerExecutor.scheduleAtFixedRate(
                         new LoadReportUpdaterTask(loadManager), loadReportMinInterval, loadReportMinInterval,
                         TimeUnit.MILLISECONDS);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index c0ee0d2f986..e0127bd3864 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
@@ -75,9 +75,6 @@ public class LoadManagerShared {
         }
     };
 
-    // update LoadReport at most every 5 seconds
-    public static final long LOAD_REPORT_UPDATE_MINIMUM_INTERVAL = TimeUnit.SECONDS.toMillis(5);
-
     private static final String DEFAULT_DOMAIN = "default";
 
     // Don't allow construction: static method namespace only.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index 4f7e37ad344..06c87cba627 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.loadbalance.impl;
 
-import static org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.LOAD_REPORT_UPDATE_MINIMUM_INTERVAL;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheBuilder;
@@ -1160,10 +1159,12 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
     public void writeLoadReportOnZookeeper() throws Exception {
         // update average JVM heap usage to average value of the last 120 seconds
         long realtimeJvmHeapUsage = getRealtimeJvmHeapUsageMBytes();
+        int minInterval = pulsar.getConfiguration().getLoadBalancerReportUpdateMinIntervalMillis();
         if (this.avgJvmHeapUsageMBytes <= 0) {
             this.avgJvmHeapUsageMBytes = realtimeJvmHeapUsage;
         } else {
-            long weight = Math.max(1, TimeUnit.SECONDS.toMillis(120) / LOAD_REPORT_UPDATE_MINIMUM_INTERVAL);
+
+            long weight = Math.max(1, TimeUnit.SECONDS.toMillis(120) / minInterval);
             this.avgJvmHeapUsageMBytes = ((weight - 1) * this.avgJvmHeapUsageMBytes + realtimeJvmHeapUsage) / weight;
         }
 
@@ -1182,7 +1183,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
             int maxUpdateIntervalInMinutes = pulsar.getConfiguration().getLoadBalancerReportUpdateMaxIntervalMinutes();
             if (timeElapsedSinceLastReport > TimeUnit.MINUTES.toMillis(maxUpdateIntervalInMinutes)) {
                 needUpdate = true;
-            } else if (timeElapsedSinceLastReport > LOAD_REPORT_UPDATE_MINIMUM_INTERVAL) {
+            } else if (timeElapsedSinceLastReport > minInterval) {
                 // check number of bundles assigned, comparing with last LoadReport
                 long oldBundleCount = lastLoadReport.getNumBundles();
                 long newBundleCount = pulsar.getBrokerService().getNumberOfNamespaceBundles();
@@ -1251,7 +1252,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
      */
     private boolean isLoadReportGenerationIntervalPassed() {
         long timeSinceLastGenMillis = System.currentTimeMillis() - lastLoadReport.getTimestamp();
-        return timeSinceLastGenMillis > LOAD_REPORT_UPDATE_MINIMUM_INTERVAL;
+        return timeSinceLastGenMillis > pulsar.getConfiguration().getLoadBalancerReportUpdateMinIntervalMillis();
     }
 
     // todo: changeme: this can be optimized, we don't have to iterate through everytime
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
index 8736b675272..3c97439f814 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
@@ -235,7 +235,8 @@ public class LocalBrokerData implements LoadManagerReport {
     }
 
     public double getMaxResourceUsage() {
-        return max(cpu.percentUsage(), memory.percentUsage(), directMemory.percentUsage(), bandwidthIn.percentUsage(),
+        // Memory usage is deliberately excluded: GC makes it too noisy to be a useful signal.
+        return max(cpu.percentUsage(), directMemory.percentUsage(), bandwidthIn.percentUsage(),
                 bandwidthOut.percentUsage()) / 100;
     }
 
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java b/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java
index e1f7222133a..69d4a7f4cd1 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java
@@ -22,6 +22,8 @@ import com.google.gson.Gson;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
+
 public class LocalBrokerDataTest {
 
     @Test
@@ -33,4 +35,28 @@ public class LocalBrokerDataTest {
         Assert.assertEquals(localBrokerData.getMemory().usage, 614.0d, 0.0001f);
         Assert.assertEquals(localBrokerData.getMemory().percentUsage(), ((float) localBrokerData.getMemory().usage) / ((float) localBrokerData.getMemory().limit) * 100, 0.0001f);
     }
+
+    @Test
+    public void testMaxResourceUsage() {
+        LocalBrokerData data = new LocalBrokerData();
+        data.setCpu(new ResourceUsage(1.0, 100.0));
+        data.setMemory(new ResourceUsage(800.0, 200.0));
+        data.setDirectMemory(new ResourceUsage(2.0, 100.0));
+        data.setBandwidthIn(new ResourceUsage(3.0, 100.0));
+        data.setBandwidthOut(new ResourceUsage(4.0, 100.0));
+
+        double epsilon = 0.00001;
+        double weight = 0.5;
+        // skips memory usage
+        assertEquals(data.getMaxResourceUsage(), 0.04, epsilon);
+
+        assertEquals(
+                data.getMaxResourceUsageWithWeight(
+                        weight, weight, weight, weight, weight), 2.0, epsilon);
+
+        assertEquals(
+                data.getMaxResourceUsageWithWeightWithinLimit(
+                        weight, weight, weight, weight, weight), 0.02, epsilon);
+
+    }
 }