You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/09/02 03:37:33 UTC
[pulsar] branch branch-2.10 updated: [fix][broker] Support loadBalancerSheddingIntervalMinutes dynamic configuration (#16408)
This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 18cd7f4e04e [fix][broker] Support loadBalancerSheddingIntervalMinutes dynamic configuration (#16408)
18cd7f4e04e is described below
commit 18cd7f4e04efd041f8c249cdfe4e9d278a20ef39
Author: LinChen <15...@qq.com>
AuthorDate: Thu Aug 18 10:09:45 2022 +0800
[fix][broker] Support loadBalancerSheddingIntervalMinutes dynamic configuration (#16408)
(cherry picked from commit e4dcf5add5f26eb8c659d6e0a5d65e5a1a9de904)
---
.../org/apache/pulsar/broker/PulsarService.java | 13 ++++-----
.../broker/loadbalance/LoadSheddingTask.java | 34 +++++++++++++++++++++-
.../apache/pulsar/broker/admin/AdminApiTest.java | 13 +++++++++
.../loadbalance/SimpleLoadManagerImplTest.java | 2 +-
4 files changed, 52 insertions(+), 10 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index a20af6e4f87..7dd5595b100 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -209,7 +209,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private LedgerOffloader defaultOffloader;
private Map<NamespaceName, LedgerOffloader> ledgerOffloaderMap = new ConcurrentHashMap<>();
private ScheduledFuture<?> loadReportTask = null;
- private ScheduledFuture<?> loadSheddingTask = null;
+ private LoadSheddingTask loadSheddingTask = null;
private ScheduledFuture<?> loadResourceQuotaTask = null;
private final AtomicReference<LoadManager> loadManager = new AtomicReference<>();
private PulsarAdmin adminClient = null;
@@ -987,20 +987,17 @@ public class PulsarService implements AutoCloseable, ShutdownService {
if (state == LeaderElectionState.Leading) {
LOG.info("This broker was elected leader");
if (getConfiguration().isLoadBalancerEnabled()) {
- long loadSheddingInterval = TimeUnit.MINUTES
- .toMillis(getConfiguration().getLoadBalancerSheddingIntervalMinutes());
long resourceQuotaUpdateInterval = TimeUnit.MINUTES
.toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
if (loadSheddingTask != null) {
- loadSheddingTask.cancel(false);
+ loadSheddingTask.cancel();
}
if (loadResourceQuotaTask != null) {
loadResourceQuotaTask.cancel(false);
}
- loadSheddingTask = loadManagerExecutor.scheduleAtFixedRate(
- new LoadSheddingTask(loadManager),
- loadSheddingInterval, loadSheddingInterval, TimeUnit.MILLISECONDS);
+ loadSheddingTask = new LoadSheddingTask(loadManager, loadManagerExecutor, config);
+ loadSheddingTask.start();
loadResourceQuotaTask = loadManagerExecutor.scheduleAtFixedRate(
new LoadResourceQuotaUpdaterTask(loadManager), resourceQuotaUpdateInterval,
resourceQuotaUpdateInterval, TimeUnit.MILLISECONDS);
@@ -1011,7 +1008,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
leaderElectionService.getCurrentLeader());
}
if (loadSheddingTask != null) {
- loadSheddingTask.cancel(false);
+ loadSheddingTask.cancel();
loadSheddingTask = null;
}
if (loadResourceQuotaTask != null) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingTask.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingTask.java
index 6b053117042..73139ac4989 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingTask.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingTask.java
@@ -18,7 +18,10 @@
*/
package org.apache.pulsar.broker.loadbalance;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,9 +31,18 @@ import org.slf4j.LoggerFactory;
public class LoadSheddingTask implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(LoadSheddingTask.class);
private final AtomicReference<LoadManager> loadManager;
+ private final ScheduledExecutorService loadManagerExecutor;
- public LoadSheddingTask(AtomicReference<LoadManager> loadManager) {
+ private final ServiceConfiguration config;
+
+ private volatile boolean isCancel = false;
+
+ public LoadSheddingTask(AtomicReference<LoadManager> loadManager,
+ ScheduledExecutorService loadManagerExecutor,
+ ServiceConfiguration config) {
this.loadManager = loadManager;
+ this.loadManagerExecutor = loadManagerExecutor;
+ this.config = config;
}
@Override
@@ -39,6 +51,26 @@ public class LoadSheddingTask implements Runnable {
loadManager.get().doLoadShedding();
} catch (Exception e) {
LOG.warn("Error during the load shedding", e);
+ } finally {
+ if (!isCancel && loadManagerExecutor != null && config != null) {
+ loadManagerExecutor.schedule(
+ new LoadSheddingTask(loadManager, loadManagerExecutor, config),
+ config.getLoadBalancerSheddingIntervalMinutes(),
+ TimeUnit.MINUTES);
+ }
}
}
+
+ public void start() {
+ if (loadManagerExecutor != null && config != null) {
+ loadManagerExecutor.schedule(
+ new LoadSheddingTask(loadManager, loadManagerExecutor, config),
+ config.getLoadBalancerSheddingIntervalMinutes(),
+ TimeUnit.MINUTES);
+ }
+ }
+
+ public void cancel() {
+ isCancel = true;
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 3c5d629d2c0..98400036f03 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -495,6 +495,19 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
assertEquals(admin.clusters().getClusters(), Lists.newArrayList());
}
+ public void testUpdateDynamicLoadBalancerSheddingIntervalMinutes() throws Exception {
+ // update configuration
+ admin.brokers().updateDynamicConfiguration("loadBalancerSheddingIntervalMinutes", "10");
+
+ // wait config to be updated
+ Awaitility.await().until(() -> {
+ return conf.getLoadBalancerSheddingIntervalMinutes() == 10;
+ });
+
+ // verify value is updated
+ assertEquals(conf.getLoadBalancerSheddingIntervalMinutes(), 10);
+ }
+
/**
* <pre>
* Verifies: zk-update configuration updates service-config
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index e972a52788c..ceff0d8aaed 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -453,7 +453,7 @@ public class SimpleLoadManagerImplTest {
task1.run();
verify(loadManager, times(1)).writeResourceQuotasToZooKeeper();
- LoadSheddingTask task2 = new LoadSheddingTask(atomicLoadManager);
+ LoadSheddingTask task2 = new LoadSheddingTask(atomicLoadManager, null, null);
task2.run();
verify(loadManager, times(1)).doLoadShedding();
}