You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by he...@apache.org on 2024/04/04 04:01:55 UTC
(pulsar) branch branch-3.1 updated: [fix][broker] Update TransferShedder underloaded broker check to consider max loaded broker's msgThroughputEMA and update IsExtensibleLoadBalancerImpl check (#22321) (#22417)
This is an automated email from the ASF dual-hosted git repository.
heesung pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 651908a9033 [fix][broker] Update TransferShedder underloaded broker check to consider max loaded broker's msgThroughputEMA and update IsExtensibleLoadBalancerImpl check (#22321) (#22417)
651908a9033 is described below
commit 651908a903301da9c07dc93300635cc28d8ee69f
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Wed Apr 3 21:01:49 2024 -0700
[fix][broker] Update TransferShedder underloaded broker check to consider max loaded broker's msgThroughputEMA and update IsExtensibleLoadBalancerImpl check (#22321) (#22417)
---
.../org/apache/pulsar/broker/PulsarService.java | 8 +++---
.../pulsar/broker/admin/impl/NamespacesBase.java | 4 +--
.../extensions/ExtensibleLoadManagerImpl.java | 4 ---
.../extensions/scheduler/TransferShedder.java | 22 +++++++++++-----
.../pulsar/broker/namespace/NamespaceService.java | 30 +++++++++++-----------
.../pulsar/broker/web/PulsarWebResource.java | 4 +--
.../extensions/ExtensibleLoadManagerImplTest.java | 1 -
.../extensions/scheduler/TransferShedderTest.java | 13 +++++-----
8 files changed, 45 insertions(+), 41 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 64f4ee02881..5410bacbe78 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -379,7 +379,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
}
private void closeLeaderElectionService() throws Exception {
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this)) {
ExtensibleLoadManagerImpl.get(loadManager.get()).getLeaderElectionService().close();
} else {
if (this.leaderElectionService != null) {
@@ -1135,7 +1135,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
}
protected void startLeaderElectionService() {
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this)) {
LOG.info("The load manager extension is enabled. Skipping PulsarService LeaderElectionService.");
return;
}
@@ -1250,7 +1250,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
LOG.info("Starting load management service ...");
this.loadManager.get().start();
- if (config.isLoadBalancerEnabled() && !ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ if (config.isLoadBalancerEnabled() && !ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this)) {
LOG.info("Starting load balancer");
if (this.loadReportTask == null) {
long loadReportMinInterval = config.getLoadBalancerReportUpdateMinIntervalMillis();
@@ -1343,7 +1343,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
* @return a reference of the current <code>LeaderElectionService</code> instance.
*/
public LeaderElectionService getLeaderElectionService() {
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(this)) {
return ExtensibleLoadManagerImpl.get(loadManager.get()).getLeaderElectionService();
} else {
return this.leaderElectionService;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 9478857032f..f4732cad380 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -968,13 +968,13 @@ public abstract class NamespacesBase extends AdminResource {
return CompletableFuture.completedFuture(null);
})
.thenCompose(__ -> {
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config())) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar())) {
return CompletableFuture.completedFuture(null);
}
return validateLeaderBrokerAsync();
})
.thenAccept(__ -> {
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config())) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar())) {
return;
}
// For ExtensibleLoadManager, this operation will be ignored.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 6a0e677c662..26ee45b7444 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -263,10 +263,6 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
this.brokerSelectionStrategy = new LeastResourceUsageWithWeight();
}
- public static boolean isLoadManagerExtensionEnabled(ServiceConfiguration conf) {
- return ExtensibleLoadManagerImpl.class.getName().equals(conf.getLoadManagerClassName());
- }
-
public static boolean isLoadManagerExtensionEnabled(PulsarService pulsar) {
return pulsar.getLoadManager().get() instanceof ExtensibleLoadManagerWrapper;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
index 3564b4e9e3b..7126ccb0341 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
@@ -362,7 +362,7 @@ public class TransferShedder implements NamespaceUnloadStrategy {
final double targetStd = conf.getLoadBalancerBrokerLoadTargetStd();
boolean transfer = conf.isLoadBalancerTransferEnabled();
if (stats.std() > targetStd
- || isUnderLoaded(context, stats.peekMinBroker(), stats.avg)
+ || isUnderLoaded(context, stats.peekMinBroker(), stats)
|| isOverLoaded(context, stats.peekMaxBroker(), stats.avg)) {
unloadConditionHitCount++;
} else {
@@ -390,7 +390,7 @@ public class TransferShedder implements NamespaceUnloadStrategy {
UnloadDecision.Reason reason;
if (stats.std() > targetStd) {
reason = Overloaded;
- } else if (isUnderLoaded(context, stats.peekMinBroker(), stats.avg)) {
+ } else if (isUnderLoaded(context, stats.peekMinBroker(), stats)) {
reason = Underloaded;
if (debugMode) {
log.info(String.format("broker:%s is underloaded:%s although "
@@ -669,19 +669,27 @@ public class TransferShedder implements NamespaceUnloadStrategy {
}
- private boolean isUnderLoaded(LoadManagerContext context, String broker, double avgLoad) {
+ private boolean isUnderLoaded(LoadManagerContext context, String broker, LoadStats stats) {
var brokerLoadDataOptional = context.brokerLoadDataStore().get(broker);
if (brokerLoadDataOptional.isEmpty()) {
return false;
}
var brokerLoadData = brokerLoadDataOptional.get();
- if (brokerLoadData.getMsgThroughputEMA() < 1) {
+
+ var underLoadedMultiplier =
+ Math.min(0.5, Math.max(0.0, context.brokerConfiguration().getLoadBalancerBrokerLoadTargetStd() / 2.0));
+
+ if (brokerLoadData.getWeightedMaxEMA() < stats.avg * underLoadedMultiplier) {
return true;
}
- return brokerLoadData.getWeightedMaxEMA()
- < avgLoad * Math.min(0.5, Math.max(0.0,
- context.brokerConfiguration().getLoadBalancerBrokerLoadTargetStd() / 2));
+ var maxBrokerLoadDataOptional = context.brokerLoadDataStore().get(stats.peekMaxBroker());
+ if (maxBrokerLoadDataOptional.isEmpty()) {
+ return false;
+ }
+
+ return brokerLoadData.getMsgThroughputEMA()
+ < maxBrokerLoadDataOptional.get().getMsgThroughputEMA() * underLoadedMultiplier;
}
private boolean isOverLoaded(LoadManagerContext context, String broker, double avgLoad) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 61e045ed304..e04be25fe49 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -195,7 +195,7 @@ public class NamespaceService implements AutoCloseable {
pulsar.getBrokerId(), optResult.get(), topic);
return CompletableFuture.completedFuture(optResult);
}
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return loadManager.get().findBrokerServiceUrl(Optional.of(topic), bundle);
} else {
// TODO: Add unit tests cover it.
@@ -311,7 +311,7 @@ public class NamespaceService implements AutoCloseable {
return CompletableFuture.completedFuture(Optional.empty());
}
CompletableFuture<Optional<LookupResult>> future =
- ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)
+ ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)
? loadManager.get().findBrokerServiceUrl(Optional.ofNullable(topic), bundle) :
findBrokerServiceUrl(bundle, options);
@@ -375,7 +375,7 @@ public class NamespaceService implements AutoCloseable {
NamespaceBundle nsFullBundle = bundleFactory.getFullBundle(nsname);
// v2 namespace will always use full bundle object
final NamespaceEphemeralData otherData;
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
ExtensibleLoadManagerImpl loadManager = ExtensibleLoadManagerImpl.get(this.loadManager.get());
otherData = loadManager.tryAcquiringOwnership(nsFullBundle).get();
} else {
@@ -781,7 +781,7 @@ public class NamespaceService implements AutoCloseable {
long timeout,
TimeUnit timeoutUnit,
boolean closeWithoutWaitingClientDisconnect) {
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return ExtensibleLoadManagerImpl.get(loadManager.get())
.unloadNamespaceBundleAsync(bundle, destinationBroker);
}
@@ -803,7 +803,7 @@ public class NamespaceService implements AutoCloseable {
.getIsolationDataPoliciesAsync(pulsar.getConfiguration().getClusterName())
.thenApply(nsIsolationPoliciesOpt -> nsIsolationPoliciesOpt.orElseGet(NamespaceIsolationPolicies::new))
.thenCompose(namespaceIsolationPolicies -> {
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
ExtensibleLoadManagerImpl extensibleLoadManager =
ExtensibleLoadManagerImpl.get(loadManager.get());
var statusMap = extensibleLoadManager.getOwnedServiceUnits().stream()
@@ -883,7 +883,7 @@ public class NamespaceService implements AutoCloseable {
public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, boolean unload,
NamespaceBundleSplitAlgorithm splitAlgorithm,
List<Long> boundaries) {
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return ExtensibleLoadManagerImpl.get(loadManager.get())
.splitNamespaceBundleAsync(bundle, splitAlgorithm, boundaries);
}
@@ -1126,7 +1126,7 @@ public class NamespaceService implements AutoCloseable {
}
public Set<NamespaceBundle> getOwnedServiceUnits() {
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
return extensibleLoadManager.getOwnedServiceUnits();
}
@@ -1148,7 +1148,7 @@ public class NamespaceService implements AutoCloseable {
}
if (suName instanceof NamespaceBundle) {
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return loadManager.get().checkOwnershipAsync(Optional.empty(), suName);
}
// TODO: Add unit tests cover it.
@@ -1176,7 +1176,7 @@ public class NamespaceService implements AutoCloseable {
public CompletableFuture<Boolean> isServiceUnitActiveAsync(TopicName topicName) {
// TODO: Add unit tests cover it.
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return getBundleAsync(topicName)
.thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle));
}
@@ -1191,7 +1191,7 @@ public class NamespaceService implements AutoCloseable {
private CompletableFuture<Boolean> isNamespaceOwnedAsync(NamespaceName fqnn) {
// TODO: Add unit tests cover it.
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return getFullBundleAsync(fqnn)
.thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.empty(), bundle));
}
@@ -1201,7 +1201,7 @@ public class NamespaceService implements AutoCloseable {
private CompletableFuture<Boolean> isTopicOwnedAsync(TopicName topic) {
// TODO: Add unit tests cover it.
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return getBundleAsync(topic)
.thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topic), bundle));
}
@@ -1210,7 +1210,7 @@ public class NamespaceService implements AutoCloseable {
public CompletableFuture<Boolean> checkTopicOwnership(TopicName topicName) {
// TODO: Add unit tests cover it.
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return getBundleAsync(topicName)
.thenCompose(bundle -> loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle));
}
@@ -1220,7 +1220,7 @@ public class NamespaceService implements AutoCloseable {
public CompletableFuture<Void> removeOwnedServiceUnitAsync(NamespaceBundle nsBundle) {
CompletableFuture<Void> future;
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
future = extensibleLoadManager.unloadNamespaceBundleAsync(nsBundle, Optional.empty());
} else {
@@ -1528,7 +1528,7 @@ public class NamespaceService implements AutoCloseable {
}
public CompletableFuture<Optional<NamespaceEphemeralData>> getOwnerAsync(NamespaceBundle bundle) {
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
return extensibleLoadManager.getOwnershipWithLookupDataAsync(bundle)
.thenCompose(lookupData -> lookupData
@@ -1545,7 +1545,7 @@ public class NamespaceService implements AutoCloseable {
}
public CompletableFuture<Boolean> checkOwnershipPresentAsync(NamespaceBundle bundle) {
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
return extensibleLoadManager.getOwnershipAsync(Optional.empty(), bundle)
.thenApply(Optional::isPresent);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 0d01a2c5041..2f437962002 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -732,7 +732,7 @@ public abstract class PulsarWebResource {
.host(webUrl.get().getHost())
.port(webUrl.get().getPort())
.replaceQueryParam("authoritative", newAuthoritative);
- if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config())) {
+ if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
uriBuilder.replaceQueryParam("destinationBroker", null);
}
URI redirect = uriBuilder.build();
@@ -1007,7 +1007,7 @@ public abstract class PulsarWebResource {
protected static boolean isLeaderBroker(PulsarService pulsar) {
// For extensible load manager, it doesn't have leader election service on pulsar broker.
- if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar.getConfig())) {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return true;
}
return pulsar.getLeaderElectionService().isLeader();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 1362317fd8f..04d793ce619 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -317,7 +317,6 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase
assertEquals(unloadCount.get(), 1);
});
-
String dstBrokerUrl = pulsar1.getBrokerId();
String dstBrokerServiceUrl;
if (broker.equals(pulsar1.getBrokerServiceUrl())) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
index 0ff64616973..efca2880949 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
@@ -1104,16 +1104,17 @@ public class TransferShedderTest {
assertEquals(stats.std(), 2.5809568279517847E-8);
}
-
@Test
- public void testMinBrokerWithZeroTraffic() throws IllegalAccessException {
+ public void testMinBrokerWithLowTraffic() throws IllegalAccessException {
UnloadCounter counter = new UnloadCounter();
TransferShedder transferShedder = new TransferShedder(counter);
var ctx = setupContext();
var brokerLoadDataStore = ctx.brokerLoadDataStore();
- var load = getCpuLoad(ctx, 4, "broker2:8080");
- FieldUtils.writeDeclaredField(load,"msgThroughputEMA", 0, true);
+ var load = getCpuLoad(ctx, 4, "broker2:8080");
+ FieldUtils.writeDeclaredField(load, "msgThroughputEMA", 10, true);
+
+
brokerLoadDataStore.pushAsync("broker2:8080", load);
brokerLoadDataStore.pushAsync("broker4:8080", getCpuLoad(ctx, 55, "broker4:8080"));
brokerLoadDataStore.pushAsync("broker5:8080", getCpuLoad(ctx, 65, "broker5:8080"));
@@ -1268,10 +1269,10 @@ public class TransferShedderTest {
Assertions.assertThat(res).isIn(
Set.of(new UnloadDecision(
new Unload("broker99:8080", "my-tenant/my-namespace99/0x00000000_0x0FFFFFFF",
- Optional.of("broker52:8080")), Success, Overloaded)),
+ Optional.of("broker52:8080")), Success, Underloaded)),
Set.of(new UnloadDecision(
new Unload("broker99:8080", "my-tenant/my-namespace99/0x00000000_0x0FFFFFFF",
- Optional.of("broker83:8080")), Success, Overloaded))
+ Optional.of("broker83:8080")), Success, Underloaded))
);
assertEquals(counter.getLoadAvg(), 0.019900000000000008, 0.00001);
assertEquals(counter.getLoadStd(), 0.09850375627355534, 0.00001);