You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/09/02 03:37:33 UTC

[pulsar] branch branch-2.10 updated: [fix][broker] Support loadBalancerSheddingIntervalMinutes dynamic configuration (#16408)

This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 18cd7f4e04e [fix][broker] Support loadBalancerSheddingIntervalMinutes dynamic configuration (#16408)
18cd7f4e04e is described below

commit 18cd7f4e04efd041f8c249cdfe4e9d278a20ef39
Author: LinChen <15...@qq.com>
AuthorDate: Thu Aug 18 10:09:45 2022 +0800

    [fix][broker] Support loadBalancerSheddingIntervalMinutes dynamic configuration (#16408)
    
    (cherry picked from commit e4dcf5add5f26eb8c659d6e0a5d65e5a1a9de904)
---
 .../org/apache/pulsar/broker/PulsarService.java    | 13 ++++-----
 .../broker/loadbalance/LoadSheddingTask.java       | 34 +++++++++++++++++++++-
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 13 +++++++++
 .../loadbalance/SimpleLoadManagerImplTest.java     |  2 +-
 4 files changed, 52 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index a20af6e4f87..7dd5595b100 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -209,7 +209,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     private LedgerOffloader defaultOffloader;
     private Map<NamespaceName, LedgerOffloader> ledgerOffloaderMap = new ConcurrentHashMap<>();
     private ScheduledFuture<?> loadReportTask = null;
-    private ScheduledFuture<?> loadSheddingTask = null;
+    private LoadSheddingTask loadSheddingTask = null;
     private ScheduledFuture<?> loadResourceQuotaTask = null;
     private final AtomicReference<LoadManager> loadManager = new AtomicReference<>();
     private PulsarAdmin adminClient = null;
@@ -987,20 +987,17 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                     if (state == LeaderElectionState.Leading) {
                         LOG.info("This broker was elected leader");
                         if (getConfiguration().isLoadBalancerEnabled()) {
-                            long loadSheddingInterval = TimeUnit.MINUTES
-                                    .toMillis(getConfiguration().getLoadBalancerSheddingIntervalMinutes());
                             long resourceQuotaUpdateInterval = TimeUnit.MINUTES
                                     .toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
 
                             if (loadSheddingTask != null) {
-                                loadSheddingTask.cancel(false);
+                                loadSheddingTask.cancel();
                             }
                             if (loadResourceQuotaTask != null) {
                                 loadResourceQuotaTask.cancel(false);
                             }
-                            loadSheddingTask = loadManagerExecutor.scheduleAtFixedRate(
-                                    new LoadSheddingTask(loadManager),
-                                    loadSheddingInterval, loadSheddingInterval, TimeUnit.MILLISECONDS);
+                            loadSheddingTask = new LoadSheddingTask(loadManager, loadManagerExecutor, config);
+                            loadSheddingTask.start();
                             loadResourceQuotaTask = loadManagerExecutor.scheduleAtFixedRate(
                                     new LoadResourceQuotaUpdaterTask(loadManager), resourceQuotaUpdateInterval,
                                     resourceQuotaUpdateInterval, TimeUnit.MILLISECONDS);
@@ -1011,7 +1008,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                                     leaderElectionService.getCurrentLeader());
                         }
                         if (loadSheddingTask != null) {
-                            loadSheddingTask.cancel(false);
+                            loadSheddingTask.cancel();
                             loadSheddingTask = null;
                         }
                         if (loadResourceQuotaTask != null) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingTask.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingTask.java
index 6b053117042..73139ac4989 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingTask.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingTask.java
@@ -18,7 +18,10 @@
  */
 package org.apache.pulsar.broker.loadbalance;
 
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,9 +31,18 @@ import org.slf4j.LoggerFactory;
 public class LoadSheddingTask implements Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(LoadSheddingTask.class);
     private final AtomicReference<LoadManager> loadManager;
+    private final ScheduledExecutorService loadManagerExecutor;
 
-    public LoadSheddingTask(AtomicReference<LoadManager> loadManager) {
+    private final ServiceConfiguration config;
+
+    private volatile boolean isCancel = false;
+
+    public LoadSheddingTask(AtomicReference<LoadManager> loadManager,
+                            ScheduledExecutorService loadManagerExecutor,
+                            ServiceConfiguration config) {
         this.loadManager = loadManager;
+        this.loadManagerExecutor = loadManagerExecutor;
+        this.config = config;
     }
 
     @Override
@@ -39,6 +51,26 @@ public class LoadSheddingTask implements Runnable {
             loadManager.get().doLoadShedding();
         } catch (Exception e) {
             LOG.warn("Error during the load shedding", e);
+        } finally {
+            if (!isCancel && loadManagerExecutor != null && config != null) {
+                loadManagerExecutor.schedule(
+                        new LoadSheddingTask(loadManager, loadManagerExecutor, config),
+                        config.getLoadBalancerSheddingIntervalMinutes(),
+                        TimeUnit.MINUTES);
+            }
         }
     }
+
+    public void start() {
+        if (loadManagerExecutor != null && config != null) {
+            loadManagerExecutor.schedule(
+                    new LoadSheddingTask(loadManager, loadManagerExecutor, config),
+                    config.getLoadBalancerSheddingIntervalMinutes(),
+                    TimeUnit.MINUTES);
+        }
+    }
+
+    public void cancel() {
+        isCancel = true;
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 3c5d629d2c0..98400036f03 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -495,6 +495,19 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         assertEquals(admin.clusters().getClusters(), Lists.newArrayList());
     }
 
+    public void testUpdateDynamicLoadBalancerSheddingIntervalMinutes() throws Exception {
+        // update configuration
+        admin.brokers().updateDynamicConfiguration("loadBalancerSheddingIntervalMinutes", "10");
+
+        // wait config to be updated
+        Awaitility.await().until(() -> {
+            return conf.getLoadBalancerSheddingIntervalMinutes() == 10;
+        });
+
+        // verify value is updated
+        assertEquals(conf.getLoadBalancerSheddingIntervalMinutes(), 10);
+    }
+
     /**
      * <pre>
      * Verifies: zk-update configuration updates service-config
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index e972a52788c..ceff0d8aaed 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -453,7 +453,7 @@ public class SimpleLoadManagerImplTest {
         task1.run();
         verify(loadManager, times(1)).writeResourceQuotasToZooKeeper();
 
-        LoadSheddingTask task2 = new LoadSheddingTask(atomicLoadManager);
+        LoadSheddingTask task2 = new LoadSheddingTask(atomicLoadManager, null, null);
         task2.run();
         verify(loadManager, times(1)).doLoadShedding();
     }