You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by he...@apache.org on 2024/04/04 04:01:55 UTC

(pulsar) branch branch-3.1 updated: [fix][broker] Update TransferShedder underloaded broker check to consider max loaded broker's msgThroughputEMA and update IsExtensibleLoadBalancerImpl check (#22321) (#22417)

This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 651908a9033 [fix][broker] Update TransferShedder underloaded broker check to consider max loaded broker's msgThroughputEMA and update IsExtensibleLoadBalancerImpl check (#22321) (#22417)
651908a9033 is described below

commit 651908a903301da9c07dc93300635cc28d8ee69f
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Wed Apr 3 21:01:49 2024 -0700

    [fix][broker] Update TransferShedder underloaded broker check to consider max loaded broker's msgThroughputEMA and update IsExtensibleLoadBalancerImpl check (#22321) (#22417)
---
 .../org/apache/pulsar/broker/PulsarService.java    |  8 +++---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  4 +--
 .../extensions/ExtensibleLoadManagerImpl.java      |  4 ---
 .../extensions/scheduler/TransferShedder.java      | 22 +++++++++++-----
 .../pulsar/broker/namespace/NamespaceService.java  | 30 +++++++++++-----------
 .../pulsar/broker/web/PulsarWebResource.java       |  4 +--
 .../extensions/ExtensibleLoadManagerImplTest.java  |  1 -
 .../extensions/scheduler/TransferShedderTest.java  | 13 +++++-----
 8 files changed, 45 insertions(+), 41 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 64f4ee02881..5410bacbe78 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -379,7 +379,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     }
 
     private void closeLeaderElectionService() throws Exception {
-        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this)) {
             ExtensibleLoadManagerImpl.get(loadManager.get()).getLeaderElectionService().close();
         } else {
             if (this.leaderElectionService != null) {
@@ -1135,7 +1135,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     }
 
     protected void startLeaderElectionService() {
-        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this)) {
             LOG.info("The load manager extension is enabled. Skipping PulsarService LeaderElectionService.");
             return;
         }
@@ -1250,7 +1250,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
         LOG.info("Starting load management service ...");
         this.loadManager.get().start();
 
-        if (config.isLoadBalancerEnabled() && !ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+        if (config.isLoadBalancerEnabled() && !ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this)) {
             LOG.info("Starting load balancer");
             if (this.loadReportTask == null) {
                 long loadReportMinInterval = config.getLoadBalancerReportUpdateMinIntervalMillis();
@@ -1343,7 +1343,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
      * @return a reference of the current <code>LeaderElectionService</code> instance.
      */
     public LeaderElectionService getLeaderElectionService() {
-        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this)) {
             return ExtensibleLoadManagerImpl.get(loadManager.get()).getLeaderElectionService();
         } else {
             return this.leaderElectionService;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 9478857032f..f4732cad380 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -968,13 +968,13 @@ public abstract class NamespacesBase extends AdminResource {
                     return CompletableFuture.completedFuture(null);
                 })
                 .thenCompose(__ -> {
-                    if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config())) {
+                    if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar())) {
                         return CompletableFuture.completedFuture(null);
                     }
                     return validateLeaderBrokerAsync();
                 })
                 .thenAccept(__ -> {
-                    if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config())) {
+                    if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar())) {
                         return;
                     }
                     // For ExtensibleLoadManager, this operation will be ignored.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 6a0e677c662..26ee45b7444 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -263,10 +263,6 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
         this.brokerSelectionStrategy = new LeastResourceUsageWithWeight();
     }
 
