You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/10/24 14:31:29 UTC
[pulsar] branch branch-3.0 updated: [fix][broker] Fix unload operation stuck when use ExtensibleLoadManager (#21332)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new c6bffc2605d [fix][broker] Fix unload operation stuck when use ExtensibleLoadManager (#21332)
c6bffc2605d is described below
commit c6bffc2605d05c94eaca20d23502fb368926087f
Author: Kai Wang <kw...@apache.org>
AuthorDate: Thu Oct 19 07:08:49 2023 -0500
[fix][broker] Fix unload operation stuck when use ExtensibleLoadManager (#21332)
---
.../extensions/ExtensibleLoadManagerImpl.java | 2 +-
.../channel/ServiceUnitStateChannelImpl.java | 17 +-
.../extensions/manager/UnloadManager.java | 7 +
.../pulsar/broker/namespace/NamespaceService.java | 4 -
.../pulsar/broker/service/BrokerService.java | 15 +
.../extensions/ExtensibleLoadManagerImplTest.java | 311 ++++++++++++---------
.../channel/ServiceUnitStateChannelTest.java | 12 +-
7 files changed, 219 insertions(+), 149 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 85baf9ec4fb..d3119365ddf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -304,7 +304,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
}
});
});
- this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
+ this.serviceUnitStateChannel = ServiceUnitStateChannelImpl.newInstance(pulsar);
this.brokerRegistry.start();
this.splitManager = new SplitManager(splitCounter);
this.unloadManager = new UnloadManager(unloadCounter);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 02acb923c2d..68501d201f0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -200,7 +200,18 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
Unstable
}
+ public static ServiceUnitStateChannelImpl newInstance(PulsarService pulsar) {
+ return new ServiceUnitStateChannelImpl(pulsar);
+ }
+
public ServiceUnitStateChannelImpl(PulsarService pulsar) {
+ this(pulsar, MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS, OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS);
+ }
+
+ @VisibleForTesting
+ public ServiceUnitStateChannelImpl(PulsarService pulsar,
+ long inFlightStateWaitingTimeInMillis,
+ long ownershipMonitorDelayTimeInSecs) {
this.pulsar = pulsar;
this.config = pulsar.getConfig();
this.lookupServiceAddress = pulsar.getLookupServiceAddress();
@@ -210,8 +221,8 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
this.stateChangeListeners = new StateChangeListeners();
this.semiTerminalStateWaitingTimeInMillis = config.getLoadBalancerServiceUnitStateTombstoneDelayTimeInSeconds()
* 1000;
- this.inFlightStateWaitingTimeInMillis = MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS;
- this.ownershipMonitorDelayTimeInSecs = OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS;
+ this.inFlightStateWaitingTimeInMillis = inFlightStateWaitingTimeInMillis;
+ this.ownershipMonitorDelayTimeInSecs = ownershipMonitorDelayTimeInSecs;
if (semiTerminalStateWaitingTimeInMillis < inFlightStateWaitingTimeInMillis) {
throw new IllegalArgumentException(
"Invalid Config: loadBalancerServiceUnitStateCleanUpDelayTimeInSeconds < "
@@ -837,7 +848,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
} finally {
var future = requested.getValue();
if (future != null) {
- future.orTimeout(inFlightStateWaitingTimeInMillis, TimeUnit.MILLISECONDS)
+ future.orTimeout(inFlightStateWaitingTimeInMillis + 5 * 1000, TimeUnit.MILLISECONDS)
.whenComplete((v, e) -> {
if (e != null) {
getOwnerRequests.remove(serviceUnit, future);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
index 2dde0c4708e..ffdbbc2af42 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
@@ -88,6 +88,13 @@ public class UnloadManager implements StateChangeListener {
@Override
public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) {
+ if (t != null && inFlightUnloadRequest.containsKey(serviceUnit)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Handling {} for service unit {} with exception.", data, serviceUnit, t);
+ }
+ this.complete(serviceUnit, t);
+ return;
+ }
ServiceUnitState state = ServiceUnitStateData.state(data);
switch (state) {
case Free, Owned -> this.complete(serviceUnit, t);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 0d69fd7ea78..4c2b4211747 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1544,10 +1544,6 @@ public class NamespaceService implements AutoCloseable {
public CompletableFuture<Boolean> checkOwnershipPresentAsync(NamespaceBundle bundle) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
- if (bundle.getNamespaceObject().equals(SYSTEM_NAMESPACE)) {
- return FutureUtil.failedFuture(new UnsupportedOperationException(
- "Ownership check for system namespace is not supported"));
- }
ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
return extensibleLoadManager.getOwnershipAsync(Optional.empty(), bundle)
.thenApply(Optional::isPresent);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 7c7dec864e2..b5bd7bfaa3d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2202,6 +2202,21 @@ public class BrokerService implements Closeable {
if (serviceUnit.includes(topicName)) {
// Topic needs to be unloaded
log.info("[{}] Unloading topic", topicName);
+ if (topicFuture.isCompletedExceptionally()) {
+ try {
+ topicFuture.get();
+ } catch (InterruptedException | ExecutionException ex) {
+ if (ex.getCause() instanceof ServiceUnitNotReadyException) {
+ // Topic was already unloaded
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Topic was already unloaded", topicName);
+ }
+ return;
+ } else {
+ log.warn("[{}] Got exception when closing topic", topicName, ex);
+ }
+ }
+ }
closeFutures.add(topicFuture
.thenCompose(t -> t.isPresent() ? t.get().close(closeWithoutWaitingClientDisconnect)
: CompletableFuture.completedFuture(null)));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 2d9de4b5e7f..f499998fd3d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -38,9 +38,11 @@ import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecis
import static org.apache.pulsar.broker.namespace.NamespaceService.getHeartbeatNamespace;
import static org.apache.pulsar.broker.namespace.NamespaceService.getHeartbeatNamespaceV2;
import static org.apache.pulsar.broker.namespace.NamespaceService.getSLAMonitorNamespace;
+import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -111,7 +113,8 @@ import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.awaitility.Awaitility;
+import org.mockito.MockedStatic;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
@@ -141,46 +144,56 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
@BeforeClass
@Override
public void setup() throws Exception {
- conf.setForceDeleteNamespaceAllowed(true);
- conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
- conf.setAllowAutoTopicCreation(true);
- conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
- conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
- conf.setLoadBalancerSheddingEnabled(false);
- conf.setLoadBalancerDebugModeEnabled(true);
- conf.setTopicLevelPoliciesEnabled(false);
- super.internalSetup(conf);
- pulsar1 = pulsar;
- ServiceConfiguration defaultConf = getDefaultConf();
- defaultConf.setAllowAutoTopicCreation(true);
- defaultConf.setForceDeleteNamespaceAllowed(true);
- defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
- defaultConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
- defaultConf.setLoadBalancerSheddingEnabled(false);
- defaultConf.setTopicLevelPoliciesEnabled(false);
- additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf);
- pulsar2 = additionalPulsarTestContext.getPulsarService();
-
- setPrimaryLoadManager();
-
- setSecondaryLoadManager();
-
- admin.clusters().createCluster(this.conf.getClusterName(),
- ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
- admin.tenants().createTenant("public",
- new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
- Sets.newHashSet(this.conf.getClusterName())));
- admin.namespaces().createNamespace("public/default");
- admin.namespaces().setNamespaceReplicationClusters("public/default",
- Sets.newHashSet(this.conf.getClusterName()));
-
- admin.namespaces().createNamespace(defaultTestNamespace);
- admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
- Sets.newHashSet(this.conf.getClusterName()));
+ try (MockedStatic<ServiceUnitStateChannelImpl> channelMockedStatic =
+ mockStatic(ServiceUnitStateChannelImpl.class)) {
+ channelMockedStatic.when(() -> ServiceUnitStateChannelImpl.newInstance(isA(PulsarService.class)))
+ .thenAnswer(invocation -> {
+ PulsarService pulsarService = invocation.getArgument(0);
+ // Set the in-flight state waiting time to 5 seconds and the ownership monitor delay time to
+ // 1 second to avoid getting stuck when doing unload.
+ return new ServiceUnitStateChannelImpl(pulsarService, 5 * 1000, 1);
+ });
+ conf.setForceDeleteNamespaceAllowed(true);
+ conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
+ conf.setAllowAutoTopicCreation(true);
+ conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+ conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
+ conf.setLoadBalancerSheddingEnabled(false);
+ conf.setLoadBalancerDebugModeEnabled(true);
+ conf.setTopicLevelPoliciesEnabled(true);
+ super.internalSetup(conf);
+ pulsar1 = pulsar;
+ ServiceConfiguration defaultConf = getDefaultConf();
+ defaultConf.setAllowAutoTopicCreation(true);
+ defaultConf.setForceDeleteNamespaceAllowed(true);
+ defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+ defaultConf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
+ defaultConf.setLoadBalancerSheddingEnabled(false);
+ defaultConf.setTopicLevelPoliciesEnabled(true);
+ additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf);
+ pulsar2 = additionalPulsarTestContext.getPulsarService();
+
+ setPrimaryLoadManager();
+
+ setSecondaryLoadManager();
+
+ admin.clusters().createCluster(this.conf.getClusterName(),
+ ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+ admin.tenants().createTenant("public",
+ new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
+ Sets.newHashSet(this.conf.getClusterName())));
+ admin.namespaces().createNamespace("public/default");
+ admin.namespaces().setNamespaceReplicationClusters("public/default",
+ Sets.newHashSet(this.conf.getClusterName()));
+
+ admin.namespaces().createNamespace(defaultTestNamespace);
+ admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
+ Sets.newHashSet(this.conf.getClusterName()));
+ }
}
@Override
- @AfterClass
+ @AfterClass(alwaysRun = true)
protected void cleanup() throws Exception {
pulsar1 = null;
pulsar2.close();
@@ -538,119 +551,134 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
@Test
public void testDeployAndRollbackLoadManager() throws Exception {
- // Test rollback to modular load manager.
- ServiceConfiguration defaultConf = getDefaultConf();
- defaultConf.setAllowAutoTopicCreation(true);
- defaultConf.setForceDeleteNamespaceAllowed(true);
- defaultConf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
- defaultConf.setLoadBalancerSheddingEnabled(false);
- try (var additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf)) {
- // start pulsar3 with old load manager
- var pulsar3 = additionalPulsarTestContext.getPulsarService();
- String topic = "persistent://" + defaultTestNamespace + "/test";
-
- String lookupResult1 = pulsar3.getAdminClient().lookups().lookupTopic(topic);
- assertEquals(lookupResult1, pulsar3.getBrokerServiceUrl());
-
- String lookupResult2 = pulsar1.getAdminClient().lookups().lookupTopic(topic);
- String lookupResult3 = pulsar2.getAdminClient().lookups().lookupTopic(topic);
- assertEquals(lookupResult1, lookupResult2);
- assertEquals(lookupResult1, lookupResult3);
-
- NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).get();
- LookupOptions options = LookupOptions.builder()
- .authoritative(false)
- .requestHttps(false)
- .readOnly(false)
- .loadTopicsInBundle(false).build();
- Optional<URL> webServiceUrl1 =
- pulsar1.getNamespaceService().getWebServiceUrl(bundle, options);
- assertTrue(webServiceUrl1.isPresent());
- assertEquals(webServiceUrl1.get().toString(), pulsar3.getWebServiceAddress());
-
- Optional<URL> webServiceUrl2 =
- pulsar2.getNamespaceService().getWebServiceUrl(bundle, options);
- assertTrue(webServiceUrl2.isPresent());
- assertEquals(webServiceUrl2.get().toString(), webServiceUrl1.get().toString());
-
- Optional<URL> webServiceUrl3 =
- pulsar3.getNamespaceService().getWebServiceUrl(bundle, options);
- assertTrue(webServiceUrl3.isPresent());
- assertEquals(webServiceUrl3.get().toString(), webServiceUrl1.get().toString());
-
- List<PulsarService> pulsarServices = List.of(pulsar1, pulsar2, pulsar3);
- for (PulsarService pulsarService : pulsarServices) {
- // Test lookup heartbeat namespace's topic
- for (PulsarService pulsar : pulsarServices) {
- assertLookupHeartbeatOwner(pulsarService, pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
- }
- // Test lookup SLA namespace's topic
- for (PulsarService pulsar : pulsarServices) {
- assertLookupSLANamespaceOwner(pulsarService, pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
- }
- }
-
- // Test deploy new broker with new load manager
- ServiceConfiguration conf = getDefaultConf();
- conf.setAllowAutoTopicCreation(true);
- conf.setForceDeleteNamespaceAllowed(true);
- conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
- conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
- try (var additionPulsarTestContext = createAdditionalPulsarTestContext(conf)) {
- var pulsar4 = additionPulsarTestContext.getPulsarService();
-
- Set<String> availableCandidates = Sets.newHashSet(pulsar1.getBrokerServiceUrl(),
- pulsar2.getBrokerServiceUrl(),
- pulsar4.getBrokerServiceUrl());
- String lookupResult4 = pulsar4.getAdminClient().lookups().lookupTopic(topic);
- assertTrue(availableCandidates.contains(lookupResult4));
-
- String lookupResult5 = pulsar1.getAdminClient().lookups().lookupTopic(topic);
- String lookupResult6 = pulsar2.getAdminClient().lookups().lookupTopic(topic);
- String lookupResult7 = pulsar3.getAdminClient().lookups().lookupTopic(topic);
- assertEquals(lookupResult4, lookupResult5);
- assertEquals(lookupResult4, lookupResult6);
- assertEquals(lookupResult4, lookupResult7);
-
- Set<String> availableWebUrlCandidates = Sets.newHashSet(pulsar1.getWebServiceAddress(),
- pulsar2.getWebServiceAddress(),
- pulsar4.getWebServiceAddress());
-
- webServiceUrl1 =
+ try (MockedStatic<ServiceUnitStateChannelImpl> channelMockedStatic =
+ mockStatic(ServiceUnitStateChannelImpl.class)) {
+ channelMockedStatic.when(() -> ServiceUnitStateChannelImpl.newInstance(isA(PulsarService.class)))
+ .thenAnswer(invocation -> {
+ PulsarService pulsarService = invocation.getArgument(0);
+ // Set the in-flight state waiting time to 5 seconds and the ownership monitor delay time to
+ // 1 second to avoid getting stuck when doing unload.
+ return new ServiceUnitStateChannelImpl(pulsarService, 5 * 1000, 1);
+ });
+ // Test rollback to modular load manager.
+ ServiceConfiguration defaultConf = getDefaultConf();
+ defaultConf.setAllowAutoTopicCreation(true);
+ defaultConf.setForceDeleteNamespaceAllowed(true);
+ defaultConf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+ defaultConf.setLoadBalancerSheddingEnabled(false);
+ try (var additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf)) {
+ // start pulsar3 with old load manager
+ var pulsar3 = additionalPulsarTestContext.getPulsarService();
+ String topic = "persistent://" + defaultTestNamespace + "/test";
+
+ String lookupResult1 = pulsar3.getAdminClient().lookups().lookupTopic(topic);
+ assertEquals(lookupResult1, pulsar3.getBrokerServiceUrl());
+
+ String lookupResult2 = pulsar1.getAdminClient().lookups().lookupTopic(topic);
+ String lookupResult3 = pulsar2.getAdminClient().lookups().lookupTopic(topic);
+ assertEquals(lookupResult1, lookupResult2);
+ assertEquals(lookupResult1, lookupResult3);
+
+ NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).get();
+ LookupOptions options = LookupOptions.builder()
+ .authoritative(false)
+ .requestHttps(false)
+ .readOnly(false)
+ .loadTopicsInBundle(false).build();
+ Optional<URL> webServiceUrl1 =
pulsar1.getNamespaceService().getWebServiceUrl(bundle, options);
assertTrue(webServiceUrl1.isPresent());
- assertTrue(availableWebUrlCandidates.contains(webServiceUrl1.get().toString()));
+ assertEquals(webServiceUrl1.get().toString(), pulsar3.getWebServiceAddress());
- webServiceUrl2 =
+ Optional<URL> webServiceUrl2 =
pulsar2.getNamespaceService().getWebServiceUrl(bundle, options);
assertTrue(webServiceUrl2.isPresent());
assertEquals(webServiceUrl2.get().toString(), webServiceUrl1.get().toString());
- // The pulsar3 will redirect to pulsar4
- webServiceUrl3 =
+ Optional<URL> webServiceUrl3 =
pulsar3.getNamespaceService().getWebServiceUrl(bundle, options);
assertTrue(webServiceUrl3.isPresent());
- // It will redirect to pulsar4
- assertTrue(availableWebUrlCandidates.contains(webServiceUrl3.get().toString()));
-
- var webServiceUrl4 =
- pulsar4.getNamespaceService().getWebServiceUrl(bundle, options);
- assertTrue(webServiceUrl4.isPresent());
- assertEquals(webServiceUrl4.get().toString(), webServiceUrl1.get().toString());
+ assertEquals(webServiceUrl3.get().toString(), webServiceUrl1.get().toString());
- pulsarServices = List.of(pulsar1, pulsar2, pulsar3, pulsar4);
+ List<PulsarService> pulsarServices = List.of(pulsar1, pulsar2, pulsar3);
for (PulsarService pulsarService : pulsarServices) {
// Test lookup heartbeat namespace's topic
for (PulsarService pulsar : pulsarServices) {
- assertLookupHeartbeatOwner(pulsarService, pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+ assertLookupHeartbeatOwner(pulsarService,
+ pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
}
// Test lookup SLA namespace's topic
for (PulsarService pulsar : pulsarServices) {
- assertLookupSLANamespaceOwner(pulsarService, pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+ assertLookupSLANamespaceOwner(pulsarService,
+ pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+ }
+ }
+
+ // Test deploy new broker with new load manager
+ ServiceConfiguration conf = getDefaultConf();
+ conf.setAllowAutoTopicCreation(true);
+ conf.setForceDeleteNamespaceAllowed(true);
+ conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+ conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
+ try (var additionPulsarTestContext = createAdditionalPulsarTestContext(conf)) {
+ var pulsar4 = additionPulsarTestContext.getPulsarService();
+
+ Set<String> availableCandidates = Sets.newHashSet(pulsar1.getBrokerServiceUrl(),
+ pulsar2.getBrokerServiceUrl(),
+ pulsar4.getBrokerServiceUrl());
+ String lookupResult4 = pulsar4.getAdminClient().lookups().lookupTopic(topic);
+ assertTrue(availableCandidates.contains(lookupResult4));
+
+ String lookupResult5 = pulsar1.getAdminClient().lookups().lookupTopic(topic);
+ String lookupResult6 = pulsar2.getAdminClient().lookups().lookupTopic(topic);
+ String lookupResult7 = pulsar3.getAdminClient().lookups().lookupTopic(topic);
+ assertEquals(lookupResult4, lookupResult5);
+ assertEquals(lookupResult4, lookupResult6);
+ assertEquals(lookupResult4, lookupResult7);
+
+ Set<String> availableWebUrlCandidates = Sets.newHashSet(pulsar1.getWebServiceAddress(),
+ pulsar2.getWebServiceAddress(),
+ pulsar4.getWebServiceAddress());
+
+ webServiceUrl1 =
+ pulsar1.getNamespaceService().getWebServiceUrl(bundle, options);
+ assertTrue(webServiceUrl1.isPresent());
+ assertTrue(availableWebUrlCandidates.contains(webServiceUrl1.get().toString()));
+
+ webServiceUrl2 =
+ pulsar2.getNamespaceService().getWebServiceUrl(bundle, options);
+ assertTrue(webServiceUrl2.isPresent());
+ assertEquals(webServiceUrl2.get().toString(), webServiceUrl1.get().toString());
+
+ // The pulsar3 will redirect to pulsar4
+ webServiceUrl3 =
+ pulsar3.getNamespaceService().getWebServiceUrl(bundle, options);
+ assertTrue(webServiceUrl3.isPresent());
+ // It will redirect to pulsar4
+ assertTrue(availableWebUrlCandidates.contains(webServiceUrl3.get().toString()));
+
+ var webServiceUrl4 =
+ pulsar4.getNamespaceService().getWebServiceUrl(bundle, options);
+ assertTrue(webServiceUrl4.isPresent());
+ assertEquals(webServiceUrl4.get().toString(), webServiceUrl1.get().toString());
+
+ pulsarServices = List.of(pulsar1, pulsar2, pulsar3, pulsar4);
+ for (PulsarService pulsarService : pulsarServices) {
+ // Test lookup heartbeat namespace's topic
+ for (PulsarService pulsar : pulsarServices) {
+ assertLookupHeartbeatOwner(pulsarService,
+ pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+ }
+ // Test lookup SLA namespace's topic
+ for (PulsarService pulsar : pulsarServices) {
+ assertLookupSLANamespaceOwner(pulsarService,
+ pulsar.getLookupServiceAddress(), pulsar.getBrokerServiceUrl());
+ }
}
}
}
}
+
}
private void assertLookupHeartbeatOwner(PulsarService pulsar,
@@ -1089,6 +1117,12 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
NamespaceName heartbeatNamespacePulsar2V2 =
NamespaceService.getHeartbeatNamespaceV2(pulsar2.getLookupServiceAddress(), pulsar2.getConfiguration());
+ NamespaceName slaMonitorNamespacePulsar1 =
+ getSLAMonitorNamespace(pulsar1.getLookupServiceAddress(), pulsar1.getConfiguration());
+
+ NamespaceName slaMonitorNamespacePulsar2 =
+ getSLAMonitorNamespace(pulsar2.getLookupServiceAddress(), pulsar2.getConfiguration());
+
NamespaceBundle bundle1 = pulsar1.getNamespaceService().getNamespaceBundleFactory()
.getFullBundle(heartbeatNamespacePulsar1V1);
NamespaceBundle bundle2 = pulsar1.getNamespaceService().getNamespaceBundleFactory()
@@ -1099,27 +1133,34 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
NamespaceBundle bundle4 = pulsar2.getNamespaceService().getNamespaceBundleFactory()
.getFullBundle(heartbeatNamespacePulsar2V2);
+ NamespaceBundle slaBundle1 = pulsar1.getNamespaceService().getNamespaceBundleFactory()
+ .getFullBundle(slaMonitorNamespacePulsar1);
+ NamespaceBundle slaBundle2 = pulsar2.getNamespaceService().getNamespaceBundleFactory()
+ .getFullBundle(slaMonitorNamespacePulsar2);
+
+
Set<NamespaceBundle> ownedServiceUnitsByPulsar1 = primaryLoadManager.getOwnedServiceUnits();
log.info("Owned service units: {}", ownedServiceUnitsByPulsar1);
// heartbeat namespace bundle will own by pulsar1
- assertEquals(ownedServiceUnitsByPulsar1.size(), 3);
assertTrue(ownedServiceUnitsByPulsar1.contains(bundle1));
assertTrue(ownedServiceUnitsByPulsar1.contains(bundle2));
+ assertTrue(ownedServiceUnitsByPulsar1.contains(slaBundle1));
Set<NamespaceBundle> ownedServiceUnitsByPulsar2 = secondaryLoadManager.getOwnedServiceUnits();
log.info("Owned service units: {}", ownedServiceUnitsByPulsar2);
- assertEquals(ownedServiceUnitsByPulsar2.size(), 3);
assertTrue(ownedServiceUnitsByPulsar2.contains(bundle3));
assertTrue(ownedServiceUnitsByPulsar2.contains(bundle4));
+ assertTrue(ownedServiceUnitsByPulsar2.contains(slaBundle2));
Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar1 =
admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar1.getLookupServiceAddress());
Map<String, NamespaceOwnershipStatus> ownedNamespacesByPulsar2 =
admin.brokers().getOwnedNamespaces(conf.getClusterName(), pulsar2.getLookupServiceAddress());
- assertEquals(ownedNamespacesByPulsar1.size(), 3);
assertTrue(ownedNamespacesByPulsar1.containsKey(bundle1.toString()));
assertTrue(ownedNamespacesByPulsar1.containsKey(bundle2.toString()));
- assertEquals(ownedNamespacesByPulsar2.size(), 3);
+ assertTrue(ownedNamespacesByPulsar1.containsKey(slaBundle1.toString()));
+
assertTrue(ownedNamespacesByPulsar2.containsKey(bundle3.toString()));
assertTrue(ownedNamespacesByPulsar2.containsKey(bundle4.toString()));
+ assertTrue(ownedNamespacesByPulsar2.containsKey(slaBundle2.toString()));
String topic = "persistent://" + defaultTestNamespace + "/test-get-owned-service-units";
admin.topics().createPartitionedTopic(topic, 1);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 990408d214b..e8ccd7b01ca 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -507,10 +507,10 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
assertEquals(1, getOwnerRequests1.size());
assertEquals(1, getOwnerRequests2.size());
- // In 5 secs, the getOwnerAsync requests(lookup requests) should time out.
- Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ // In 10 secs, the getOwnerAsync requests(lookup requests) should time out.
+ Awaitility.await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> assertTrue(owner1.isCompletedExceptionally()));
- Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ Awaitility.await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> assertTrue(owner2.isCompletedExceptionally()));
assertEquals(0, getOwnerRequests1.size());
@@ -1139,10 +1139,10 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
assertFalse(owner1.isDone());
assertFalse(owner2.isDone());
- // In 5 secs, the getOwnerAsync requests(lookup requests) should time out.
- Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ // In 10 secs, the getOwnerAsync requests(lookup requests) should time out.
+ Awaitility.await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> assertTrue(owner1.isCompletedExceptionally()));
- Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ Awaitility.await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> assertTrue(owner2.isCompletedExceptionally()));
// recovered, check the monitor update state : Assigned -> Owned