You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/18 11:39:19 UTC

[pulsar] branch branch-2.11 updated: [fix][broker] Support loadBalancerSheddingIntervalMinutes dynamic configuration (#16408)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new d8c32ca61af [fix][broker] Support loadBalancerSheddingIntervalMinutes dynamic configuration (#16408)
d8c32ca61af is described below

commit d8c32ca61af14caadd200a94c0736b07e007d6d7
Author: LinChen <15...@qq.com>
AuthorDate: Thu Aug 18 10:09:45 2022 +0800

    [fix][broker] Support loadBalancerSheddingIntervalMinutes dynamic configuration (#16408)
---
 .../org/apache/pulsar/broker/PulsarService.java    | 13 ++++-----
 .../broker/loadbalance/LoadSheddingTask.java       | 34 +++++++++++++++++++++-
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 13 +++++++++
 .../loadbalance/SimpleLoadManagerImplTest.java     |  2 +-
 4 files changed, 52 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 6e9a9442a6c..64cfefb27b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -213,7 +213,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     private LedgerOffloaderStats offloaderStats;
     private Map<NamespaceName, LedgerOffloader> ledgerOffloaderMap = new ConcurrentHashMap<>();
     private ScheduledFuture<?> loadReportTask = null;
-    private ScheduledFuture<?> loadSheddingTask = null;
+    private LoadSheddingTask loadSheddingTask = null;
     private ScheduledFuture<?> loadResourceQuotaTask = null;
     private final AtomicReference<LoadManager> loadManager = new AtomicReference<>();
     private PulsarAdmin adminClient = null;
@@ -1064,20 +1064,17 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                     if (state == LeaderElectionState.Leading) {
                         LOG.info("This broker was elected leader");
                         if (getConfiguration().isLoadBalancerEnabled()) {
-                            long loadSheddingInterval = TimeUnit.MINUTES
-                                    .toMillis(getConfiguration().getLoadBalancerSheddingIntervalMinutes());
                             long resourceQuotaUpdateInterval = TimeUnit.MINUTES
                                     .toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
 
                             if (loadSheddingTask != null) {
-                                loadSheddingTask.cancel(false);
+                                loadSheddingTask.cancel();
                             }
                             if (loadResourceQuotaTask != null) {
                                 loadResourceQuotaTask.cancel(false);
                             }
-                            loadSheddingTask = loadManagerExecutor.scheduleAtFixedRate(
-                                    new LoadSheddingTask(loadManager),
-                                    loadSheddingInterval, loadSheddingInterval, TimeUnit.MILLISECONDS);
+                            loadSheddingTask = new LoadSheddingTask(loadManager, loadManagerExecutor, config);
+                            loadSheddingTask.start();
                             loadResourceQuotaTask = loadManagerExecutor.scheduleAtFixedRate(
                                     new LoadResourceQuotaUpdaterTask(loadManager), resourceQuotaUpdateInterval,
                                     resourceQuotaUpdateInterval, TimeUnit.MILLISECONDS);
@@ -1088,7 +1085,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                                     leaderElectionService.getCurrentLeader());
                         }
                         if (loadSheddingTask != null) {
-                            loadSheddingTask.cancel(false);
+                            loadSheddingTask.cancel();
                             loadSheddingTask = null;
                         }
                         if (loadResourceQuotaTask != null) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingTask.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingTask.java
index 6b053117042..73139ac4989 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingTask.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingTask.java
@@ -18,7 +18,10 @@
  */
 package org.apache.pulsar.broker.loadbalance;
 
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,9 +31,18 @@ import org.slf4j.LoggerFactory;
 public class LoadSheddingTask implements Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(LoadSheddingTask.class);
     private final AtomicReference<LoadManager> loadManager;
+    private final ScheduledExecutorService loadManagerExecutor;
 
-    public LoadSheddingTask(AtomicReference<LoadManager> loadManager) {
+    private final ServiceConfiguration config;
+
+    private volatile boolean isCancel = false;
+
+    public LoadSheddingTask(AtomicReference<LoadManager> loadManager,
+                            ScheduledExecutorService loadManagerExecutor,
+                            ServiceConfiguration config) {
         this.loadManager = loadManager;
+        this.loadManagerExecutor = loadManagerExecutor;
+        this.config = config;
     }
 
     @Override
@@ -39,6 +51,26 @@ public class LoadSheddingTask implements Runnable {
             loadManager.get().doLoadShedding();
         } catch (Exception e) {
             LOG.warn("Error during the load shedding", e);
+        } finally {
+            if (!isCancel && loadManagerExecutor != null && config != null) {
+                loadManagerExecutor.schedule(
+                        new LoadSheddingTask(loadManager, loadManagerExecutor, config),
+                        config.getLoadBalancerSheddingIntervalMinutes(),
+                        TimeUnit.MINUTES);
+            }
         }
     }
+
+    public void start() {
+        if (loadManagerExecutor != null && config != null) {
+            loadManagerExecutor.schedule(
+                    new LoadSheddingTask(loadManager, loadManagerExecutor, config),
+                    config.getLoadBalancerSheddingIntervalMinutes(),
+                    TimeUnit.MINUTES);
+        }
+    }
+
+    public void cancel() {
+        isCancel = true;
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index ece5f8d8302..2ec0d76551a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -507,6 +507,19 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         assertEquals(admin.clusters().getClusters(), Lists.newArrayList());
     }
 
+    public void testUpdateDynamicLoadBalancerSheddingIntervalMinutes() throws Exception {
+        // update configuration
+        admin.brokers().updateDynamicConfiguration("loadBalancerSheddingIntervalMinutes", "10");
+
+        // wait config to be updated
+        Awaitility.await().until(() -> {
+            return conf.getLoadBalancerSheddingIntervalMinutes() == 10;
+        });
+
+        // verify value is updated
+        assertEquals(conf.getLoadBalancerSheddingIntervalMinutes(), 10);
+    }
+
     @Test
     public void testUpdateDynamicCacheConfigurationWithZkWatch() throws Exception {
         // update configuration
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index ba8035d9307..85025c5d2e0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -455,7 +455,7 @@ public class SimpleLoadManagerImplTest {
         task1.run();
         verify(loadManager, times(1)).writeResourceQuotasToZooKeeper();
 
-        LoadSheddingTask task2 = new LoadSheddingTask(atomicLoadManager);
+        LoadSheddingTask task2 = new LoadSheddingTask(atomicLoadManager, null, null);
         task2.run();
         verify(loadManager, times(1)).doLoadShedding();
     }