You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by kw...@apache.org on 2023/02/14 02:03:51 UTC
[pulsar] branch master updated: [improve][broker] PIP-192 Added namespace unload scheduler (#19477)
This is an automated email from the ASF dual-hosted git repository.
kwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new af1b6e16ad9 [improve][broker] PIP-192 Added namespace unload scheduler (#19477)
af1b6e16ad9 is described below
commit af1b6e16ad9ffc0f5fad532e71c25e3a33e389c5
Author: Kai Wang <kw...@apache.org>
AuthorDate: Tue Feb 14 10:03:40 2023 +0800
[improve][broker] PIP-192 Added namespace unload scheduler (#19477)
---
.../extensions/ExtensibleLoadManagerImpl.java | 9 +-
.../extensions/scheduler/UnloadScheduler.java | 180 +++++++++++++++++++++
.../extensions/ExtensibleLoadManagerImplTest.java | 3 +
.../extensions/scheduler/UnloadSchedulerTest.java | 171 ++++++++++++++++++++
4 files changed, 362 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index fe28d67227e..59c66746761 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -49,6 +49,8 @@ import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
import org.apache.pulsar.broker.loadbalance.extensions.reporter.TopBundleLoadDataReporter;
+import org.apache.pulsar.broker.loadbalance.extensions.scheduler.LoadManagerScheduler;
+import org.apache.pulsar.broker.loadbalance.extensions.scheduler.UnloadScheduler;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory;
@@ -86,6 +88,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
private LoadDataStore<BrokerLoadData> brokerLoadDataStore;
private LoadDataStore<TopBundlesLoadData> topBundlesLoadDataStore;
+ private LoadManagerScheduler unloadScheduler;
+
@Getter
private LoadManagerContext context;
@@ -194,7 +198,9 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
interval,
interval, TimeUnit.MILLISECONDS);
- // TODO: Start unload scheduler and bundle split scheduler
+ // TODO: Start bundle split scheduler.
+ this.unloadScheduler = new UnloadScheduler(pulsar.getLoadManagerExecutor(), context, serviceUnitStateChannel);
+ this.unloadScheduler.start();
this.started = true;
}
@@ -319,6 +325,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
this.brokerLoadDataStore.close();
this.topBundlesLoadDataStore.close();
+ this.unloadScheduler.close();
} catch (IOException ex) {
throw new PulsarServerException(ex);
} finally {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java
new file mode 100644
index 00000000000..5cdbd302710
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.Reflections;
+
+@Slf4j
+public class UnloadScheduler implements LoadManagerScheduler {
+
+ private final NamespaceUnloadStrategy namespaceUnloadStrategy;
+
+ private final ScheduledExecutorService loadManagerExecutor;
+
+ private final LoadManagerContext context;
+
+ private final ServiceUnitStateChannel channel;
+
+ private final ServiceConfiguration conf;
+
+ private volatile ScheduledFuture<?> task;
+
+ private final Map<String, Long> recentlyUnloadedBundles;
+
+ private final Map<String, Long> recentlyUnloadedBrokers;
+
+ private volatile CompletableFuture<Void> currentRunningFuture = null;
+
+ public UnloadScheduler(ScheduledExecutorService loadManagerExecutor,
+ LoadManagerContext context,
+ ServiceUnitStateChannel channel) {
+ this(loadManagerExecutor, context, channel, createNamespaceUnloadStrategy(context.brokerConfiguration()));
+ }
+
+ @VisibleForTesting
+ protected UnloadScheduler(ScheduledExecutorService loadManagerExecutor,
+ LoadManagerContext context,
+ ServiceUnitStateChannel channel,
+ NamespaceUnloadStrategy strategy) {
+ this.namespaceUnloadStrategy = strategy;
+ this.recentlyUnloadedBundles = new HashMap<>();
+ this.recentlyUnloadedBrokers = new HashMap<>();
+ this.loadManagerExecutor = loadManagerExecutor;
+ this.context = context;
+ this.conf = context.brokerConfiguration();
+ this.channel = channel;
+ }
+
+ @Override
+ public synchronized void execute() {
+ boolean debugMode = conf.isLoadBalancerDebugModeEnabled() || log.isDebugEnabled();
+ if (debugMode) {
+ log.info("Load balancer enabled: {}, Shedding enabled: {}.",
+ conf.isLoadBalancerEnabled(), conf.isLoadBalancerSheddingEnabled());
+ }
+ if (!isLoadBalancerSheddingEnabled()) {
+ if (debugMode) {
+ log.info("The load balancer or load balancer shedding already disabled. Skipping.");
+ }
+ return;
+ }
+ if (currentRunningFuture != null && !currentRunningFuture.isDone()) {
+ if (debugMode) {
+ log.info("Auto namespace unload is running. Skipping.");
+ }
+ return;
+ }
+ // Remove bundles who have been unloaded for longer than the grace period from the recently unloaded map.
+ final long timeout = System.currentTimeMillis()
+ - TimeUnit.MINUTES.toMillis(conf.getLoadBalancerSheddingGracePeriodMinutes());
+ recentlyUnloadedBundles.keySet().removeIf(e -> recentlyUnloadedBundles.get(e) < timeout);
+
+ this.currentRunningFuture = channel.isChannelOwnerAsync().thenCompose(isChannelOwner -> {
+ if (!isChannelOwner) {
+ if (debugMode) {
+ log.info("Current broker is not channel owner. Skipping.");
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+ return context.brokerRegistry().getAvailableBrokersAsync().thenCompose(availableBrokers -> {
+ if (debugMode) {
+ log.info("Available brokers: {}", availableBrokers);
+ }
+ if (availableBrokers.size() <= 1) {
+ log.info("Only 1 broker available: no load shedding will be performed. Skipping.");
+ return CompletableFuture.completedFuture(null);
+ }
+ final UnloadDecision unloadDecision = namespaceUnloadStrategy
+ .findBundlesForUnloading(context, recentlyUnloadedBundles, recentlyUnloadedBrokers);
+ if (debugMode) {
+ log.info("[{}] Unload decision result: {}",
+ namespaceUnloadStrategy.getClass().getSimpleName(), unloadDecision.toString());
+ }
+ if (unloadDecision.getUnloads().isEmpty()) {
+ if (debugMode) {
+ log.info("[{}] Unload decision unloads is empty. Skipping.",
+ namespaceUnloadStrategy.getClass().getSimpleName());
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ unloadDecision.getUnloads().forEach((broker, unload) -> {
+ log.info("[{}] Unloading bundle: {}", namespaceUnloadStrategy.getClass().getSimpleName(), unload);
+ futures.add(channel.publishUnloadEventAsync(unload).thenAccept(__ -> {
+ recentlyUnloadedBundles.put(unload.serviceUnit(), System.currentTimeMillis());
+ recentlyUnloadedBrokers.put(unload.sourceBroker(), System.currentTimeMillis());
+ }));
+ });
+ return FutureUtil.waitForAll(futures).exceptionally(ex -> {
+ log.error("[{}] Namespace unload has exception.",
+ namespaceUnloadStrategy.getClass().getSimpleName(), ex);
+ return null;
+ });
+ });
+ });
+ }
+
+ @Override
+ public void start() {
+ long loadSheddingInterval = TimeUnit.MINUTES
+ .toMillis(conf.getLoadBalancerSheddingIntervalMinutes());
+ this.task = loadManagerExecutor.scheduleAtFixedRate(
+ this::execute, loadSheddingInterval, loadSheddingInterval, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void close() {
+ if (this.task != null) {
+ this.task.cancel(false);
+ }
+ this.recentlyUnloadedBundles.clear();
+ this.recentlyUnloadedBrokers.clear();
+ }
+
+ private static NamespaceUnloadStrategy createNamespaceUnloadStrategy(ServiceConfiguration conf) {
+ try {
+ return Reflections.createInstance(conf.getLoadBalancerLoadSheddingStrategy(), NamespaceUnloadStrategy.class,
+ Thread.currentThread().getContextClassLoader());
+ } catch (Exception e) {
+ log.error("Error when trying to create namespace unload strategy: {}",
+ conf.getLoadBalancerLoadPlacementStrategy(), e);
+ }
+ log.error("create namespace unload strategy failed. using TransferShedder instead.");
+ return new TransferShedder();
+ }
+
+ private boolean isLoadBalancerSheddingEnabled() {
+ return conf.isLoadBalancerEnabled() && conf.isLoadBalancerSheddingEnabled();
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 81b41aa1687..1ef4f660e4a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -99,6 +99,7 @@ import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -125,6 +126,7 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
private ServiceUnitStateChannelImpl channel2;
@BeforeClass
+ @Override
public void setup() throws Exception {
conf.setAllowAutoTopicCreation(true);
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
@@ -186,6 +188,7 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
}
@Override
+ @AfterClass
protected void cleanup() throws Exception {
pulsar1 = null;
pulsar2.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java
new file mode 100644
index 00000000000..cda5f81d81b
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.collect.Lists;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
+import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Test(groups = "broker")
+public class UnloadSchedulerTest {
+
+ private ScheduledExecutorService loadManagerExecutor;
+
+ public LoadManagerContext setupContext(){
+ var ctx = getContext();
+ ctx.brokerConfiguration().setLoadBalancerDebugModeEnabled(true);
+ return ctx;
+ }
+
+ @BeforeMethod
+ public void setUp() {
+ this.loadManagerExecutor = Executors
+ .newSingleThreadScheduledExecutor(new ExecutorProvider.ExtendedThreadFactory("pulsar-load-manager"));
+ }
+
+ @AfterMethod
+ public void tearDown() {
+ this.loadManagerExecutor.shutdown();
+ }
+
+ @Test(timeOut = 30 * 1000)
+ public void testExecuteSuccess() {
+ LoadManagerContext context = setupContext();
+ BrokerRegistry registry = context.brokerRegistry();
+ ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class);
+ NamespaceUnloadStrategy unloadStrategy = mock(NamespaceUnloadStrategy.class);
+ doReturn(CompletableFuture.completedFuture(true)).when(channel).isChannelOwnerAsync();
+ doReturn(CompletableFuture.completedFuture(Lists.newArrayList("broker-1", "broker-2")))
+ .when(registry).getAvailableBrokersAsync();
+ doReturn(CompletableFuture.completedFuture(null)).when(channel).publishUnloadEventAsync(any());
+ UnloadDecision decision = new UnloadDecision();
+ Unload unload = new Unload("broker-1", "bundle-1");
+ decision.getUnloads().put("broker-1", unload);
+ doReturn(decision).when(unloadStrategy).findBundlesForUnloading(any(), any(), any());
+
+ UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, context, channel, unloadStrategy);
+
+ scheduler.execute();
+
+ verify(channel, times(1)).publishUnloadEventAsync(eq(unload));
+
+ // Test empty unload.
+ UnloadDecision emptyUnload = new UnloadDecision();
+ doReturn(emptyUnload).when(unloadStrategy).findBundlesForUnloading(any(), any(), any());
+
+ scheduler.execute();
+
+ verify(channel, times(1)).publishUnloadEventAsync(eq(unload));
+ }
+
+ @Test(timeOut = 30 * 1000)
+ public void testExecuteMoreThenOnceWhenFirstNotDone() throws InterruptedException {
+ LoadManagerContext context = setupContext();
+ BrokerRegistry registry = context.brokerRegistry();
+ ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class);
+ NamespaceUnloadStrategy unloadStrategy = mock(NamespaceUnloadStrategy.class);
+ doReturn(CompletableFuture.completedFuture(true)).when(channel).isChannelOwnerAsync();
+ doAnswer(__ -> CompletableFuture.supplyAsync(() -> {
+ try {
+ // Delay 5 seconds to finish.
+ TimeUnit.SECONDS.sleep(5);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return Lists.newArrayList("broker-1", "broker-2");
+ }, Executors.newFixedThreadPool(1))).when(registry).getAvailableBrokersAsync();
+ UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, context, channel, unloadStrategy);
+
+ ExecutorService executorService = Executors.newFixedThreadPool(10);
+ CountDownLatch latch = new CountDownLatch(10);
+ for (int i = 0; i < 10; i++) {
+ executorService.execute(() -> {
+ scheduler.execute();
+ latch.countDown();
+ });
+ }
+ latch.await();
+
+ verify(registry, times(1)).getAvailableBrokersAsync();
+ }
+
+ @Test(timeOut = 30 * 1000)
+ public void testDisableLoadBalancer() {
+ LoadManagerContext context = setupContext();
+ context.brokerConfiguration().setLoadBalancerEnabled(false);
+ ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class);
+ NamespaceUnloadStrategy unloadStrategy = mock(NamespaceUnloadStrategy.class);
+ UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, context, channel, unloadStrategy);
+
+ scheduler.execute();
+
+ verify(channel, times(0)).isChannelOwnerAsync();
+
+ context.brokerConfiguration().setLoadBalancerEnabled(true);
+ context.brokerConfiguration().setLoadBalancerSheddingEnabled(false);
+ scheduler.execute();
+
+ verify(channel, times(0)).isChannelOwnerAsync();
+ }
+
+ @Test(timeOut = 30 * 1000)
+ public void testNotChannelOwner() {
+ LoadManagerContext context = setupContext();
+ context.brokerConfiguration().setLoadBalancerEnabled(false);
+ ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class);
+ NamespaceUnloadStrategy unloadStrategy = mock(NamespaceUnloadStrategy.class);
+ UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, context, channel, unloadStrategy);
+ doReturn(CompletableFuture.completedFuture(false)).when(channel).isChannelOwnerAsync();
+
+ scheduler.execute();
+
+ verify(context.brokerRegistry(), times(0)).getAvailableBrokersAsync();
+ }
+
+ public LoadManagerContext getContext(){
+ var ctx = mock(LoadManagerContext.class);
+ var registry = mock(BrokerRegistry.class);
+ var conf = new ServiceConfiguration();
+ doReturn(conf).when(ctx).brokerConfiguration();
+ doReturn(registry).when(ctx).brokerRegistry();
+ return ctx;
+ }
+}