You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by kw...@apache.org on 2023/03/14 01:39:16 UTC
[pulsar] branch master updated: [improve][broker] PIP-192 Added SplitScheduler and DefaultNamespaceBundleSplitStrategyImpl (#19622)
This is an automated email from the ASF dual-hosted git repository.
kwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9a85dea53a6 [improve][broker] PIP-192 Added SplitScheduler and DefaultNamespaceBundleSplitStrategyImpl (#19622)
9a85dea53a6 is described below
commit 9a85dea53a6d0896cc4ba1ceb1ab11a47d5d65da
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Mon Mar 13 18:39:07 2023 -0700
[improve][broker] PIP-192 Added SplitScheduler and DefaultNamespaceBundleSplitStrategyImpl (#19622)
Master Issue: https://github.com/apache/pulsar/issues/16691
### Motivation
We will start raising PRs to implement PIP-192, https://github.com/apache/pulsar/issues/16691
### Modifications
This PR implements:
- SplitScheduler
- DefaultNamespaceBundleSplitStrategyImpl
- SplitManager
- and their unit tests.
---
.../apache/pulsar/broker/ServiceConfiguration.java | 24 ++
.../extensions/ExtensibleLoadManagerImpl.java | 22 +-
.../extensions/manager/SplitManager.java | 119 +++++++++
.../extensions/models/SplitCounter.java | 46 ++--
.../extensions/models/SplitDecision.java | 9 -
.../extensions/scheduler/SplitScheduler.java | 177 +++++++++++++
.../DefaultNamespaceBundleSplitStrategyImpl.java | 171 +++++++++++++
.../NamespaceBundleSplitStrategy.java | 7 +-
.../extensions/ExtensibleLoadManagerImplTest.java | 31 +--
.../extensions/manager/SplitManagerTest.java | 222 ++++++++++++++++
.../extensions/scheduler/SplitSchedulerTest.java | 158 ++++++++++++
.../DefaultNamespaceBundleSplitStrategyTest.java | 284 +++++++++++++++++++++
12 files changed, 1208 insertions(+), 62 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index ec5b0d4042b..3c00e905ac7 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2557,6 +2557,30 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ "(only used in load balancer extension logics)"
)
private double loadBalancerBundleLoadReportPercentage = 10;
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Service units'(bundles) split interval. Broker periodically checks whether "
+ + "some service units(e.g. bundles) should split if they become hot-spots. "
+ + "(only used in load balancer extension logics)"
+ )
+ private int loadBalancerSplitIntervalMinutes = 1;
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ dynamic = true,
+ doc = "Max number of bundles to split to per cycle. "
+ + "(only used in load balancer extension logics)"
+ )
+ private int loadBalancerMaxNumberOfBundlesToSplitPerCycle = 10;
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ dynamic = true,
+ doc = "Threshold to the consecutive count of fulfilled split conditions. "
+ + "If the split scheduler consecutively finds bundles that meet split conditions "
+ + "many times bigger than this threshold, the scheduler will trigger splits on the bundles "
+ + "(if the number of bundles is less than loadBalancerNamespaceMaximumBundles). "
+ + "(only used in load balancer extension logics)"
+ )
+ private int loadBalancerNamespaceBundleSplitConditionThreshold = 5;
@FieldContext(
category = CATEGORY_LOAD_BALANCER,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 82790f44fcb..c8ac8e46845 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -45,16 +45,17 @@ import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerIsolationPoliciesFilter;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerMaxTopicCountFilter;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerVersionFilter;
+import org.apache.pulsar.broker.loadbalance.extensions.manager.SplitManager;
import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager;
import org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
-import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
import org.apache.pulsar.broker.loadbalance.extensions.reporter.TopBundleLoadDataReporter;
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.LoadManagerScheduler;
+import org.apache.pulsar.broker.loadbalance.extensions.scheduler.SplitScheduler;
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.UnloadScheduler;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
@@ -103,7 +104,6 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
@Getter
private final List<BrokerFilter> brokerFilterPipeline;
-
/**
* The load data reporter.
*/
@@ -113,9 +113,12 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
private ScheduledFuture brokerLoadDataReportTask;
private ScheduledFuture topBundlesLoadDataReportTask;
+ private SplitScheduler splitScheduler;
private UnloadManager unloadManager;
+ private SplitManager splitManager;
+
private boolean started = false;
private final AssignCounter assignCounter = new AssignCounter();
@@ -166,7 +169,9 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
this.brokerRegistry.start();
this.unloadManager = new UnloadManager();
+ this.splitManager = new SplitManager(splitCounter);
this.serviceUnitStateChannel.listen(unloadManager);
+ this.serviceUnitStateChannel.listen(splitManager);
this.serviceUnitStateChannel.start();
try {
@@ -184,7 +189,6 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
.brokerLoadDataStore(brokerLoadDataStore)
.topBundleLoadDataStore(topBundlesLoadDataStore).build();
-
this.brokerLoadDataReporter =
new BrokerLoadDataReporter(pulsar, brokerRegistry.getBrokerId(), brokerLoadDataStore);
@@ -216,10 +220,12 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
interval,
interval, TimeUnit.MILLISECONDS);
- // TODO: Start bundle split scheduler.
this.unloadScheduler = new UnloadScheduler(
pulsar.getLoadManagerExecutor(), unloadManager, context, serviceUnitStateChannel);
this.unloadScheduler.start();
+ this.splitScheduler = new SplitScheduler(
+ pulsar, serviceUnitStateChannel, splitManager, splitCounter, splitMetrics, context);
+ this.splitScheduler.start();
this.started = true;
}
@@ -380,6 +386,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
this.brokerLoadDataStore.close();
this.topBundlesLoadDataStore.close();
this.unloadScheduler.close();
+ this.splitScheduler.close();
} catch (IOException ex) {
throw new PulsarServerException(ex);
} finally {
@@ -411,13 +418,6 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
this.unloadMetrics.set(unloadCounter.toMetrics(pulsar.getAdvertisedAddress()));
}
- private void updateSplitMetrics(List<SplitDecision> decisions) {
- for (var decision : decisions) {
- splitCounter.update(decision);
- }
- this.splitMetrics.set(splitCounter.toMetrics(pulsar.getAdvertisedAddress()));
- }
-
public List<Metrics> getMetrics() {
List<Metrics> metricsCollection = new ArrayList<>();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java
new file mode 100644
index 00000000000..71ebbc92a87
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
+
+/**
+ * Split manager.
+ */
+@Slf4j
+public class SplitManager implements StateChangeListener {
+
+
+ private final Map<String, CompletableFuture<Void>> inFlightSplitRequests;
+
+ private final SplitCounter counter;
+
+ public SplitManager(SplitCounter splitCounter) {
+ this.inFlightSplitRequests = new ConcurrentHashMap<>();
+ this.counter = splitCounter;
+ }
+
+ private void complete(String serviceUnit, Throwable ex) {
+ inFlightSplitRequests.computeIfPresent(serviceUnit, (__, future) -> {
+ if (!future.isDone()) {
+ if (ex != null) {
+ future.completeExceptionally(ex);
+ } else {
+ future.complete(null);
+ }
+ }
+ return null;
+ });
+ }
+
+ public CompletableFuture<Void> waitAsync(CompletableFuture<Void> eventPubFuture,
+ String bundle,
+ SplitDecision decision,
+ long timeout,
+ TimeUnit timeoutUnit) {
+ return eventPubFuture
+ .thenCompose(__ -> inFlightSplitRequests.computeIfAbsent(bundle, ignore -> {
+ log.info("Published the bundle split event for bundle:{}. "
+ + "Waiting the split event to complete. Timeout: {} {}",
+ bundle, timeout, timeoutUnit);
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ future.orTimeout(timeout, timeoutUnit).whenComplete((v, ex) -> {
+ if (ex != null) {
+ inFlightSplitRequests.remove(bundle);
+ log.warn("Timed out while waiting for the bundle split event: {}", bundle, ex);
+ }
+ });
+ return future;
+ }))
+ .whenComplete((__, ex) -> {
+ if (ex != null) {
+ log.error("Failed the bundle split event for bundle:{}", bundle, ex);
+ counter.update(Failure, Unknown);
+ } else {
+ log.info("Completed the bundle split event for bundle:{}", bundle);
+ counter.update(decision);
+ }
+ });
+ }
+
+ @Override
+ public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) {
+ ServiceUnitState state = ServiceUnitStateData.state(data);
+ if (t != null && inFlightSplitRequests.containsKey(serviceUnit)) {
+ this.complete(serviceUnit, t);
+ return;
+ }
+ switch (state) {
+ case Deleted, Owned, Init -> this.complete(serviceUnit, t);
+ default -> {
+ if (log.isDebugEnabled()) {
+ log.debug("Handling {} for service unit {}", data, serviceUnit);
+ }
+ }
+ }
+ }
+
+ public void close() {
+ inFlightSplitRequests.forEach((bundle, future) -> {
+ if (!future.isDone()) {
+ String msg = String.format("Splitting bundle: %s, but the manager already closed.", bundle);
+ log.warn(msg);
+ future.completeExceptionally(new IllegalStateException(msg));
+ }
+ });
+ inFlightSplitRequests.clear();
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitCounter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitCounter.java
index 99406412cee..ed72b5f5863 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitCounter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitCounter.java
@@ -19,10 +19,8 @@
package org.apache.pulsar.broker.loadbalance.extensions.models;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure;
-import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Skip;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin;
-import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Balanced;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Bandwidth;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.MsgRate;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Sessions;
@@ -32,7 +30,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.commons.lang3.mutable.MutableLong;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.pulsar.common.stats.Metrics;
/**
@@ -40,23 +38,20 @@ import org.apache.pulsar.common.stats.Metrics;
*/
public class SplitCounter {
- long splitCount = 0;
-
- final Map<SplitDecision.Label, Map<SplitDecision.Reason, MutableLong>> breakdownCounters;
+ private long splitCount = 0;
+ private final Map<SplitDecision.Label, Map<SplitDecision.Reason, AtomicLong>> breakdownCounters;
+ private volatile long updatedAt = 0;
public SplitCounter() {
breakdownCounters = Map.of(
Success, Map.of(
- Topics, new MutableLong(),
- Sessions, new MutableLong(),
- MsgRate, new MutableLong(),
- Bandwidth, new MutableLong(),
- Admin, new MutableLong()),
- Skip, Map.of(
- Balanced, new MutableLong()
- ),
+ Topics, new AtomicLong(),
+ Sessions, new AtomicLong(),
+ MsgRate, new AtomicLong(),
+ Bandwidth, new AtomicLong(),
+ Admin, new AtomicLong()),
Failure, Map.of(
- Unknown, new MutableLong())
+ Unknown, new AtomicLong())
);
}
@@ -64,7 +59,16 @@ public class SplitCounter {
if (decision.label == Success) {
splitCount++;
}
- breakdownCounters.get(decision.getLabel()).get(decision.getReason()).increment();
+ breakdownCounters.get(decision.getLabel()).get(decision.getReason()).incrementAndGet();
+ updatedAt = System.currentTimeMillis();
+ }
+
+ public void update(SplitDecision.Label label, SplitDecision.Reason reason) {
+ if (label == Success) {
+ splitCount++;
+ }
+ breakdownCounters.get(label).get(reason).incrementAndGet();
+ updatedAt = System.currentTimeMillis();
}
public List<Metrics> toMetrics(String advertisedBrokerAddress) {
@@ -77,17 +81,18 @@ public class SplitCounter {
m.put("brk_lb_bundles_split_total", splitCount);
metrics.add(m);
- for (Map.Entry<SplitDecision.Label, Map<SplitDecision.Reason, MutableLong>> etr
+
+ for (Map.Entry<SplitDecision.Label, Map<SplitDecision.Reason, AtomicLong>> etr
: breakdownCounters.entrySet()) {
var result = etr.getKey();
- for (Map.Entry<SplitDecision.Reason, MutableLong> counter : etr.getValue().entrySet()) {
+ for (Map.Entry<SplitDecision.Reason, AtomicLong> counter : etr.getValue().entrySet()) {
var reason = counter.getKey();
var count = counter.getValue();
Map<String, String> breakdownDims = new HashMap<>(dimensions);
breakdownDims.put("result", result.toString());
breakdownDims.put("reason", reason.toString());
Metrics breakdownMetric = Metrics.create(breakdownDims);
- breakdownMetric.put("brk_lb_bundles_split_breakdown_total", count);
+ breakdownMetric.put("brk_lb_bundles_split_breakdown_total", count.get());
metrics.add(breakdownMetric);
}
}
@@ -95,4 +100,7 @@ public class SplitCounter {
return metrics;
}
+ public long updatedAt() {
+ return updatedAt;
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitDecision.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitDecision.java
index a3dede50c1c..433d21a5a61 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitDecision.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/SplitDecision.java
@@ -19,9 +19,7 @@
package org.apache.pulsar.broker.loadbalance.extensions.models;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure;
-import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Skip;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success;
-import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Balanced;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown;
import lombok.Data;
@@ -36,7 +34,6 @@ public class SplitDecision {
public enum Label {
Success,
- Skip,
Failure
}
@@ -46,7 +43,6 @@ public class SplitDecision {
MsgRate,
Bandwidth,
Admin,
- Balanced,
Unknown
}
@@ -62,11 +58,6 @@ public class SplitDecision {
reason = null;
}
- public void skip() {
- label = Skip;
- reason = Balanced;
- }
-
public void succeed(Reason reason) {
label = Success;
this.reason = reason;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java
new file mode 100644
index 00000000000..589df80fc5c
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitScheduler.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
+
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import org.apache.pulsar.broker.loadbalance.extensions.manager.SplitManager;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
+import org.apache.pulsar.broker.loadbalance.extensions.strategy.DefaultNamespaceBundleSplitStrategyImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.strategy.NamespaceBundleSplitStrategy;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.common.util.FutureUtil;
+
+/**
+ * Service unit (e.g. bundle) split scheduler.
+ */
+@Slf4j
+public class SplitScheduler implements LoadManagerScheduler {
+
+ private final PulsarService pulsar;
+
+ private final ScheduledExecutorService loadManagerExecutor;
+
+ private final LoadManagerContext context;
+
+ private final ServiceConfiguration conf;
+
+ private final ServiceUnitStateChannel serviceUnitStateChannel;
+
+ private final NamespaceBundleSplitStrategy bundleSplitStrategy;
+
+ private final SplitCounter counter;
+
+ private final SplitManager splitManager;
+
+ private final AtomicReference<List<Metrics>> splitMetrics;
+
+ private volatile ScheduledFuture<?> task;
+
+ private long counterLastUpdatedAt = 0;
+
+ public SplitScheduler(PulsarService pulsar,
+ ServiceUnitStateChannel serviceUnitStateChannel,
+ SplitManager splitManager,
+ SplitCounter counter,
+ AtomicReference<List<Metrics>> splitMetrics,
+ LoadManagerContext context,
+ NamespaceBundleSplitStrategy bundleSplitStrategy) {
+ this.pulsar = pulsar;
+ this.loadManagerExecutor = pulsar.getLoadManagerExecutor();
+ this.splitManager = splitManager;
+ this.counter = counter;
+ this.splitMetrics = splitMetrics;
+ this.context = context;
+ this.conf = pulsar.getConfiguration();
+ this.bundleSplitStrategy = bundleSplitStrategy;
+ this.serviceUnitStateChannel = serviceUnitStateChannel;
+ }
+
+ public SplitScheduler(PulsarService pulsar,
+ ServiceUnitStateChannel serviceUnitStateChannel,
+ SplitManager splitManager,
+ SplitCounter counter,
+ AtomicReference<List<Metrics>> splitMetrics,
+ LoadManagerContext context) {
+ this(pulsar, serviceUnitStateChannel, splitManager, counter, splitMetrics, context,
+ new DefaultNamespaceBundleSplitStrategyImpl(counter));
+ }
+
+ @Override
+ public void execute() {
+ boolean debugMode = conf.isLoadBalancerDebugModeEnabled() || log.isDebugEnabled();
+ if (debugMode) {
+ log.info("Load balancer enabled: {}, Split enabled: {}.",
+ conf.isLoadBalancerEnabled(), conf.isLoadBalancerAutoBundleSplitEnabled());
+ }
+
+ if (!isLoadBalancerAutoBundleSplitEnabled()) {
+ if (debugMode) {
+ log.info("The load balancer or load balancer split already disabled. Skipping.");
+ }
+ return;
+ }
+
+ synchronized (bundleSplitStrategy) {
+ final Set<SplitDecision> decisions = bundleSplitStrategy.findBundlesToSplit(context, pulsar);
+ if (!decisions.isEmpty()) {
+
+ // currently following the unloading timeout
+ var asyncOpTimeoutMs = conf.getNamespaceBundleUnloadingTimeoutMs();
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ for (SplitDecision decision : decisions) {
+ if (decision.getLabel() == Success) {
+ var split = decision.getSplit();
+ futures.add(
+ splitManager.waitAsync(
+ serviceUnitStateChannel.publishSplitEventAsync(split),
+ split.serviceUnit(),
+ decision,
+ asyncOpTimeoutMs, TimeUnit.MILLISECONDS)
+ );
+ }
+ }
+ try {
+ FutureUtil.waitForAll(futures)
+ .get(asyncOpTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (Throwable e) {
+ log.error("Failed to wait for split events to persist.", e);
+ }
+ } else {
+ if (debugMode) {
+ log.info("BundleSplitStrategy returned no bundles to split.");
+ }
+ }
+ }
+
+ if (counter.updatedAt() > counterLastUpdatedAt) {
+ splitMetrics.set(counter.toMetrics(pulsar.getAdvertisedAddress()));
+ counterLastUpdatedAt = counter.updatedAt();
+ }
+ }
+
+ @Override
+ public void start() {
+ long interval = TimeUnit.MINUTES
+ .toMillis(conf.getLoadBalancerSplitIntervalMinutes());
+ task = loadManagerExecutor.scheduleAtFixedRate(() -> {
+ try {
+ execute();
+ } catch (Throwable e) {
+ log.error("Failed to run the split job.", e);
+ }
+ }, interval, interval, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void close() {
+ if (task != null) {
+ task.cancel(false);
+ task = null;
+ }
+ }
+
+ private boolean isLoadBalancerAutoBundleSplitEnabled() {
+ return conf.isLoadBalancerEnabled() && conf.isLoadBalancerAutoBundleSplitEnabled();
+ }
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java
new file mode 100644
index 00000000000..e572fd4161b
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.strategy;
+
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Failure;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Bandwidth;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.MsgRate;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Sessions;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Topics;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
+import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
+import org.apache.pulsar.common.naming.NamespaceBundleFactory;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+
+/**
+ * Determines which bundles should be split based on various thresholds.
+ *
+ * Migrated from {@link org.apache.pulsar.broker.loadbalance.impl.BundleSplitterTask}.
+ */
+@Slf4j
+public class DefaultNamespaceBundleSplitStrategyImpl implements NamespaceBundleSplitStrategy {
+ private final Set<SplitDecision> decisionCache;
+ private final Map<String, Integer> namespaceBundleCount;
+ private final Map<String, Integer> bundleHighTrafficFrequency;
+ private final SplitCounter counter;
+
+ public DefaultNamespaceBundleSplitStrategyImpl(SplitCounter counter) {
+ decisionCache = new HashSet<>();
+ namespaceBundleCount = new HashMap<>();
+ bundleHighTrafficFrequency = new HashMap<>();
+ this.counter = counter;
+
+ }
+
+ @Override
+ public Set<SplitDecision> findBundlesToSplit(LoadManagerContext context, PulsarService pulsar) {
+ decisionCache.clear();
+ namespaceBundleCount.clear();
+ final ServiceConfiguration conf = pulsar.getConfiguration();
+ int maxBundleCount = conf.getLoadBalancerNamespaceMaximumBundles();
+ long maxBundleTopics = conf.getLoadBalancerNamespaceBundleMaxTopics();
+ long maxBundleSessions = conf.getLoadBalancerNamespaceBundleMaxSessions();
+ long maxBundleMsgRate = conf.getLoadBalancerNamespaceBundleMaxMsgRate();
+ long maxBundleBandwidth = conf.getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * LoadManagerShared.MIBI;
+ long maxSplitCount = conf.getLoadBalancerMaxNumberOfBundlesToSplitPerCycle();
+ long splitConditionThreshold = conf.getLoadBalancerNamespaceBundleSplitConditionThreshold();
+ boolean debug = log.isDebugEnabled() || conf.isLoadBalancerDebugModeEnabled();
+
+ Map<String, NamespaceBundleStats> bundleStatsMap = pulsar.getBrokerService().getBundleStats();
+ NamespaceBundleFactory namespaceBundleFactory =
+ pulsar.getNamespaceService().getNamespaceBundleFactory();
+
+ // clean bundleHighTrafficFrequency
+ bundleHighTrafficFrequency.keySet().retainAll(bundleStatsMap.keySet());
+
+ for (var entry : bundleStatsMap.entrySet()) {
+ final String bundle = entry.getKey();
+ final NamespaceBundleStats stats = entry.getValue();
+ if (stats.topics < 2) {
+ if (debug) {
+ log.info("The count of topics on the bundle {} is less than 2, skip split!", bundle);
+ }
+ continue;
+ }
+
+ final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
+ final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
+ if (!namespaceBundleFactory
+ .canSplitBundle(namespaceBundleFactory.getBundle(namespaceName, bundleRange))) {
+ if (debug) {
+ log.info("Can't split the bundle:{}. invalid bundle range:{}. ", bundle, bundleRange);
+ }
+ counter.update(Failure, Unknown);
+ continue;
+ }
+
+ double totalMessageRate = stats.msgRateIn + stats.msgRateOut;
+ double totalMessageThroughput = stats.msgThroughputIn + stats.msgThroughputOut;
+ int totalSessionCount = stats.consumerCount + stats.producerCount;
+ SplitDecision.Reason reason = Unknown;
+ if (stats.topics > maxBundleTopics) {
+ reason = Topics;
+ } else if (maxBundleSessions > 0 && (totalSessionCount > maxBundleSessions)) {
+ reason = Sessions;
+ } else if (totalMessageRate > maxBundleMsgRate) {
+ reason = MsgRate;
+ } else if (totalMessageThroughput > maxBundleBandwidth) {
+ reason = Bandwidth;
+ }
+
+ if (reason != Unknown) {
+ bundleHighTrafficFrequency.put(bundle, bundleHighTrafficFrequency.getOrDefault(bundle, 0) + 1);
+ } else {
+ bundleHighTrafficFrequency.remove(bundle);
+ }
+
+ if (bundleHighTrafficFrequency.getOrDefault(bundle, 0) > splitConditionThreshold) {
+ final String namespace = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
+ try {
+ final int bundleCount = pulsar.getNamespaceService()
+ .getBundleCount(NamespaceName.get(namespace));
+ if ((bundleCount + namespaceBundleCount.getOrDefault(namespace, 0))
+ < maxBundleCount) {
+ if (debug) {
+ log.info("The bundle {} is considered to split. Topics: {}/{}, Sessions: ({}+{})/{}, "
+ + "Message Rate: {}/{} (msgs/s), Message Throughput: {}/{} (MB/s)",
+ bundle, stats.topics, maxBundleTopics, stats.producerCount, stats.consumerCount,
+ maxBundleSessions, totalMessageRate, maxBundleMsgRate,
+ totalMessageThroughput / LoadManagerShared.MIBI,
+ maxBundleBandwidth / LoadManagerShared.MIBI);
+ }
+ var decision = new SplitDecision();
+ decision.setSplit(new Split(bundle, context.brokerRegistry().getBrokerId(), new HashMap<>()));
+ decision.succeed(reason);
+ decisionCache.add(decision);
+ int bundleNum = namespaceBundleCount.getOrDefault(namespace, 0);
+ namespaceBundleCount.put(namespace, bundleNum + 1);
+ bundleHighTrafficFrequency.remove(bundle);
+ // Clear namespace bundle-cache
+ namespaceBundleFactory.invalidateBundleCache(NamespaceName.get(namespaceName));
+ if (decisionCache.size() == maxSplitCount) {
+ if (debug) {
+ log.info("Too many bundles to split in this split cycle {} / {}. Stop.",
+ decisionCache.size(), maxSplitCount);
+ }
+ break;
+ }
+ } else {
+ if (debug) {
+ log.info(
+ "Could not split namespace bundle {} because namespace {} has too many bundles:"
+ + "{}", bundle, namespace, bundleCount);
+ }
+ }
+ } catch (Exception e) {
+ counter.update(Failure, Unknown);
+ log.warn("Error while computing bundle splits for namespace {}", namespace, e);
+ }
+ }
+ }
+ return decisionCache;
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceBundleSplitStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/NamespaceBundleSplitStrategy.java
similarity index 86%
rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceBundleSplitStrategy.java
rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/NamespaceBundleSplitStrategy.java
index 88bd7f0b087..14023f1b5b0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/NamespaceBundleSplitStrategy.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/NamespaceBundleSplitStrategy.java
@@ -16,11 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
+package org.apache.pulsar.broker.loadbalance.extensions.strategy;
import java.util.Set;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
-import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
/**
* Determines which bundles should be split based on various thresholds.
@@ -35,5 +36,5 @@ public interface NamespaceBundleSplitStrategy {
* @param context The context used for decisions.
* @return A set of the bundles that should be split.
*/
- Set<Split> findBundlesToSplit(LoadManagerContext context);
+ Set<SplitDecision> findBundlesToSplit(LoadManagerContext context, PulsarService pulsar);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 441415a9d35..17fcb995261 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -464,24 +464,16 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
FieldUtils.readDeclaredField(primaryLoadManager, "splitMetrics", true);
SplitCounter splitCounter = new SplitCounter();
FieldUtils.writeDeclaredField(splitCounter, "splitCount", 35l, true);
- FieldUtils.writeDeclaredField(splitCounter, "breakdownCounters", new LinkedHashMap<>() {
- {
- put(SplitDecision.Label.Success, new LinkedHashMap<>() {
- {
- put(Topics, new MutableLong(1));
- put(Sessions, new MutableLong(2));
- put(MsgRate, new MutableLong(3));
- put(Bandwidth, new MutableLong(4));
- put(Admin, new MutableLong(5));
- }
- });
- put(SplitDecision.Label.Skip, Map.of(
- SplitDecision.Reason.Balanced, new MutableLong(6)
- ));
- put(SplitDecision.Label.Failure, Map.of(
- SplitDecision.Reason.Unknown, new MutableLong(7)));
- }
- }, true);
+ FieldUtils.writeDeclaredField(splitCounter, "breakdownCounters", Map.of(
+ SplitDecision.Label.Success, Map.of(
+ Topics, new AtomicLong(1),
+ Sessions, new AtomicLong(2),
+ MsgRate, new AtomicLong(3),
+ Bandwidth, new AtomicLong(4),
+ Admin, new AtomicLong(5)),
+ SplitDecision.Label.Failure, Map.of(
+ SplitDecision.Reason.Unknown, new AtomicLong(6))
+ ), true);
splitMetrics.set(splitCounter.toMetrics(pulsar.getAdvertisedAddress()));
}
@@ -556,8 +548,7 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
dimensions=[{broker=localhost, metric=bundlesSplit, reason=MsgRate, result=Success}], metrics=[{brk_lb_bundles_split_breakdown_total=3}]
dimensions=[{broker=localhost, metric=bundlesSplit, reason=Bandwidth, result=Success}], metrics=[{brk_lb_bundles_split_breakdown_total=4}]
dimensions=[{broker=localhost, metric=bundlesSplit, reason=Admin, result=Success}], metrics=[{brk_lb_bundles_split_breakdown_total=5}]
- dimensions=[{broker=localhost, metric=bundlesSplit, reason=Balanced, result=Skip}], metrics=[{brk_lb_bundles_split_breakdown_total=6}]
- dimensions=[{broker=localhost, metric=bundlesSplit, reason=Unknown, result=Failure}], metrics=[{brk_lb_bundles_split_breakdown_total=7}]
+ dimensions=[{broker=localhost, metric=bundlesSplit, reason=Unknown, result=Failure}], metrics=[{brk_lb_bundles_split_breakdown_total=6}]
dimensions=[{broker=localhost, metric=assign, result=Empty}], metrics=[{brk_lb_assign_broker_breakdown_total=2}]
dimensions=[{broker=localhost, metric=assign, result=Skip}], metrics=[{brk_lb_assign_broker_breakdown_total=3}]
dimensions=[{broker=localhost, metric=assign, result=Success}], metrics=[{brk_lb_assign_broker_breakdown_total=1}]
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java
new file mode 100644
index 00000000000..3287306ab48
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.manager;
+
+import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.VERSION_ID_INIT;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Sessions;
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class SplitManagerTest {
+
+ String bundle = "bundle-1";
+
+ String dstBroker = "broker-1";
+
+ @Test
+ public void testEventPubFutureHasException() {
+     // An already-failed publish future is handed to waitAsync; the test expects the
+     // returned future to be completed exceptionally with the same cause.
+     var actualCounter = new SplitCounter();
+     var splitManager = new SplitManager(actualCounter);
+     CompletableFuture<Void> result = splitManager.waitAsync(
+             FutureUtil.failedFuture(new Exception("test")),
+             bundle, new SplitDecision(), 10, TimeUnit.SECONDS);
+
+     assertTrue(result.isCompletedExceptionally());
+     try {
+         result.get();
+         fail();
+     } catch (Exception e) {
+         assertEquals(e.getCause().getMessage(), "test");
+     }
+
+     // Exactly one Failure/Unknown update is expected on the manager's counter.
+     var expectedCounter = new SplitCounter();
+     expectedCounter.update(SplitDecision.Label.Failure, Unknown);
+     String actualMetrics = actualCounter.toMetrics(null).toString();
+     assertEquals(actualMetrics, expectedCounter.toMetrics(null).toString());
+ }
+
+ @Test
+ public void testTimeout() throws IllegalAccessException {
+     // The publish future succeeds but no state event is ever delivered; with a
+     // 3-second timeout passed to waitAsync the test expects a TimeoutException cause.
+     var actualCounter = new SplitCounter();
+     var splitManager = new SplitManager(actualCounter);
+     CompletableFuture<Void> pending = splitManager.waitAsync(
+             CompletableFuture.completedFuture(null),
+             bundle, new SplitDecision(), 3, TimeUnit.SECONDS);
+     var inFlight = getinFlightUnloadRequests(splitManager);
+
+     // The request is tracked while it is waiting.
+     assertEquals(inFlight.size(), 1);
+
+     try {
+         pending.get();
+         fail();
+     } catch (Exception e) {
+         assertTrue(e.getCause() instanceof TimeoutException);
+     }
+
+     // After the failure the request must no longer be tracked.
+     assertEquals(inFlight.size(), 0);
+
+     var expectedCounter = new SplitCounter();
+     expectedCounter.update(SplitDecision.Label.Failure, Unknown);
+     String actualMetrics = actualCounter.toMetrics(null).toString();
+     assertEquals(actualMetrics, expectedCounter.toMetrics(null).toString());
+ }
+
+ // Walks one in-flight split request through a sequence of service-unit state events and
+ // checks, after each event, how many requests are still tracked and what the counter reports.
+ // The same decision (succeeded with reason Sessions) is reused for all three requests.
+ @Test
+ public void testSuccess() throws IllegalAccessException, ExecutionException, InterruptedException {
+ var counter = new SplitCounter();
+ SplitManager manager = new SplitManager(counter);
+ var counterExpected = new SplitCounter();
+ var decision = new SplitDecision();
+ decision.succeed(Sessions);
+ CompletableFuture<Void> future =
+ manager.waitAsync(CompletableFuture.completedFuture(null),
+ bundle, decision, 5, TimeUnit.SECONDS);
+ var inFlightUnloadRequests = getinFlightUnloadRequests(manager);
+
+ assertEquals(inFlightUnloadRequests.size(), 1);
+
+ // Assigning, Splitting, Releasing and Free are expected to leave the request in flight.
+ manager.handleEvent(bundle,
+ new ServiceUnitStateData(ServiceUnitState.Assigning, dstBroker, VERSION_ID_INIT), null);
+ assertEquals(inFlightUnloadRequests.size(), 1);
+
+ manager.handleEvent(bundle,
+ new ServiceUnitStateData(ServiceUnitState.Splitting, dstBroker, VERSION_ID_INIT), null);
+ assertEquals(inFlightUnloadRequests.size(), 1);
+
+ manager.handleEvent(bundle,
+ new ServiceUnitStateData(ServiceUnitState.Releasing, dstBroker, VERSION_ID_INIT), null);
+ assertEquals(inFlightUnloadRequests.size(), 1);
+
+ manager.handleEvent(bundle,
+ new ServiceUnitStateData(ServiceUnitState.Free, dstBroker, VERSION_ID_INIT), null);
+ assertEquals(inFlightUnloadRequests.size(), 1);
+ // No counter update is expected so far: counterExpected is still untouched here.
+ assertEquals(counter.toMetrics(null).toString(),
+ counterExpected.toMetrics(null).toString());
+
+ // Deleted is expected to finish the request and record one Success/Sessions update.
+ // Note: this first future is never awaited; only the map size and the counter are checked.
+ manager.handleEvent(bundle,
+ new ServiceUnitStateData(ServiceUnitState.Deleted, dstBroker, VERSION_ID_INIT), null);
+ counterExpected.update(SplitDecision.Label.Success, Sessions);
+ assertEquals(inFlightUnloadRequests.size(), 0);
+ assertEquals(counter.toMetrics(null).toString(),
+ counterExpected.toMetrics(null).toString());
+
+ // Success with Init state.
+ future = manager.waitAsync(CompletableFuture.completedFuture(null),
+ bundle, decision, 5, TimeUnit.SECONDS);
+ inFlightUnloadRequests = getinFlightUnloadRequests(manager);
+ assertEquals(inFlightUnloadRequests.size(), 1);
+ manager.handleEvent(bundle,
+ new ServiceUnitStateData(ServiceUnitState.Init, dstBroker, VERSION_ID_INIT), null);
+ assertEquals(inFlightUnloadRequests.size(), 0);
+ counterExpected.update(SplitDecision.Label.Success, Sessions);
+ assertEquals(counter.toMetrics(null).toString(),
+ counterExpected.toMetrics(null).toString());
+ // get() must return normally, i.e. the future completed without an exception.
+ future.get();
+
+ // Success with Owned state.
+ future = manager.waitAsync(CompletableFuture.completedFuture(null),
+ bundle, decision, 5, TimeUnit.SECONDS);
+ inFlightUnloadRequests = getinFlightUnloadRequests(manager);
+ assertEquals(inFlightUnloadRequests.size(), 1);
+ manager.handleEvent(bundle,
+ new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, VERSION_ID_INIT), null);
+ assertEquals(inFlightUnloadRequests.size(), 0);
+ counterExpected.update(SplitDecision.Label.Success, Sessions);
+ assertEquals(counter.toMetrics(null).toString(),
+ counterExpected.toMetrics(null).toString());
+ future.get();
+ }
+
+ @Test
+ public void testFailedStage() throws IllegalAccessException {
+     // An event delivered together with an exception is expected to fail the pending
+     // request with that exception, even though the reported state is Owned.
+     var actualCounter = new SplitCounter();
+     var splitManager = new SplitManager(actualCounter);
+     CompletableFuture<Void> pending = splitManager.waitAsync(
+             CompletableFuture.completedFuture(null),
+             bundle, new SplitDecision(), 5, TimeUnit.SECONDS);
+     var inFlight = getinFlightUnloadRequests(splitManager);
+
+     assertEquals(inFlight.size(), 1);
+
+     var ownedState = new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, VERSION_ID_INIT);
+     splitManager.handleEvent(bundle, ownedState, new IllegalStateException("Failed stage."));
+
+     try {
+         pending.get();
+         fail();
+     } catch (Exception e) {
+         Throwable cause = e.getCause();
+         assertTrue(cause instanceof IllegalStateException);
+         assertEquals(cause.getMessage(), "Failed stage.");
+     }
+
+     assertEquals(inFlight.size(), 0);
+
+     var expectedCounter = new SplitCounter();
+     expectedCounter.update(SplitDecision.Label.Failure, Unknown);
+     String actualMetrics = actualCounter.toMetrics(null).toString();
+     assertEquals(actualMetrics, expectedCounter.toMetrics(null).toString());
+ }
+
+ @Test
+ public void testClose() throws IllegalAccessException {
+     // Closing the manager while a request is pending is expected to drop the request
+     // and fail its future with an IllegalStateException cause.
+     var splitManager = new SplitManager(new SplitCounter());
+     CompletableFuture<Void> pending = splitManager.waitAsync(
+             CompletableFuture.completedFuture(null),
+             bundle, new SplitDecision(), 5, TimeUnit.SECONDS);
+     var inFlight = getinFlightUnloadRequests(splitManager);
+     assertEquals(inFlight.size(), 1);
+
+     splitManager.close();
+     assertEquals(inFlight.size(), 0);
+
+     try {
+         pending.get();
+         fail();
+     } catch (Exception e) {
+         assertTrue(e.getCause() instanceof IllegalStateException);
+     }
+ }
+
+ /**
+  * Reads the private {@code inFlightSplitRequests} field of the given manager via reflection.
+  *
+  * <p>The method name still says "Unload" (it is called under that name by the tests in this
+  * class), but the field it reads tracks split requests.
+  *
+  * @param manager the manager whose in-flight request map is inspected
+  * @return the map held by the field; presumably the live instance rather than a copy,
+  *         since callers observe size changes through it
+  * @throws IllegalAccessException if the field cannot be read reflectively
+  */
+ private Map<String, CompletableFuture<Void>> getinFlightUnloadRequests(SplitManager manager)
+         throws IllegalAccessException {
+     // readField returns Object, so the generic cast cannot be verified by the compiler.
+     // The target type is assumed to match the field's declaration in SplitManager.
+     @SuppressWarnings("unchecked")
+     Map<String, CompletableFuture<Void>> inFlightSplitRequests =
+             (Map<String, CompletableFuture<Void>>) FieldUtils.readField(manager, "inFlightSplitRequests", true);
+     return inFlightSplitRequests;
+ }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitSchedulerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitSchedulerTest.java
new file mode 100644
index 00000000000..7988aa41336
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/SplitSchedulerTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import org.apache.pulsar.broker.loadbalance.extensions.manager.SplitManager;
+import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
+import org.apache.pulsar.broker.loadbalance.extensions.strategy.NamespaceBundleSplitStrategy;
+import org.apache.pulsar.common.naming.NamespaceBundleFactory;
+import org.apache.pulsar.common.stats.Metrics;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class SplitSchedulerTest {
+
+ PulsarService pulsar;
+ ServiceConfiguration config;
+ NamespaceBundleFactory namespaceBundleFactory;
+ LoadManagerContext context;
+ ServiceUnitStateChannel channel;
+ NamespaceBundleSplitStrategy strategy;
+ String bundle1 = "tenant/namespace/0x00000000_0xFFFFFFFF";
+ String bundle2 = "tenant/namespace/0x00000000_0x0FFFFFFF";
+ String broker = "broker-1";
+ SplitDecision decision1;
+ SplitDecision decision2;
+
+ @BeforeMethod
+ public void setUp() {
+     // Configuration with load-balancer debug mode switched on.
+     config = new ServiceConfiguration();
+     config.setLoadBalancerDebugModeEnabled(true);
+
+     // Fixture decisions: one successful split per bundle, with different reasons.
+     decision1 = new SplitDecision();
+     decision1.setSplit(new Split(bundle1, broker, new HashMap<>()));
+     decision1.succeed(SplitDecision.Reason.MsgRate);
+
+     decision2 = new SplitDecision();
+     decision2.setSplit(new Split(bundle2, broker, new HashMap<>()));
+     decision2.succeed(SplitDecision.Reason.Sessions);
+
+     // Collaborators are all mocks.
+     pulsar = mock(PulsarService.class);
+     namespaceBundleFactory = mock(NamespaceBundleFactory.class);
+     context = mock(LoadManagerContext.class);
+     channel = mock(ServiceUnitStateChannel.class);
+     strategy = mock(NamespaceBundleSplitStrategy.class);
+
+     // Default stubbing: publishing always succeeds and the strategy proposes both decisions.
+     doReturn(config).when(pulsar).getConfiguration();
+     doReturn(true).when(namespaceBundleFactory).canSplitBundle(any());
+     doReturn(CompletableFuture.completedFuture(null)).when(channel).publishSplitEventAsync(any());
+     Set<SplitDecision> plannedSplits = Set.of(decision1, decision2);
+     doReturn(plannedSplits).when(strategy).findBundlesToSplit(any(), any());
+ }
+
+ // Runs the scheduler with a mocked SplitManager and checks that each proposed split is
+ // published once and that the metrics reference reflects both decisions. A second run with
+ // no decisions must publish nothing further and leave the metrics unchanged.
+ @Test(timeOut = 30 * 1000)
+ public void testExecuteSuccess() {
+     // Diamond operator instead of the raw AtomicReference type, so the assignment is
+     // checked against AtomicReference<List<Metrics>> without an unchecked warning.
+     AtomicReference<List<Metrics>> reference = new AtomicReference<>();
+     SplitCounter counter = new SplitCounter();
+     SplitManager manager = mock(SplitManager.class);
+     SplitScheduler scheduler = new SplitScheduler(pulsar, channel, manager, counter, reference, context, strategy);
+     // The mocked manager records the decision (third argument) in the counter and
+     // reports an already-completed future.
+     doAnswer((invocation)->{
+         var decision = invocation.getArgument(2, SplitDecision.class);
+         counter.update(decision);
+         return CompletableFuture.completedFuture(null);
+     }).when(manager).waitAsync(any(), any(), any(), anyLong(), any());
+     scheduler.execute();
+
+     var counterExpected = new SplitCounter();
+     counterExpected.update(decision1);
+     counterExpected.update(decision2);
+     verify(channel, times(1)).publishSplitEventAsync(eq(decision1.getSplit()));
+     verify(channel, times(1)).publishSplitEventAsync(eq(decision2.getSplit()));
+
+     assertEquals(reference.get().toString(), counterExpected.toMetrics(pulsar.getAdvertisedAddress()).toString());
+
+     // Test empty splits.
+     Set<SplitDecision> noDecisions = Set.of();
+     doReturn(noDecisions).when(strategy).findBundlesToSplit(any(), any());
+
+     scheduler.execute();
+     // Still two publishes in total: the second run added none.
+     verify(channel, times(2)).publishSplitEventAsync(any());
+     assertEquals(reference.get().toString(), counterExpected.toMetrics(pulsar.getAdvertisedAddress()).toString());
+ }
+
+ // Runs the scheduler with a real SplitManager while every publish fails, and checks that
+ // both splits were attempted once and that two Failure/Unknown updates reach the metrics.
+ @Test(timeOut = 30 * 1000)
+ public void testExecuteFailure() {
+     // Diamond operator instead of the raw AtomicReference type, so the assignment is
+     // checked against AtomicReference<List<Metrics>> without an unchecked warning.
+     AtomicReference<List<Metrics>> reference = new AtomicReference<>();
+     SplitCounter counter = new SplitCounter();
+     SplitManager manager = new SplitManager(counter);
+     SplitScheduler scheduler = new SplitScheduler(pulsar, channel, manager, counter, reference, context, strategy);
+     // Overrides the successful publish stubbed in setUp.
+     doReturn(CompletableFuture.failedFuture(new RuntimeException())).when(channel).publishSplitEventAsync(any());
+
+     scheduler.execute();
+
+     // One failure is expected per proposed decision.
+     var counterExpected = new SplitCounter();
+     counterExpected.update(SplitDecision.Label.Failure, SplitDecision.Reason.Unknown);
+     counterExpected.update(SplitDecision.Label.Failure, SplitDecision.Reason.Unknown);
+     verify(channel, times(1)).publishSplitEventAsync(eq(decision1.getSplit()));
+     verify(channel, times(1)).publishSplitEventAsync(eq(decision2.getSplit()));
+
+     assertEquals(reference.get().toString(), counterExpected.toMetrics(pulsar.getAdvertisedAddress()).toString());
+ }
+
+
+ @Test(timeOut = 30 * 1000)
+ public void testDisableLoadBalancer() {
+     // Case 1: the load balancer as a whole is disabled.
+     config.setLoadBalancerEnabled(false);
+     var splitScheduler = new SplitScheduler(pulsar, channel, null, null, null, context, strategy);
+     splitScheduler.execute();
+     verify(strategy, times(0)).findBundlesToSplit(any(), any());
+
+     // Case 2: the load balancer is enabled but automatic bundle splitting is not.
+     config.setLoadBalancerEnabled(true);
+     config.setLoadBalancerAutoBundleSplitEnabled(false);
+     splitScheduler.execute();
+     verify(strategy, times(0)).findBundlesToSplit(any(), any());
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyTest.java
new file mode 100644
index 00000000000..71606bb85a3
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyTest.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.strategy;
+
+import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Unknown;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
+import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.PulsarStats;
+import org.apache.pulsar.common.naming.NamespaceBundleFactory;
+import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class DefaultNamespaceBundleSplitStrategyTest {
+
+ PulsarService pulsar;
+ BrokerService brokerService;
+ PulsarStats pulsarStats;
+ Map<String, NamespaceBundleStats> bundleStats;
+ ServiceConfiguration config;
+ NamespaceBundleFactory namespaceBundleFactory;
+ NamespaceService namespaceService;
+
+ LoadManagerContext loadManagerContext;
+
+ BrokerRegistry brokerRegistry;
+
+ String bundle1 = "tenant/namespace/0x00000000_0xFFFFFFFF";
+ String bundle2 = "tenant/namespace/0x00000000_0x0FFFFFFF";
+
+ String broker = "broker-1";
+
+ @BeforeMethod
+ void setup() {
+     // Thresholds used by the tests: every per-bundle limit is 100, at most one split
+     // per cycle, and a split-condition threshold of 3.
+     config = new ServiceConfiguration();
+     config.setLoadBalancerDebugModeEnabled(true);
+     config.setLoadBalancerNamespaceMaximumBundles(100);
+     config.setLoadBalancerNamespaceBundleMaxTopics(100);
+     config.setLoadBalancerNamespaceBundleMaxSessions(100);
+     config.setLoadBalancerNamespaceBundleMaxMsgRate(100);
+     config.setLoadBalancerNamespaceBundleMaxBandwidthMbytes(100);
+     config.setLoadBalancerMaxNumberOfBundlesToSplitPerCycle(1);
+     config.setLoadBalancerNamespaceBundleSplitConditionThreshold(3);
+
+     // Mocked collaborators.
+     pulsar = mock(PulsarService.class);
+     brokerService = mock(BrokerService.class);
+     pulsarStats = mock(PulsarStats.class);
+     namespaceService = mock(NamespaceService.class);
+     namespaceBundleFactory = mock(NamespaceBundleFactory.class);
+     loadManagerContext = mock(LoadManagerContext.class);
+     brokerRegistry = mock(BrokerRegistry.class);
+
+     // Wiring reachable from the PulsarService mock.
+     doReturn(config).when(pulsar).getConfiguration();
+     doReturn(brokerService).when(pulsar).getBrokerService();
+     doReturn(namespaceService).when(pulsar).getNamespaceService();
+     doReturn(pulsarStats).when(brokerService).getPulsarStats();
+     doReturn(namespaceBundleFactory).when(namespaceService).getNamespaceBundleFactory();
+     doReturn(true).when(namespaceBundleFactory).canSplitBundle(any());
+
+     // Wiring reachable from the LoadManagerContext mock.
+     doReturn(brokerRegistry).when(loadManagerContext).brokerRegistry();
+     doReturn(broker).when(brokerRegistry).getBrokerId();
+
+     // Two bundles with five topics each; insertion order (bundle1, then bundle2) is kept
+     // by the LinkedHashMap.
+     bundleStats = new LinkedHashMap<>();
+     for (String bundleName : new String[] {bundle1, bundle2}) {
+         NamespaceBundleStats stats = new NamespaceBundleStats();
+         stats.topics = 5;
+         bundleStats.put(bundleName, stats);
+     }
+     doReturn(bundleStats).when(brokerService).getBundleStats();
+ }
+
+ public void testNamespaceBundleSplitConditionThreshold() {
+     // With the condition threshold at 0 and both bundles over the message-rate limit,
+     // a single decision is expected from the first call (setup allows one split per cycle).
+     config.setLoadBalancerNamespaceBundleSplitConditionThreshold(0);
+     for (NamespaceBundleStats stats : bundleStats.values()) {
+         stats.msgRateIn = config.getLoadBalancerNamespaceBundleMaxMsgRate() + 1;
+     }
+     var splitStrategy = new DefaultNamespaceBundleSplitStrategyImpl(new SplitCounter());
+     var decisions = splitStrategy.findBundlesToSplit(loadManagerContext, pulsar);
+     assertEquals(decisions.size(), 1);
+ }
+
+
+ public void testNotEnoughTopics() {
+     // Both bundles exceed the message-rate limit but hold a single topic each;
+     // the test expects no split decision in that case.
+     config.setLoadBalancerNamespaceBundleSplitConditionThreshold(0);
+     for (NamespaceBundleStats stats : bundleStats.values()) {
+         stats.msgRateIn = config.getLoadBalancerNamespaceBundleMaxMsgRate() + 1;
+         stats.topics = 1;
+     }
+     var splitStrategy = new DefaultNamespaceBundleSplitStrategyImpl(new SplitCounter());
+     var actual = splitStrategy.findBundlesToSplit(loadManagerContext, pulsar);
+     assertEquals(actual, Set.of());
+ }
+
+ public void testNamespaceMaximumBundles() throws Exception {
+     // The namespace already reports the configured maximum number of bundles,
+     // so no split decision is expected even though the message rate is too high.
+     config.setLoadBalancerNamespaceBundleSplitConditionThreshold(0);
+     for (NamespaceBundleStats stats : bundleStats.values()) {
+         stats.msgRateIn = config.getLoadBalancerNamespaceBundleMaxMsgRate() + 1;
+     }
+     var splitStrategy = new DefaultNamespaceBundleSplitStrategyImpl(new SplitCounter());
+     doReturn(config.getLoadBalancerNamespaceMaximumBundles()).when(namespaceService).getBundleCount(any());
+     var actual = splitStrategy.findBundlesToSplit(loadManagerContext, pulsar);
+     assertEquals(actual, Set.of());
+ }
+
+ public void testEmptyBundleStats() {
+     // The stats map is emptied before the strategy runs; no decision is expected.
+     config.setLoadBalancerNamespaceBundleSplitConditionThreshold(0);
+     for (NamespaceBundleStats stats : bundleStats.values()) {
+         stats.msgRateIn = config.getLoadBalancerNamespaceBundleMaxMsgRate() + 1;
+     }
+     var splitStrategy = new DefaultNamespaceBundleSplitStrategyImpl(new SplitCounter());
+     bundleStats.clear();
+     var actual = splitStrategy.findBundlesToSplit(loadManagerContext, pulsar);
+     assertEquals(actual, Set.of());
+ }
+
+ public void testError() throws Exception {
+     // getBundleCount throws for every bundle; the test expects an empty result and one
+     // Failure/Unknown counter update per bundle (two bundles are set up).
+     var spiedCounter = spy(new SplitCounter());
+     config.setLoadBalancerNamespaceBundleSplitConditionThreshold(0);
+     for (NamespaceBundleStats stats : bundleStats.values()) {
+         stats.msgRateIn = config.getLoadBalancerNamespaceBundleMaxMsgRate() + 1;
+     }
+     var splitStrategy = new DefaultNamespaceBundleSplitStrategyImpl(spiedCounter);
+     doThrow(new RuntimeException()).when(namespaceService).getBundleCount(any());
+
+     var actual = splitStrategy.findBundlesToSplit(loadManagerContext, pulsar);
+
+     assertEquals(actual, Set.of());
+     verify(spiedCounter, times(2)).update(eq(SplitDecision.Label.Failure), eq(Unknown));
+ }
+
+ public void testMaxMsgRate() {
+     var spiedCounter = spy(new SplitCounter());
+     var splitStrategy = new DefaultNamespaceBundleSplitStrategyImpl(spiedCounter);
+     int threshold = config.getLoadBalancerNamespaceBundleSplitConditionThreshold();
+     // In and out rates are each set to just over half of the configured maximum.
+     for (NamespaceBundleStats stats : bundleStats.values()) {
+         stats.msgRateOut = config.getLoadBalancerNamespaceBundleMaxMsgRate() / 2 + 1;
+         stats.msgRateIn = config.getLoadBalancerNamespaceBundleMaxMsgRate() / 2 + 1;
+     }
+     // Expected per cycle: nothing until the threshold is reached, then bundle1, then bundle2.
+     for (int cycle = 0; cycle < threshold + 2; cycle++) {
+         var actual = splitStrategy.findBundlesToSplit(loadManagerContext, pulsar);
+         String expectedBundle = null;
+         if (cycle == threshold) {
+             expectedBundle = bundle1;
+         } else if (cycle == threshold + 1) {
+             expectedBundle = bundle2;
+         }
+         if (expectedBundle == null) {
+             assertEquals(actual, Set.of());
+         } else {
+             SplitDecision expectedDecision = new SplitDecision();
+             expectedDecision.setSplit(new Split(expectedBundle, broker, new HashMap<>()));
+             expectedDecision.succeed(SplitDecision.Reason.MsgRate);
+             assertEquals(actual, Set.of(expectedDecision));
+         }
+         // No failure may have been counted at any point.
+         verify(spiedCounter, times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown));
+     }
+ }
+
+ public void testMaxTopics() {
+     var spiedCounter = spy(new SplitCounter());
+     var splitStrategy = new DefaultNamespaceBundleSplitStrategyImpl(spiedCounter);
+     int threshold = config.getLoadBalancerNamespaceBundleSplitConditionThreshold();
+     // Every bundle holds one topic more than the configured maximum.
+     for (NamespaceBundleStats stats : bundleStats.values()) {
+         stats.topics = config.getLoadBalancerNamespaceBundleMaxTopics() + 1;
+     }
+     // Expected per cycle: nothing until the threshold is reached, then bundle1, then bundle2.
+     for (int cycle = 0; cycle < threshold + 2; cycle++) {
+         var actual = splitStrategy.findBundlesToSplit(loadManagerContext, pulsar);
+         String expectedBundle = null;
+         if (cycle == threshold) {
+             expectedBundle = bundle1;
+         } else if (cycle == threshold + 1) {
+             expectedBundle = bundle2;
+         }
+         if (expectedBundle == null) {
+             assertEquals(actual, Set.of());
+         } else {
+             SplitDecision expectedDecision = new SplitDecision();
+             expectedDecision.setSplit(new Split(expectedBundle, broker, new HashMap<>()));
+             expectedDecision.succeed(SplitDecision.Reason.Topics);
+             assertEquals(actual, Set.of(expectedDecision));
+         }
+         // No failure may have been counted at any point.
+         verify(spiedCounter, times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown));
+     }
+ }
+
+ public void testMaxSessions() {
+     var spiedCounter = spy(new SplitCounter());
+     var splitStrategy = new DefaultNamespaceBundleSplitStrategyImpl(spiedCounter);
+     int threshold = config.getLoadBalancerNamespaceBundleSplitConditionThreshold();
+     // Producer and consumer counts are each set to just over half of the configured maximum.
+     for (NamespaceBundleStats stats : bundleStats.values()) {
+         stats.producerCount = config.getLoadBalancerNamespaceBundleMaxSessions() / 2 + 1;
+         stats.consumerCount = config.getLoadBalancerNamespaceBundleMaxSessions() / 2 + 1;
+     }
+     // Expected per cycle: nothing until the threshold is reached, then bundle1, then bundle2.
+     for (int cycle = 0; cycle < threshold + 2; cycle++) {
+         var actual = splitStrategy.findBundlesToSplit(loadManagerContext, pulsar);
+         String expectedBundle = null;
+         if (cycle == threshold) {
+             expectedBundle = bundle1;
+         } else if (cycle == threshold + 1) {
+             expectedBundle = bundle2;
+         }
+         if (expectedBundle == null) {
+             assertEquals(actual, Set.of());
+         } else {
+             SplitDecision expectedDecision = new SplitDecision();
+             expectedDecision.setSplit(new Split(expectedBundle, broker, new HashMap<>()));
+             expectedDecision.succeed(SplitDecision.Reason.Sessions);
+             assertEquals(actual, Set.of(expectedDecision));
+         }
+         // No failure may have been counted at any point.
+         verify(spiedCounter, times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown));
+     }
+ }
+
+ public void testMaxBandwidthMbytes() {
+     var spiedCounter = spy(new SplitCounter());
+     var splitStrategy = new DefaultNamespaceBundleSplitStrategyImpl(spiedCounter);
+     int threshold = config.getLoadBalancerNamespaceBundleSplitConditionThreshold();
+     // In and out throughput are each set to just over half of the configured maximum,
+     // with the megabyte setting multiplied out by 1024 * 1024.
+     for (NamespaceBundleStats stats : bundleStats.values()) {
+         stats.msgThroughputOut = config.getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * 1024 * 1024 / 2 + 1;
+         stats.msgThroughputIn = config.getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * 1024 * 1024 / 2 + 1;
+     }
+     // Expected per cycle: nothing until the threshold is reached, then bundle1, then bundle2.
+     for (int cycle = 0; cycle < threshold + 2; cycle++) {
+         var actual = splitStrategy.findBundlesToSplit(loadManagerContext, pulsar);
+         String expectedBundle = null;
+         if (cycle == threshold) {
+             expectedBundle = bundle1;
+         } else if (cycle == threshold + 1) {
+             expectedBundle = bundle2;
+         }
+         if (expectedBundle == null) {
+             assertEquals(actual, Set.of());
+         } else {
+             SplitDecision expectedDecision = new SplitDecision();
+             expectedDecision.setSplit(new Split(expectedBundle, broker, new HashMap<>()));
+             expectedDecision.succeed(SplitDecision.Reason.Bandwidth);
+             assertEquals(actual, Set.of(expectedDecision));
+         }
+         // No failure may have been counted at any point.
+         verify(spiedCounter, times(0)).update(eq(SplitDecision.Label.Failure), eq(Unknown));
+     }
+ }
+
+}