You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/18 11:39:19 UTC
[pulsar] branch branch-2.11 updated: [fix][broker] Support loadBalancerSheddingIntervalMinutes dynamic configuration (#16408)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new d8c32ca61af [fix][broker] Support loadBalancerSheddingIntervalMinutes dynamic configuration (#16408)
d8c32ca61af is described below
commit d8c32ca61af14caadd200a94c0736b07e007d6d7
Author: LinChen <15...@qq.com>
AuthorDate: Thu Aug 18 10:09:45 2022 +0800
[fix][broker] Support loadBalancerSheddingIntervalMinutes dynamic configuration (#16408)
---
.../org/apache/pulsar/broker/PulsarService.java | 13 ++++-----
.../broker/loadbalance/LoadSheddingTask.java | 34 +++++++++++++++++++++-
.../apache/pulsar/broker/admin/AdminApiTest.java | 13 +++++++++
.../loadbalance/SimpleLoadManagerImplTest.java | 2 +-
4 files changed, 52 insertions(+), 10 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 6e9a9442a6c..64cfefb27b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -213,7 +213,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private LedgerOffloaderStats offloaderStats;
private Map<NamespaceName, LedgerOffloader> ledgerOffloaderMap = new ConcurrentHashMap<>();
private ScheduledFuture<?> loadReportTask = null;
- private ScheduledFuture<?> loadSheddingTask = null;
+ private LoadSheddingTask loadSheddingTask = null;
private ScheduledFuture<?> loadResourceQuotaTask = null;
private final AtomicReference<LoadManager> loadManager = new AtomicReference<>();
private PulsarAdmin adminClient = null;
@@ -1064,20 +1064,17 @@ public class PulsarService implements AutoCloseable, ShutdownService {
if (state == LeaderElectionState.Leading) {
LOG.info("This broker was elected leader");
if (getConfiguration().isLoadBalancerEnabled()) {
- long loadSheddingInterval = TimeUnit.MINUTES
- .toMillis(getConfiguration().getLoadBalancerSheddingIntervalMinutes());
long resourceQuotaUpdateInterval = TimeUnit.MINUTES
.toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
if (loadSheddingTask != null) {
- loadSheddingTask.cancel(false);
+ loadSheddingTask.cancel();
}
if (loadResourceQuotaTask != null) {
loadResourceQuotaTask.cancel(false);
}
- loadSheddingTask = loadManagerExecutor.scheduleAtFixedRate(
- new LoadSheddingTask(loadManager),
- loadSheddingInterval, loadSheddingInterval, TimeUnit.MILLISECONDS);
+ loadSheddingTask = new LoadSheddingTask(loadManager, loadManagerExecutor, config);
+ loadSheddingTask.start();
loadResourceQuotaTask = loadManagerExecutor.scheduleAtFixedRate(
new LoadResourceQuotaUpdaterTask(loadManager), resourceQuotaUpdateInterval,
resourceQuotaUpdateInterval, TimeUnit.MILLISECONDS);
@@ -1088,7 +1085,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
leaderElectionService.getCurrentLeader());
}
if (loadSheddingTask != null) {
- loadSheddingTask.cancel(false);
+ loadSheddingTask.cancel();
loadSheddingTask = null;
}
if (loadResourceQuotaTask != null) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingTask.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingTask.java
index 6b053117042..73139ac4989 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingTask.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingTask.java
@@ -18,7 +18,10 @@
*/
package org.apache.pulsar.broker.loadbalance;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,9 +31,18 @@ import org.slf4j.LoggerFactory;
public class LoadSheddingTask implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(LoadSheddingTask.class);
private final AtomicReference<LoadManager> loadManager;
+ private final ScheduledExecutorService loadManagerExecutor;
- public LoadSheddingTask(AtomicReference<LoadManager> loadManager) {
+ private final ServiceConfiguration config;
+
+ private volatile boolean isCancel = false;
+
+ public LoadSheddingTask(AtomicReference<LoadManager> loadManager,
+ ScheduledExecutorService loadManagerExecutor,
+ ServiceConfiguration config) {
this.loadManager = loadManager;
+ this.loadManagerExecutor = loadManagerExecutor;
+ this.config = config;
}
@Override
@@ -39,6 +51,26 @@ public class LoadSheddingTask implements Runnable {
loadManager.get().doLoadShedding();
} catch (Exception e) {
LOG.warn("Error during the load shedding", e);
+ } finally {
+ if (!isCancel && loadManagerExecutor != null && config != null) {
+ loadManagerExecutor.schedule(
+ new LoadSheddingTask(loadManager, loadManagerExecutor, config),
+ config.getLoadBalancerSheddingIntervalMinutes(),
+ TimeUnit.MINUTES);
+ }
}
}
+
+ public void start() {
+ if (loadManagerExecutor != null && config != null) {
+ loadManagerExecutor.schedule(
+ new LoadSheddingTask(loadManager, loadManagerExecutor, config),
+ config.getLoadBalancerSheddingIntervalMinutes(),
+ TimeUnit.MINUTES);
+ }
+ }
+
+ public void cancel() {
+ isCancel = true;
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index ece5f8d8302..2ec0d76551a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -507,6 +507,19 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
assertEquals(admin.clusters().getClusters(), Lists.newArrayList());
}
+ public void testUpdateDynamicLoadBalancerSheddingIntervalMinutes() throws Exception {
+ // update configuration
+ admin.brokers().updateDynamicConfiguration("loadBalancerSheddingIntervalMinutes", "10");
+
+ // wait config to be updated
+ Awaitility.await().until(() -> {
+ return conf.getLoadBalancerSheddingIntervalMinutes() == 10;
+ });
+
+ // verify value is updated
+ assertEquals(conf.getLoadBalancerSheddingIntervalMinutes(), 10);
+ }
+
@Test
public void testUpdateDynamicCacheConfigurationWithZkWatch() throws Exception {
// update configuration
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index ba8035d9307..85025c5d2e0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -455,7 +455,7 @@ public class SimpleLoadManagerImplTest {
task1.run();
verify(loadManager, times(1)).writeResourceQuotasToZooKeeper();
- LoadSheddingTask task2 = new LoadSheddingTask(atomicLoadManager);
+ LoadSheddingTask task2 = new LoadSheddingTask(atomicLoadManager, null, null);
task2.run();
verify(loadManager, times(1)).doLoadShedding();
}