You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/21 23:50:17 UTC

[pulsar] branch branch-2.9 updated: [improve][loadbalance] added loadBalancerReportUpdateMinIntervalMillis and ignores memory usage in getMaxResourceUsage() (#17598)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new aa4c677cf9e [improve][loadbalance] added loadBalancerReportUpdateMinIntervalMillis and ignores memory usage in getMaxResourceUsage() (#17598)
aa4c677cf9e is described below

commit aa4c677cf9e38e1cec3a0b11051d9c414ee10f3c
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Tue Sep 13 19:24:40 2022 -0700

    [improve][loadbalance] added loadBalancerReportUpdateMinIntervalMillis and ignores memory usage in getMaxResourceUsage() (#17598)
    
    (cherry picked from commit de7c586b92fcf4506317b90e1d5eba482ca93626)
---
 conf/broker.conf                                   |  3 ++
 conf/standalone.conf                               |  3 ++
 .../apache/pulsar/broker/ServiceConfiguration.java |  6 +++
 .../org/apache/pulsar/broker/PulsarService.java    |  3 +-
 .../broker/loadbalance/impl/LoadManagerShared.java |  3 --
 .../loadbalance/impl/SimpleLoadManagerImpl.java    |  9 ++--
 .../data/loadbalancer/LocalBrokerData.java         |  3 +-
 .../data/loadbalancer/LocalBrokerDataTest.java     | 62 ++++++++++++++++++++++
 8 files changed, 82 insertions(+), 10 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 3c4e4b4524f..32ded75096a 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1041,6 +1041,9 @@ loadBalancerEnabled=true
 # Percentage of change to trigger load report update
 loadBalancerReportUpdateThresholdPercentage=10
 
+# minimum interval to update load report
+loadBalancerReportUpdateMinIntervalMillis=5000
+
 # maximum interval to update load report
 loadBalancerReportUpdateMaxIntervalMinutes=15
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 01030d049d4..08742a43cd5 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -742,6 +742,9 @@ loadBalancerEnabled=false
 # Percentage of change to trigger load report update
 loadBalancerReportUpdateThresholdPercentage=10
 
+# minimum interval to update load report
+loadBalancerReportUpdateMinIntervalMillis=5000
+
 # maximum interval to update load report
 loadBalancerReportUpdateMaxIntervalMinutes=15
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 47e47a71d3a..cbcac11acf2 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1753,6 +1753,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
         category = CATEGORY_LOAD_BALANCER,
         doc = "maximum interval to update load report"
     )
+    private int loadBalancerReportUpdateMinIntervalMillis = 5000;
+    @FieldContext(
+            category = CATEGORY_LOAD_BALANCER,
+            dynamic = true,
+            doc = "Min delay of load report to collect, in milli-seconds"
+    )
     private int loadBalancerReportUpdateMaxIntervalMinutes = 15;
     @FieldContext(
         category = CATEGORY_LOAD_BALANCER,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index e47bd1bc783..c10e6acc2a8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -89,7 +89,6 @@ import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask;
 import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
 import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
-import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.protocol.ProtocolHandlers;
 import org.apache.pulsar.broker.resourcegroup.ResourceGroupService;
@@ -1043,7 +1042,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
         if (config.isLoadBalancerEnabled()) {
             LOG.info("Starting load balancer");
             if (this.loadReportTask == null) {
-                long loadReportMinInterval = LoadManagerShared.LOAD_REPORT_UPDATE_MINIMUM_INTERVAL;
+                long loadReportMinInterval = config.getLoadBalancerReportUpdateMinIntervalMillis();
                 this.loadReportTask = this.loadManagerExecutor.scheduleAtFixedRate(
                         new LoadReportUpdaterTask(loadManager), loadReportMinInterval, loadReportMinInterval,
                         TimeUnit.MILLISECONDS);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index 20b14a9d220..2737e97df14 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
@@ -75,9 +75,6 @@ public class LoadManagerShared {
         }
     };
 
-    // update LoadReport at most every 5 seconds
-    public static final long LOAD_REPORT_UPDATE_MINIMUM_INTERVAL = TimeUnit.SECONDS.toMillis(5);
-
     private static final String DEFAULT_DOMAIN = "default";
 
     // Don't allow construction: static method namespace only.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index 212d299953e..ed766b1b930 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.loadbalance.impl;
 
-import static org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.LOAD_REPORT_UPDATE_MINIMUM_INTERVAL;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheBuilder;
@@ -1141,10 +1140,12 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
     public void writeLoadReportOnZookeeper() throws Exception {
         // update average JVM heap usage to average value of the last 120 seconds
         long realtimeJvmHeapUsage = getRealtimeJvmHeapUsageMBytes();
+        int minInterval = pulsar.getConfiguration().getLoadBalancerReportUpdateMinIntervalMillis();
         if (this.avgJvmHeapUsageMBytes <= 0) {
             this.avgJvmHeapUsageMBytes = realtimeJvmHeapUsage;
         } else {
-            long weight = Math.max(1, TimeUnit.SECONDS.toMillis(120) / LOAD_REPORT_UPDATE_MINIMUM_INTERVAL);
+
+            long weight = Math.max(1, TimeUnit.SECONDS.toMillis(120) / minInterval);
             this.avgJvmHeapUsageMBytes = ((weight - 1) * this.avgJvmHeapUsageMBytes + realtimeJvmHeapUsage) / weight;
         }
 
@@ -1163,7 +1164,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
             int maxUpdateIntervalInMinutes = pulsar.getConfiguration().getLoadBalancerReportUpdateMaxIntervalMinutes();
             if (timeElapsedSinceLastReport > TimeUnit.MINUTES.toMillis(maxUpdateIntervalInMinutes)) {
                 needUpdate = true;
-            } else if (timeElapsedSinceLastReport > LOAD_REPORT_UPDATE_MINIMUM_INTERVAL) {
+            } else if (timeElapsedSinceLastReport > minInterval) {
                 // check number of bundles assigned, comparing with last LoadReport
                 long oldBundleCount = lastLoadReport.getNumBundles();
                 long newBundleCount = pulsar.getBrokerService().getNumberOfNamespaceBundles();
@@ -1232,7 +1233,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
      */
     private boolean isLoadReportGenerationIntervalPassed() {
         long timeSinceLastGenMillis = System.currentTimeMillis() - lastLoadReport.getTimestamp();
-        return timeSinceLastGenMillis > LOAD_REPORT_UPDATE_MINIMUM_INTERVAL;
+        return timeSinceLastGenMillis > pulsar.getConfiguration().getLoadBalancerReportUpdateMinIntervalMillis();
     }
 
     // todo: changeme: this can be optimized, we don't have to iterate through everytime
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
index a503a851715..75f32eaa82e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
@@ -235,7 +235,8 @@ public class LocalBrokerData implements LoadManagerReport {
     }
 
     public double getMaxResourceUsage() {
-        return max(cpu.percentUsage(), memory.percentUsage(), directMemory.percentUsage(), bandwidthIn.percentUsage(),
+        // does not consider memory because it is noisy by gc.
+        return max(cpu.percentUsage(), directMemory.percentUsage(), bandwidthIn.percentUsage(),
                 bandwidthOut.percentUsage()) / 100;
     }
 
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java b/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java
new file mode 100644
index 00000000000..69d4a7f4cd1
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.policies.data.loadbalancer;
+
+import com.google.gson.Gson;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+public class LocalBrokerDataTest {
+
+    @Test
+    public void testLocalBrokerDataDeserialization() {
+        String data = "{\"webServiceUrl\":\"http://10.244.2.23:8080\",\"webServiceUrlTls\":\"https://10.244.2.23:8081\",\"pulsarServiceUrlTls\":\"pulsar+ssl://10.244.2.23:6651\",\"persistentTopicsEnabled\":true,\"nonPersistentTopicsEnabled\":false,\"cpu\":{\"usage\":3.1577712104798255,\"limit\":100.0},\"memory\":{\"usage\":614.0,\"limit\":1228.0},\"directMemory\":{\"usage\":32.0,\"limit\":1228.0},\"bandwidthIn\":{\"usage\":0.0,\"limit\":0.0},\"bandwidthOut\":{\"usage\":0.0,\"limit\":0.0} [...]
+        Gson gson = new Gson();
+        LocalBrokerData localBrokerData = gson.fromJson(data, LocalBrokerData.class);
+        Assert.assertEquals(localBrokerData.getMemory().limit, 1228.0d, 0.0001f);
+        Assert.assertEquals(localBrokerData.getMemory().usage, 614.0d, 0.0001f);
+        Assert.assertEquals(localBrokerData.getMemory().percentUsage(), ((float) localBrokerData.getMemory().usage) / ((float) localBrokerData.getMemory().limit) * 100, 0.0001f);
+    }
+
+    @Test
+    public void testMaxResourceUsage() {
+        LocalBrokerData data = new LocalBrokerData();
+        data.setCpu(new ResourceUsage(1.0, 100.0));
+        data.setMemory(new ResourceUsage(800.0, 200.0));
+        data.setDirectMemory(new ResourceUsage(2.0, 100.0));
+        data.setBandwidthIn(new ResourceUsage(3.0, 100.0));
+        data.setBandwidthOut(new ResourceUsage(4.0, 100.0));
+
+        double epsilon = 0.00001;
+        double weight = 0.5;
+        // skips memory usage
+        assertEquals(data.getMaxResourceUsage(), 0.04, epsilon);
+
+        assertEquals(
+                data.getMaxResourceUsageWithWeight(
+                        weight, weight, weight, weight, weight), 2.0, epsilon);
+
+        assertEquals(
+                data.getMaxResourceUsageWithWeightWithinLimit(
+                        weight, weight, weight, weight, weight), 0.02, epsilon);
+
+    }
+}