-    public static boolean isLoadManagerExtensionEnabled(ServiceConfiguration conf) {
-        return ExtensibleLoadManagerImpl.class.getName().equals(conf.getLoadManagerClassName());
-    }
-
     public static boolean isLoadManagerExtensionEnabled(PulsarService pulsar) {
         return pulsar.getLoadManager().get() instanceof ExtensibleLoadManagerWrapper;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
index 3564b4e9e3b..7126ccb0341 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
@@ -362,7 +362,7 @@ public class TransferShedder implements NamespaceUnloadStrategy {
             final double targetStd = conf.getLoadBalancerBrokerLoadTargetStd();
             boolean transfer = conf.isLoadBalancerTransferEnabled();
             if (stats.std() > targetStd
-                    || isUnderLoaded(context, stats.peekMinBroker(), stats.avg)
+                    || isUnderLoaded(context, stats.peekMinBroker(), stats)
                     || isOverLoaded(context, stats.peekMaxBroker(), stats.avg)) {
                 unloadConditionHitCount++;
             } else {
@@ -390,7 +390,7 @@ public class TransferShedder implements NamespaceUnloadStrategy {
                 UnloadDecision.Reason reason;
                 if (stats.std() > targetStd) {
                     reason = Overloaded;
-                } else if (isUnderLoaded(context, stats.peekMinBroker(), stats.avg)) {
+                } else if (isUnderLoaded(context, stats.peekMinBroker(), stats)) {
                     reason = Underloaded;
                     if (debugMode) {
                         log.info(String.format("broker:%s is underloaded:%s although "
@@ -669,19 +669,27 @@ public class TransferShedder implements NamespaceUnloadStrategy {
     }
 
 
-    private boolean isUnderLoaded(LoadManagerContext context, String broker, double avgLoad) {
+    private boolean isUnderLoaded(LoadManagerContext context, String broker, LoadStats stats) {
         var brokerLoadDataOptional = context.brokerLoadDataStore().get(broker);
         if (brokerLoadDataOptional.isEmpty()) {
             return false;
         }
         var brokerLoadData = brokerLoadDataOptional.get();
-        if (brokerLoadData.getMsgThroughputEMA() < 1) {
+
+        var underLoadedMultiplier =
+                Math.min(0.5, Math.max(0.0, context.brokerConfiguration().getLoadBalancerBrokerLoadTargetStd() / 2.0));
+
+        if (brokerLoadData.getWeightedMaxEMA() < stats.avg * underLoadedMultiplier) {
             return true;
         }
 
-        return brokerLoadData.getWeightedMaxEMA()
-                < avgLoad * Math.min(0.5, Math.max(0.0,
-                context.brokerConfiguration().getLoadBalancerBrokerLoadTargetStd() / 2));
+        var maxBrokerLoadDataOptional = context.brokerLoadDataStore().get(stats.peekMaxBroker());
+        if (maxBrokerLoadDataOptional.isEmpty()) {
+            return false;
+        }
+
+        return brokerLoadData.getMsgThroughputEMA()
+                < maxBrokerLoadDataOptional.get().getMsgThroughputEMA() * underLoadedMultiplier;
     }
 
     private boolean isOverLoaded(LoadManagerContext context, String broker, double avgLoad) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 61e045ed304..e04be25fe49 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -195,7 +195,7 @@ public class NamespaceService implements AutoCloseable {
                                     pulsar.getBrokerId(), optResult.get(), topic);
                             return CompletableFuture.completedFuture(optResult);
                         }
-                        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+                        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
                             return loadManager.get().findBrokerServiceUrl(Optional.of(topic), bundle);
                         } else {
                             // TODO: Add unit tests cover it.
@@ -311,7 +311,7 @@ public class NamespaceService implements AutoCloseable {
                 return CompletableFuture.completedFuture(Optional.empty());
             }
             CompletableFuture<Optional<LookupResult>> future =
-                    ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)
+                    ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)
                     ? loadManager.get().findBrokerServiceUrl(Optional.ofNullable(topic), bundle) :
                     findBrokerServiceUrl(bundle, options);
 
@@ -375,7 +375,7 @@ public class NamespaceService implements AutoCloseable {
             NamespaceBundle nsFullBundle = bundleFactory.getFullBundle(nsname);
             // v2 namespace will always use full bundle object
             final NamespaceEphemeralData otherData;
-            if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+            if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
                 ExtensibleLoadManagerImpl loadManager = ExtensibleLoadManagerImpl.get(this.loadManager.get());
                 otherData = loadManager.tryAcquiringOwnership(nsFullBundle).get();
             } else {
@@ -781,7 +781,7 @@ public class NamespaceService implements AutoCloseable {
                                                          long timeout,
                                                          TimeUnit timeoutUnit,
                                                          boolean closeWithoutWaitingClientDisconnect) {
-        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
             return ExtensibleLoadManagerImpl.get(loadManager.get())
                     .unloadNamespaceBundleAsync(bundle, destinationBroker);
         }
@@ -803,7 +803,7 @@ public class NamespaceService implements AutoCloseable {
                .getIsolationDataPoliciesAsync(pulsar.getConfiguration().getClusterName())
                .thenApply(nsIsolationPoliciesOpt -> nsIsolationPoliciesOpt.orElseGet(NamespaceIsolationPolicies::new))
                .thenCompose(namespaceIsolationPolicies -> {
-                   if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+                   if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
                        ExtensibleLoadManagerImpl extensibleLoadManager =
                                ExtensibleLoadManagerImpl.get(loadManager.get());
                        var statusMap = extensibleLoadManager.getOwnedServiceUnits().stream()
@@ -883,7 +883,7 @@ public class NamespaceService implements AutoCloseable {
     public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, boolean unload,
                                                      NamespaceBundleSplitAlgorithm splitAlgorithm,
                                                      List<Long> boundaries) {
-        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
             return ExtensibleLoadManagerImpl.get(loadManager.get())
                     .splitNamespaceBundleAsync(bundle, splitAlgorithm, boundaries);
         }
@@ -1126,7 +1126,7 @@ public class NamespaceService implements AutoCloseable {
     }
 
     public Set<NamespaceBundle> getOwnedServiceUnits() {
-        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
             ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
             return extensibleLoadManager.getOwnedServiceUnits();
         }
@@ -1148,7 +1148,7 @@ public class NamespaceService implements AutoCloseable {
         }
 
         if (suName instanceof NamespaceBundle) {
-            if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+            if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
                 return loadManager.get().checkOwnershipAsync(Optional.empty(), suName);
             }
             // TODO: Add unit tests cover it.
@@ -1176,7 +1176,7 @@ public class NamespaceService implements AutoCloseable {
 
     public CompletableFuture<Boolean> isServiceUnitActiveAsync(TopicName topicName) {
         // TODO: Add unit tests cover it.
-        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
             return getBundleAsync(topicName)
                     .thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle));
         }
@@ -1191,7 +1191,7 @@ public class NamespaceService implements AutoCloseable {
 
     private CompletableFuture<Boolean> isNamespaceOwnedAsync(NamespaceName fqnn) {
         // TODO: Add unit tests cover it.
-        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
             return getFullBundleAsync(fqnn)
                     .thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.empty(), bundle));
         }
@@ -1201,7 +1201,7 @@ public class NamespaceService implements AutoCloseable {
 
     private CompletableFuture<Boolean> isTopicOwnedAsync(TopicName topic) {
         // TODO: Add unit tests cover it.
-        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
             return getBundleAsync(topic)
                     .thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topic), bundle));
         }
@@ -1210,7 +1210,7 @@ public class NamespaceService implements AutoCloseable {
 
     public CompletableFuture<Boolean> checkTopicOwnership(TopicName topicName) {
         // TODO: Add unit tests cover it.
-        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
             return getBundleAsync(topicName)
                     .thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle));
         }
@@ -1220,7 +1220,7 @@ public class NamespaceService implements AutoCloseable {
 
     public CompletableFuture<Void> removeOwnedServiceUnitAsync(NamespaceBundle nsBundle) {
         CompletableFuture<Void> future;
-        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
             ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
             future = extensibleLoadManager.unloadNamespaceBundleAsync(nsBundle, Optional.empty());
         } else {
@@ -1528,7 +1528,7 @@ public class NamespaceService implements AutoCloseable {
     }
 
     public CompletableFuture<Optional<NamespaceEphemeralData>> getOwnerAsync(NamespaceBundle bundle) {
-        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
             ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
             return extensibleLoadManager.getOwnershipWithLookupDataAsync(bundle)
                     .thenCompose(lookupData -> lookupData
@@ -1545,7 +1545,7 @@ public class NamespaceService implements AutoCloseable {
     }
 
     public CompletableFuture<Boolean> checkOwnershipPresentAsync(NamespaceBundle bundle) {
-        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
             ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
             return extensibleLoadManager.getOwnershipAsync(Optional.empty(), bundle)
                     .thenApply(Optional::isPresent);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 0d01a2c5041..2f437962002 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -732,7 +732,7 @@ public abstract class PulsarWebResource {
                                             .host(webUrl.get().getHost())
                                             .port(webUrl.get().getPort())
                                             .replaceQueryParam("authoritative", newAuthoritative);
-                                    if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config())) {
+                                    if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
                                         uriBuilder.replaceQueryParam("destinationBroker", null);
                                     }
                                     URI redirect = uriBuilder.build();
@@ -1007,7 +1007,7 @@ public abstract class PulsarWebResource {
 
     protected static boolean isLeaderBroker(PulsarService pulsar) {
         // For extensible load manager, it doesn't have leader election service on pulsar broker.
-        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar.getConfig())) {
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
             return true;
         }
         return  pulsar.getLeaderElectionService().isLeader();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 1362317fd8f..04d793ce619 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -317,7 +317,6 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase
             assertEquals(unloadCount.get(), 1);
         });
 
-
         String dstBrokerUrl = pulsar1.getBrokerId();
         String dstBrokerServiceUrl;
         if (broker.equals(pulsar1.getBrokerServiceUrl())) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
index 0ff64616973..efca2880949 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
@@ -1104,16 +1104,17 @@ public class TransferShedderTest {
         assertEquals(stats.std(), 2.5809568279517847E-8);
     }
 
-
     @Test
-    public void testMinBrokerWithZeroTraffic() throws IllegalAccessException {
+    public void testMinBrokerWithLowTraffic() throws IllegalAccessException {
         UnloadCounter counter = new UnloadCounter();
         TransferShedder transferShedder = new TransferShedder(counter);
         var ctx = setupContext();
         var brokerLoadDataStore = ctx.brokerLoadDataStore();
 
-        var load = getCpuLoad(ctx,  4, "broker2:8080");
-        FieldUtils.writeDeclaredField(load,"msgThroughputEMA", 0, true);
+        var load = getCpuLoad(ctx, 4, "broker2:8080");
+        FieldUtils.writeDeclaredField(load, "msgThroughputEMA", 10, true);
+
+
         brokerLoadDataStore.pushAsync("broker2:8080", load);
         brokerLoadDataStore.pushAsync("broker4:8080", getCpuLoad(ctx,  55, "broker4:8080"));
         brokerLoadDataStore.pushAsync("broker5:8080", getCpuLoad(ctx,  65, "broker5:8080"));
@@ -1268,10 +1269,10 @@ public class TransferShedderTest {
         Assertions.assertThat(res).isIn(
                 Set.of(new UnloadDecision(
                         new Unload("broker99:8080", "my-tenant/my-namespace99/0x00000000_0x0FFFFFFF",
-                                Optional.of("broker52:8080")), Success, Overloaded)),
+                                Optional.of("broker52:8080")), Success, Underloaded)),
                 Set.of(new UnloadDecision(
                         new Unload("broker99:8080", "my-tenant/my-namespace99/0x00000000_0x0FFFFFFF",
-                                Optional.of("broker83:8080")), Success, Overloaded))
+                                Optional.of("broker83:8080")), Success, Underloaded))
         );
         assertEquals(counter.getLoadAvg(), 0.019900000000000008, 0.00001);
         assertEquals(counter.getLoadStd(), 0.09850375627355534, 0.00001);