You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by kw...@apache.org on 2023/02/14 02:03:51 UTC

[pulsar] branch master updated: [improve][broker] PIP-192 Added namespace unload scheduler (#19477)

This is an automated email from the ASF dual-hosted git repository.

kwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new af1b6e16ad9 [improve][broker] PIP-192 Added namespace unload scheduler (#19477)
af1b6e16ad9 is described below

commit af1b6e16ad9ffc0f5fad532e71c25e3a33e389c5
Author: Kai Wang <kw...@apache.org>
AuthorDate: Tue Feb 14 10:03:40 2023 +0800

    [improve][broker] PIP-192 Added namespace unload scheduler (#19477)
---
 .../extensions/ExtensibleLoadManagerImpl.java      |   9 +-
 .../extensions/scheduler/UnloadScheduler.java      | 180 +++++++++++++++++++++
 .../extensions/ExtensibleLoadManagerImplTest.java  |   3 +
 .../extensions/scheduler/UnloadSchedulerTest.java  | 171 ++++++++++++++++++++
 4 files changed, 362 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index fe28d67227e..59c66746761 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -49,6 +49,8 @@ import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
 import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
 import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
 import org.apache.pulsar.broker.loadbalance.extensions.reporter.TopBundleLoadDataReporter;
+import org.apache.pulsar.broker.loadbalance.extensions.scheduler.LoadManagerScheduler;
+import org.apache.pulsar.broker.loadbalance.extensions.scheduler.UnloadScheduler;
 import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
 import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
 import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory;
@@ -86,6 +88,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
     private LoadDataStore<BrokerLoadData> brokerLoadDataStore;
     private LoadDataStore<TopBundlesLoadData> topBundlesLoadDataStore;
 
+    private LoadManagerScheduler unloadScheduler;
+
     @Getter
     private LoadManagerContext context;
 
@@ -194,7 +198,9 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
                         interval,
                         interval, TimeUnit.MILLISECONDS);
 
-        // TODO: Start unload scheduler and bundle split scheduler
+        // TODO: Start bundle split scheduler.
+        this.unloadScheduler = new UnloadScheduler(pulsar.getLoadManagerExecutor(), context, serviceUnitStateChannel);
+        this.unloadScheduler.start();
         this.started = true;
     }
 
@@ -319,6 +325,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
 
             this.brokerLoadDataStore.close();
             this.topBundlesLoadDataStore.close();
+            this.unloadScheduler.close();
         } catch (IOException ex) {
             throw new PulsarServerException(ex);
         } finally {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java
new file mode 100644
index 00000000000..5cdbd302710
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadScheduler.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.Reflections;
+
+/**
+ * Schedules periodic load shedding: asks the {@link NamespaceUnloadStrategy} what to unload and publishes it.
+ */
+@Slf4j
+public class UnloadScheduler implements LoadManagerScheduler {
+    private final NamespaceUnloadStrategy namespaceUnloadStrategy;
+    private final ScheduledExecutorService loadManagerExecutor;
+    private final LoadManagerContext context;
+    private final ServiceUnitStateChannel channel;
+    private final ServiceConfiguration conf;
+    private volatile ScheduledFuture<?> task;
+    // Unloaded bundle -> unload time (ms); entries older than the shedding grace period are pruned by execute().
+    private final Map<String, Long> recentlyUnloadedBundles;
+    // Source broker -> time (ms) of its latest unload; not pruned here, only cleared by close().
+    private final Map<String, Long> recentlyUnloadedBrokers;
+    // Result of the latest shedding round; execute() skips while it is still pending.
+    private volatile CompletableFuture<Void> currentRunningFuture = null;
+
+    /** Creates a scheduler whose unload strategy is instantiated from the broker configuration. */
+    public UnloadScheduler(ScheduledExecutorService loadManagerExecutor, LoadManagerContext context,
+                           ServiceUnitStateChannel channel) {
+        this(loadManagerExecutor, context, channel, createNamespaceUnloadStrategy(context.brokerConfiguration()));
+    }
+
+    /** Creates a scheduler with an explicitly supplied unload strategy (used by tests). */
+    @VisibleForTesting
+    protected UnloadScheduler(ScheduledExecutorService loadManagerExecutor, LoadManagerContext context,
+                              ServiceUnitStateChannel channel, NamespaceUnloadStrategy strategy) {
+        this.namespaceUnloadStrategy = strategy;
+        this.recentlyUnloadedBundles = new HashMap<>();
+        this.recentlyUnloadedBrokers = new HashMap<>();
+        this.loadManagerExecutor = loadManagerExecutor;
+        this.context = context;
+        this.conf = context.brokerConfiguration();
+        this.channel = channel;
+    }
+
+    /**
+     * Runs one shedding round. When shedding is enabled, no earlier round is still pending and this broker owns
+     * the channel, publishes an unload event per selected bundle and records it as recently unloaded.
+     */
+    @Override
+    public synchronized void execute() {
+        boolean debugMode = conf.isLoadBalancerDebugModeEnabled() || log.isDebugEnabled();
+        if (debugMode) {
+            log.info("Load balancer enabled: {}, Shedding enabled: {}.",
+                    conf.isLoadBalancerEnabled(), conf.isLoadBalancerSheddingEnabled());
+        }
+        if (!isLoadBalancerSheddingEnabled()) {
+            if (debugMode) {
+                log.info("The load balancer or load balancer shedding already disabled. Skipping.");
+            }
+            return;
+        }
+        if (currentRunningFuture != null && !currentRunningFuture.isDone()) {
+            if (debugMode) {
+                log.info("Auto namespace unload is running. Skipping.");
+            }
+            return;
+        }
+        // Remove bundles who have been unloaded for longer than the grace period from the recently unloaded map.
+        final long timeout = System.currentTimeMillis()
+                - TimeUnit.MINUTES.toMillis(conf.getLoadBalancerSheddingGracePeriodMinutes());
+        recentlyUnloadedBundles.values().removeIf(unloadedAt -> unloadedAt < timeout);
+
+        final String strategyName = namespaceUnloadStrategy.getClass().getSimpleName();
+        this.currentRunningFuture = channel.isChannelOwnerAsync().<Void>thenCompose(isChannelOwner -> {
+            if (!isChannelOwner) {
+                if (debugMode) {
+                    log.info("Current broker is not channel owner. Skipping.");
+                }
+                return CompletableFuture.completedFuture(null);
+            }
+            return context.brokerRegistry().getAvailableBrokersAsync().thenCompose(availableBrokers -> {
+                if (debugMode) {
+                    log.info("Available brokers: {}", availableBrokers);
+                }
+                if (availableBrokers.size() <= 1) {
+                    log.info("Only 1 broker available: no load shedding will be performed. Skipping.");
+                    return CompletableFuture.completedFuture(null);
+                }
+                final UnloadDecision unloadDecision = namespaceUnloadStrategy
+                        .findBundlesForUnloading(context, recentlyUnloadedBundles, recentlyUnloadedBrokers);
+                if (debugMode) {
+                    log.info("[{}] Unload decision result: {}", strategyName, unloadDecision.toString());
+                }
+                if (unloadDecision.getUnloads().isEmpty()) {
+                    if (debugMode) {
+                        log.info("[{}] Unload decision unloads is empty. Skipping.", strategyName);
+                    }
+                    return CompletableFuture.completedFuture(null);
+                }
+                List<CompletableFuture<Void>> futures = new ArrayList<>();
+                unloadDecision.getUnloads().forEach((broker, unload) -> {
+                    log.info("[{}] Unloading bundle: {}", strategyName, unload);
+                    futures.add(channel.publishUnloadEventAsync(unload).thenAccept(__ -> {
+                        recentlyUnloadedBundles.put(unload.serviceUnit(), System.currentTimeMillis());
+                        recentlyUnloadedBrokers.put(unload.sourceBroker(), System.currentTimeMillis());
+                    }));
+                });
+                return FutureUtil.waitForAll(futures);
+            });
+        }).exceptionally(ex -> { // outermost stage, so ownership/registry/strategy failures are logged as well
+            log.error("[{}] Namespace unload has exception.", strategyName, ex);
+            return null;
+        });
+    }
+
+    /** Schedules {@link #execute()} at a fixed rate, using the shedding interval as initial delay and period. */
+    @Override
+    public void start() {
+        long loadSheddingInterval = TimeUnit.MINUTES.toMillis(conf.getLoadBalancerSheddingIntervalMinutes());
+        this.task = loadManagerExecutor.scheduleAtFixedRate(
+                this::execute, loadSheddingInterval, loadSheddingInterval, TimeUnit.MILLISECONDS);
+    }
+
+    /** Cancels the scheduled task without interrupting a running execution and clears the unload history. */
+    @Override
+    public void close() {
+        if (this.task != null) {
+            this.task.cancel(false);
+        }
+        this.recentlyUnloadedBundles.clear();
+        this.recentlyUnloadedBrokers.clear();
+    }
+
+    /** Instantiates the configured shedding strategy class, falling back to {@link TransferShedder} on failure. */
+    private static NamespaceUnloadStrategy createNamespaceUnloadStrategy(ServiceConfiguration conf) {
+        try {
+            return Reflections.createInstance(conf.getLoadBalancerLoadSheddingStrategy(), NamespaceUnloadStrategy.class,
+                    Thread.currentThread().getContextClassLoader());
+        } catch (Exception e) {
+            log.error("Error when trying to create namespace unload strategy: {}",
+                    conf.getLoadBalancerLoadSheddingStrategy(), e);
+        }
+        log.error("create namespace unload strategy failed. using TransferShedder instead.");
+        return new TransferShedder();
+    }
+
+    private boolean isLoadBalancerSheddingEnabled() {
+        return conf.isLoadBalancerEnabled() && conf.isLoadBalancerSheddingEnabled();
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 81b41aa1687..1ef4f660e4a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -99,6 +99,7 @@ import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
 import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -125,6 +126,7 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
     private ServiceUnitStateChannelImpl channel2;
 
     @BeforeClass
+    @Override
     public void setup() throws Exception {
         conf.setAllowAutoTopicCreation(true);
         conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
@@ -186,6 +188,7 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
     }
 
     @Override
+    @AfterClass
     protected void cleanup() throws Exception {
         pulsar1 = null;
         pulsar2.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java
new file mode 100644
index 00000000000..cda5f81d81b
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/UnloadSchedulerTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.collect.Lists;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
+import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/** Unit tests for {@link UnloadScheduler} that call {@code execute()} directly against mocked collaborators. */
+@Test(groups = "broker")
+public class UnloadSchedulerTest {
+
+    private ScheduledExecutorService loadManagerExecutor;
+
+    /** Returns a mocked context whose broker configuration has load balancer debug mode switched on. */
+    public LoadManagerContext setupContext(){
+        var ctx = getContext();
+        ctx.brokerConfiguration().setLoadBalancerDebugModeEnabled(true);
+        return ctx;
+    }
+
+    @BeforeMethod
+    public void setUp() {
+        this.loadManagerExecutor = Executors
+                .newSingleThreadScheduledExecutor(new ExecutorProvider.ExtendedThreadFactory("pulsar-load-manager"));
+    }
+
+    @AfterMethod
+    public void tearDown() {
+        this.loadManagerExecutor.shutdown();
+    }
+
+    /** A non-empty decision publishes exactly one unload event; a following empty decision publishes nothing. */
+    @Test(timeOut = 30 * 1000)
+    public void testExecuteSuccess() {
+        LoadManagerContext context = setupContext();
+        BrokerRegistry registry = context.brokerRegistry();
+        ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class);
+        NamespaceUnloadStrategy unloadStrategy = mock(NamespaceUnloadStrategy.class);
+        doReturn(CompletableFuture.completedFuture(true)).when(channel).isChannelOwnerAsync();
+        doReturn(CompletableFuture.completedFuture(Lists.newArrayList("broker-1", "broker-2")))
+                .when(registry).getAvailableBrokersAsync();
+        doReturn(CompletableFuture.completedFuture(null)).when(channel).publishUnloadEventAsync(any());
+        UnloadDecision decision = new UnloadDecision();
+        Unload unload = new Unload("broker-1", "bundle-1");
+        decision.getUnloads().put("broker-1", unload);
+        doReturn(decision).when(unloadStrategy).findBundlesForUnloading(any(), any(), any());
+
+        UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, context, channel, unloadStrategy);
+        scheduler.execute();
+        verify(channel, times(1)).publishUnloadEventAsync(eq(unload));
+
+        // Test empty unload.
+        UnloadDecision emptyUnload = new UnloadDecision();
+        doReturn(emptyUnload).when(unloadStrategy).findBundlesForUnloading(any(), any(), any());
+        scheduler.execute();
+        // Still a single publish in total: the empty decision must not have added one.
+        verify(channel, times(1)).publishUnloadEventAsync(eq(unload));
+    }
+
+    /** While the first round is still pending, concurrent execute() calls must not start another round. */
+    @Test(timeOut = 30 * 1000)
+    public void testExecuteMoreThenOnceWhenFirstNotDone() throws InterruptedException {
+        LoadManagerContext context = setupContext();
+        BrokerRegistry registry = context.brokerRegistry();
+        ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class);
+        NamespaceUnloadStrategy unloadStrategy = mock(NamespaceUnloadStrategy.class);
+        doReturn(CompletableFuture.completedFuture(true)).when(channel).isChannelOwnerAsync();
+        // Delay 5 seconds to finish; a delayed executor avoids parking a thread in a pool that is never shut down.
+        doAnswer(__ -> CompletableFuture.supplyAsync(() -> Lists.newArrayList("broker-1", "broker-2"),
+                CompletableFuture.delayedExecutor(5, TimeUnit.SECONDS))).when(registry).getAvailableBrokersAsync();
+        UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, context, channel, unloadStrategy);
+
+        ExecutorService executorService = Executors.newFixedThreadPool(10);
+        try {
+            CountDownLatch latch = new CountDownLatch(10);
+            for (int i = 0; i < 10; i++) {
+                executorService.execute(() -> {
+                    scheduler.execute();
+                    latch.countDown();
+                });
+            }
+            latch.await();
+        } finally {
+            executorService.shutdownNow();
+        }
+
+        verify(registry, times(1)).getAvailableBrokersAsync();
+    }
+
+    /** With the load balancer or shedding disabled, execute() must not even query channel ownership. */
+    @Test(timeOut = 30 * 1000)
+    public void testDisableLoadBalancer() {
+        LoadManagerContext context = setupContext();
+        context.brokerConfiguration().setLoadBalancerEnabled(false);
+        ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class);
+        NamespaceUnloadStrategy unloadStrategy = mock(NamespaceUnloadStrategy.class);
+        UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, context, channel, unloadStrategy);
+
+        scheduler.execute();
+        verify(channel, times(0)).isChannelOwnerAsync();
+
+        context.brokerConfiguration().setLoadBalancerEnabled(true);
+        context.brokerConfiguration().setLoadBalancerSheddingEnabled(false);
+        scheduler.execute();
+        verify(channel, times(0)).isChannelOwnerAsync();
+    }
+
+    /** A broker that is not the channel owner checks ownership once and never queries the broker registry. */
+    @Test(timeOut = 30 * 1000)
+    public void testNotChannelOwner() {
+        // Load balancer stays enabled: otherwise execute() returns before the ownership check (vacuous test).
+        LoadManagerContext context = setupContext();
+        ServiceUnitStateChannel channel = mock(ServiceUnitStateChannel.class);
+        NamespaceUnloadStrategy unloadStrategy = mock(NamespaceUnloadStrategy.class);
+        UnloadScheduler scheduler = new UnloadScheduler(loadManagerExecutor, context, channel, unloadStrategy);
+        doReturn(CompletableFuture.completedFuture(false)).when(channel).isChannelOwnerAsync();
+
+        scheduler.execute();
+        verify(channel, times(1)).isChannelOwnerAsync();
+        verify(context.brokerRegistry(), times(0)).getAvailableBrokersAsync();
+    }
+
+    /** Builds a mocked context backed by a fresh {@link ServiceConfiguration} and a mocked broker registry. */
+    public LoadManagerContext getContext(){
+        var ctx = mock(LoadManagerContext.class);
+        var registry = mock(BrokerRegistry.class);
+        var conf = new ServiceConfiguration();
+        doReturn(conf).when(ctx).brokerConfiguration();
+        doReturn(registry).when(ctx).brokerRegistry();
+        return ctx;
+    }
+}