You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ti...@apache.org on 2022/12/31 12:29:34 UTC
[pulsar] branch master updated: [improve][admin,broker] Add option to unloadNamespaceBundle with bundle Affinity broker url (#18663)
This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e194c017f4f [improve][admin,broker] Add option to unloadNamespaceBundle with bundle Affinity broker url (#18663)
e194c017f4f is described below
commit e194c017f4f6abbac9596c44d9aa4f21c71d2458
Author: vineeth1995 <vi...@gmail.com>
AuthorDate: Sat Dec 31 04:29:24 2022 -0800
[improve][admin,broker] Add option to unloadNamespaceBundle with bundle Affinity broker url (#18663)
Co-authored-by: Vineeth <vi...@verizonmedia.com>
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 51 ++++++++++++++
.../apache/pulsar/broker/admin/v1/Namespaces.java | 4 +-
.../apache/pulsar/broker/admin/v2/Namespaces.java | 4 +-
.../pulsar/broker/loadbalance/LoadManager.java | 2 +
.../broker/loadbalance/ModularLoadManager.java | 2 +
.../pulsar/broker/loadbalance/NoopLoadManager.java | 13 ++++
.../loadbalance/impl/ModularLoadManagerImpl.java | 12 +++-
.../impl/ModularLoadManagerWrapper.java | 25 +++++--
.../loadbalance/impl/SimpleLoadManagerImpl.java | 14 ++++
.../pulsar/broker/namespace/NamespaceService.java | 3 +-
.../pulsar/broker/web/PulsarWebResource.java | 4 +-
.../apache/pulsar/broker/admin/NamespacesTest.java | 4 +-
.../loadbalance/ModularLoadManagerImplTest.java | 80 +++++++++++++++++++++-
.../org/apache/pulsar/client/admin/Namespaces.java | 27 ++++++++
.../client/admin/internal/NamespacesImpl.java | 14 ++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 2 +-
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 6 +-
17 files changed, 249 insertions(+), 18 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index b33b84e5aed..756cc9b5c85 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -38,7 +38,9 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -56,6 +58,8 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.loadbalance.LeaderBroker;
+import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
@@ -878,6 +882,53 @@ public abstract class NamespacesBase extends AdminResource {
}
}
+ private void validateLeaderBroker() {
+ if (!this.isLeaderBroker()) {
+ LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader().get();
+ String leaderBrokerUrl = leaderBroker.getServiceUrl();
+ CompletableFuture<LookupResult> result = pulsar().getNamespaceService()
+ .createLookupResult(leaderBrokerUrl, false, null);
+ try {
+ LookupResult lookupResult = result.get(2L, TimeUnit.SECONDS);
+ String redirectUrl = isRequestHttps() ? lookupResult.getLookupData().getHttpUrlTls()
+ : lookupResult.getLookupData().getHttpUrl();
+ if (redirectUrl == null) {
+ log.error("Redirected broker's service url is not configured");
+ throw new RestException(Response.Status.PRECONDITION_FAILED,
+ "Redirected broker's service url is not configured.");
+ }
+ URL url = new URL(redirectUrl);
+ URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(url.getHost())
+ .port(url.getPort())
+ .replaceQueryParam("authoritative",
+ false).build();
+
+ // Redirect
+ if (log.isDebugEnabled()) {
+ log.debug("Redirecting the request call to leader - {}", redirect);
+ }
+ throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+ } catch (MalformedURLException exception) {
+ log.error("The leader broker url is malformed - {}", leaderBrokerUrl);
+ throw new RestException(exception);
+ } catch (ExecutionException | InterruptedException exception) {
+ log.error("Leader broker not found - {}", leaderBrokerUrl);
+ throw new RestException(exception.getCause());
+ } catch (TimeoutException exception) {
+ log.error("Leader broker not found within timeout - {}", leaderBrokerUrl);
+ throw new RestException(exception);
+ }
+ }
+ }
+
+ public void setNamespaceBundleAffinity (String bundleRange, String destinationBroker) {
+ if (StringUtils.isBlank(destinationBroker)) {
+ return;
+ }
+ validateLeaderBroker();
+ pulsar().getLoadManager().get().setNamespaceBundleAffinity(bundleRange, destinationBroker);
+ }
+
public CompletableFuture<Void> internalUnloadNamespaceBundleAsync(String bundleRange, boolean authoritative) {
return validateSuperUserAccessAsync()
.thenAccept(__ -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index ffb0e49d365..fd67de41172 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -887,8 +887,10 @@ public class Namespaces extends NamespacesBase {
public void unloadNamespaceBundle(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("bundle") String bundleRange,
- @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+ @QueryParam("destinationBroker") String destinationBroker) {
validateNamespaceName(property, cluster, namespace);
+ setNamespaceBundleAffinity(bundleRange, destinationBroker);
internalUnloadNamespaceBundleAsync(bundleRange, authoritative)
.thenAccept(__ -> {
log.info("[{}] Successfully unloaded namespace bundle {}", clientAppId(), bundleRange);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index b6bf1f0927c..f0a4dbb01dc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -813,8 +813,10 @@ public class Namespaces extends NamespacesBase {
public void unloadNamespaceBundle(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@PathParam("bundle") String bundleRange,
- @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+ @QueryParam("destinationBroker") String destinationBroker) {
validateNamespaceName(tenant, namespace);
+ setNamespaceBundleAffinity(bundleRange, destinationBroker);
internalUnloadNamespaceBundleAsync(bundleRange, authoritative)
.thenAccept(__ -> {
log.info("[{}] Successfully unloaded namespace bundle {}", clientAppId(), bundleRange);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
index e34215d1996..b4df5d31968 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
@@ -119,6 +119,8 @@ public interface LoadManager {
CompletableFuture<Set<String>> getAvailableBrokersAsync();
+ String setNamespaceBundleAffinity(String bundle, String broker);
+
void stop() throws PulsarServerException;
/**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
index fa6895568e9..d608bd6784f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
@@ -140,4 +140,6 @@ public interface ModularLoadManager {
* @return bundle data
*/
BundleData getBundleDataOrDefault(String bundle);
+
+ String setNamespaceBundleAffinity(String bundle, String broker);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
index 1ab56b50cde..e8c0567fd0c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
@@ -20,11 +20,14 @@ package org.apache.pulsar.broker.loadbalance;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.impl.PulsarResourceDescription;
@@ -43,11 +46,13 @@ public class NoopLoadManager implements LoadManager {
private String lookupServiceAddress;
private ResourceUnit localResourceUnit;
private LockManager<LocalBrokerData> lockManager;
+ private Map<String, String> bundleBrokerAffinityMap;
@Override
public void initialize(PulsarService pulsar) {
this.pulsar = pulsar;
this.lockManager = pulsar.getCoordinationService().getLockManager(LocalBrokerData.class);
+ this.bundleBrokerAffinityMap = new ConcurrentHashMap<>();
}
@Override
@@ -142,4 +147,12 @@ public class NoopLoadManager implements LoadManager {
}
}
+ @Override
+ public String setNamespaceBundleAffinity(String bundle, String broker) {
+ if (StringUtils.isBlank(broker)) {
+ return this.bundleBrokerAffinityMap.remove(bundle);
+ }
+ broker = broker.replaceFirst("http[s]?://", "");
+ return this.bundleBrokerAffinityMap.put(bundle, broker);
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 7d1a21a8c90..6e63643a859 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -197,6 +197,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
private final Lock lock = new ReentrantLock();
private Set<String> knownBrokers = ConcurrentHashMap.newKeySet();
+ private Map<String, String> bundleBrokerAffinityMap;
/**
* Initializes fields which do not depend on PulsarService. initialize(PulsarService) should subsequently be called.
@@ -215,7 +216,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
scheduler = Executors.newSingleThreadScheduledExecutor(
new ExecutorProvider.ExtendedThreadFactory("pulsar-modular-load-manager"));
this.brokerToFailureDomainMap = new HashMap<>();
-
+ this.bundleBrokerAffinityMap = new ConcurrentHashMap<>();
this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
@Override
public boolean isEnablePersistentTopics(String brokerUrl) {
@@ -1212,4 +1213,13 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
return metricsCollection;
}
+
+ @Override
+ public String setNamespaceBundleAffinity(String bundle, String broker) {
+ if (StringUtils.isBlank(broker)) {
+ return this.bundleBrokerAffinityMap.remove(bundle);
+ }
+ broker = broker.replaceFirst("http[s]?://", "");
+ return this.bundleBrokerAffinityMap.put(bundle, broker);
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
index 5f7cd5b8c38..c61d39cf315 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
@@ -65,13 +66,13 @@ public class ModularLoadManagerWrapper implements LoadManager {
@Override
public Optional<ResourceUnit> getLeastLoaded(final ServiceUnitId serviceUnit) {
+ String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(serviceUnit.toString());
+ String affinityBroker = loadManager.setNamespaceBundleAffinity(bundleRange, null);
+ if (!StringUtils.isBlank(affinityBroker)) {
+ return Optional.of(buildBrokerResourceUnit(affinityBroker));
+ }
Optional<String> leastLoadedBroker = loadManager.selectBrokerForAssignment(serviceUnit);
- return leastLoadedBroker.map(s -> {
- String webServiceUrl = getBrokerWebServiceUrl(s);
- String brokerZnodeName = getBrokerZnodeName(s, webServiceUrl);
- return new SimpleResourceUnit(webServiceUrl,
- new PulsarResourceDescription(), Map.of(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME, brokerZnodeName));
- });
+ return leastLoadedBroker.map(this::buildBrokerResourceUnit);
}
private String getBrokerWebServiceUrl(String broker) {
@@ -146,4 +147,16 @@ public class ModularLoadManagerWrapper implements LoadManager {
public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
return loadManager.getAvailableBrokersAsync();
}
+
+ private SimpleResourceUnit buildBrokerResourceUnit (String broker) {
+ String webServiceUrl = getBrokerWebServiceUrl(broker);
+ String brokerZnodeName = getBrokerZnodeName(broker, webServiceUrl);
+ return new SimpleResourceUnit(webServiceUrl,
+ new PulsarResourceDescription(), Map.of(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME, brokerZnodeName));
+ }
+
+ @Override
+ public String setNamespaceBundleAffinity(String bundle, String broker) {
+ return loadManager.setNamespaceBundleAffinity(bundle, broker);
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index c2c0d1947c9..c98857b2fc4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -40,12 +40,14 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
@@ -186,6 +188,8 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
private volatile Future<?> updateRankingHandle;
+ private Map<String, String> bundleBrokerAffinityMap;
+
// Perform initializations which may be done without a PulsarService.
public SimpleLoadManagerImpl() {
scheduler = Executors.newSingleThreadScheduledExecutor(
@@ -251,6 +255,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
}
});
this.pulsar = pulsar;
+ this.bundleBrokerAffinityMap = new ConcurrentHashMap<>();
}
public SimpleLoadManagerImpl(PulsarService pulsar) {
@@ -1443,6 +1448,15 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
}
}
+ @Override
+ public String setNamespaceBundleAffinity(String bundle, String broker) {
+ if (StringUtils.isBlank(broker)) {
+ return this.bundleBrokerAffinityMap.remove(bundle);
+ }
+ broker = broker.replaceFirst("http[s]?://", "");
+ return this.bundleBrokerAffinityMap.put(bundle, broker);
+ }
+
@Override
public void stop() throws PulsarServerException {
try {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 4be64f4b7b5..27df77e815d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -601,7 +601,7 @@ public class NamespaceService implements AutoCloseable {
}
}
- protected CompletableFuture<LookupResult> createLookupResult(String candidateBroker, boolean authoritativeRedirect,
+ public CompletableFuture<LookupResult> createLookupResult(String candidateBroker, boolean authoritativeRedirect,
final String advertisedListenerName) {
CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
@@ -692,6 +692,7 @@ public class NamespaceService implements AutoCloseable {
String lookupAddress = leastLoadedBroker.get().getResourceId();
String advertisedAddr = (String) leastLoadedBroker.get()
.getProperty(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME);
+
if (LOG.isDebugEnabled()) {
LOG.debug("{} : redirecting to the least loaded broker, lookup address={}",
pulsar.getSafeWebServiceAddress(),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 8f238614fe3..7c1d2899758 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -710,8 +710,8 @@ public abstract class PulsarWebResource {
// Replace the host and port of the current request and redirect
URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.get().getHost())
.port(webUrl.get().getPort()).replaceQueryParam("authoritative",
- newAuthoritative).build();
-
+ newAuthoritative).replaceQueryParam("destinationBroker",
+ null).build();
log.debug("{} is not a service unit owned", bundle);
// Redirect
log.debug("Redirecting the rest call to {}", redirect);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 0f0ee8df3fa..0a92e0cc8bc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -726,7 +726,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
doReturn(uri).when(uriInfo).getRequestUri();
namespaces.unloadNamespaceBundle(response, this.testTenant, this.testOtherCluster,
- this.testLocalNamespaces.get(2).getLocalName(), "0x00000000_0xffffffff", false);
+ this.testLocalNamespaces.get(2).getLocalName(), "0x00000000_0xffffffff", false, null);
captor = ArgumentCaptor.forClass(WebApplicationException.class);
verify(response, timeout(5000).atLeast(1)).resume(captor.capture());
assertEquals(captor.getValue().getResponse().getStatus(), Status.TEMPORARY_REDIRECT.getStatusCode());
@@ -1053,7 +1053,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
doReturn(CompletableFuture.completedFuture(null)).when(nsSvc).unloadNamespaceBundle(testBundle);
AsyncResponse response = mock(AsyncResponse.class);
namespaces.unloadNamespaceBundle(response, testTenant, testLocalCluster, bundledNsLocal, "0x00000000_0x80000000",
- false);
+ false, null);
verify(response, timeout(5000).times(1)).resume(any(RestException.class));
// cleanup
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
index 05e82484226..4b9f679f19d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.loadbalance;
+import static java.lang.Thread.sleep;
import static org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl.TIME_AVERAGE_BROKER_ZPATH;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
@@ -40,6 +41,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -85,6 +87,7 @@ import org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -102,6 +105,8 @@ public class ModularLoadManagerImplTest {
private PulsarService pulsar2;
private PulsarAdmin admin2;
+ private PulsarService pulsar3;
+
private String primaryHost;
private String secondaryHost;
@@ -181,6 +186,20 @@ public class ModularLoadManagerImplTest {
pulsar2 = new PulsarService(config2);
pulsar2.start();
+ ServiceConfiguration config = new ServiceConfiguration();
+ config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+ config.setLoadBalancerLoadSheddingStrategy("org.apache.pulsar.broker.loadbalance.impl.OverloadShedder");
+ config.setClusterName("use");
+ config.setWebServicePort(Optional.of(0));
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
+ config.setAdvertisedAddress("localhost");
+ config.setBrokerShutdownTimeoutMs(0L);
+ config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
+ config.setBrokerServicePort(Optional.of(0));
+ config.setBrokerServicePortTls(Optional.of(0));
+ config.setWebServicePortTls(Optional.of(0));
+ pulsar3 = new PulsarService(config);
+
secondaryHost = String.format("%s:%d", "localhost", pulsar2.getListenPortHTTP().get());
url2 = new URL(pulsar2.getWebServiceAddress());
admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build();
@@ -188,7 +207,7 @@ public class ModularLoadManagerImplTest {
primaryLoadManager = (ModularLoadManagerImpl) getField(pulsar1.getLoadManager().get(), "loadManager");
secondaryLoadManager = (ModularLoadManagerImpl) getField(pulsar2.getLoadManager().get(), "loadManager");
nsFactory = new NamespaceBundleFactory(pulsar1, Hashing.crc32());
- Thread.sleep(100);
+ sleep(100);
}
@AfterMethod(alwaysRun = true)
@@ -201,6 +220,10 @@ public class ModularLoadManagerImplTest {
pulsar2.close();
pulsar1.close();
+
+ if (pulsar3.isRunning()) {
+ pulsar3.close();
+ }
bkEnsemble.stop();
}
@@ -284,6 +307,59 @@ public class ModularLoadManagerImplTest {
}
}
+
+
+ @Test
+ public void testBrokerAffinity() throws Exception {
+ // Start broker 3
+ pulsar3.start();
+
+ final String tenant = "test";
+ final String cluster = "test";
+ String namespace = tenant + "/" + cluster + "/" + "test";
+ String topic = "persistent://" + namespace + "/my-topic1";
+ admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl("http://" + pulsar1.getAdvertisedAddress()).build());
+ admin1.tenants().createTenant(tenant,
+ new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster)));
+ admin1.namespaces().createNamespace(namespace, 16);
+
+ String topicLookup = admin1.lookups().lookupTopic(topic);
+ String bundleRange = admin1.lookups().getBundleRange(topic);
+
+ String brokerServiceUrl = pulsar1.getBrokerServiceUrl();
+ String brokerUrl = pulsar1.getSafeWebServiceAddress();
+ log.debug("initial broker service url - {}", topicLookup);
+ Random rand=new Random();
+
+ if (topicLookup.equals(brokerServiceUrl)) {
+ int x = rand.nextInt(2);
+ if (x == 0) {
+ brokerUrl = pulsar2.getSafeWebServiceAddress();
+ brokerServiceUrl = pulsar2.getBrokerServiceUrl();
+ }
+ else {
+ brokerUrl = pulsar3.getSafeWebServiceAddress();
+ brokerServiceUrl = pulsar3.getBrokerServiceUrl();
+ }
+ }
+ brokerUrl = brokerUrl.replaceFirst("http[s]?://", "");
+ log.debug("destination broker service url - {}, broker url - {}", brokerServiceUrl, brokerUrl);
+ String leaderServiceUrl = admin1.brokers().getLeaderBroker().getServiceUrl();
+ log.debug("leader serviceUrl - {}, broker1 service url - {}", leaderServiceUrl, pulsar1.getSafeWebServiceAddress());
+ //Make a call to broker which is not a leader
+ if (!leaderServiceUrl.equals(pulsar1.getSafeWebServiceAddress())) {
+ admin1.namespaces().unloadNamespaceBundle(namespace, bundleRange, brokerUrl);
+ }
+ else {
+ admin2.namespaces().unloadNamespaceBundle(namespace, bundleRange, brokerUrl);
+ }
+
+ sleep(2000);
+ String topicLookupAfterUnload = admin1.lookups().lookupTopic(topic);
+ log.debug("final broker service url - {}", topicLookupAfterUnload);
+ Assert.assertEquals(brokerServiceUrl, topicLookupAfterUnload);
+ }
+
/**
* It verifies that once broker owns max-number of topics: load-manager doesn't allocates new bundles to that broker
* unless all the brokers are in same state.
@@ -345,7 +421,7 @@ public class ModularLoadManagerImplTest {
// Need to update all the bundle data for the shredder to see the spy.
primaryLoadManager.handleDataNotification(new Notification(NotificationType.Created, LoadManager.LOADBALANCE_BROKERS_ROOT + "/broker:8080"));
- Thread.sleep(100);
+ sleep(100);
localBrokerData.setCpu(new ResourceUsage(80, 100));
primaryLoadManager.doLoadShedding();
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index f4c284bb484..2690df658b7 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -2090,6 +2090,20 @@ public interface Namespaces {
*/
void unloadNamespaceBundle(String namespace, String bundle) throws PulsarAdminException;
+ /**
+ * Unload namespace bundle and assign the bundle to specified broker.
+ *
+ * @param namespace
+ * @param bundle
+ * range of bundle to unload
+ * @param destinationBroker
+ * Target broker url to which the bundle should be assigned to
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void unloadNamespaceBundle(String namespace, String bundle, String destinationBroker) throws PulsarAdminException;
+
+
/**
* Unload namespace bundle asynchronously.
*
@@ -2101,6 +2115,19 @@ public interface Namespaces {
*/
CompletableFuture<Void> unloadNamespaceBundleAsync(String namespace, String bundle);
+ /**
+ * Unload namespace bundle asynchronously.
+ *
+ * @param namespace
+ * @param bundle
+ * range of bundle to unload
+ * @param destinationBroker
+ * Target broker url to which the bundle should be assigned to
+ *
+ * @return a future that can be used to track when the bundle is unloaded
+ */
+ CompletableFuture<Void> unloadNamespaceBundleAsync(String namespace, String bundle, String destinationBroker);
+
/**
* Split namespace bundle.
*
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 6d4889a751d..fa3155c59d6 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -814,6 +814,12 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
sync(() -> unloadNamespaceBundleAsync(namespace, bundle));
}
+ @Override
+ public void unloadNamespaceBundle(String namespace,
+ String bundle, String destinationBroker) throws PulsarAdminException {
+ sync(() -> unloadNamespaceBundleAsync(namespace, bundle, destinationBroker));
+ }
+
@Override
public CompletableFuture<Void> unloadNamespaceBundleAsync(String namespace, String bundle) {
NamespaceName ns = NamespaceName.get(namespace);
@@ -821,6 +827,14 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
}
+ @Override
+ public CompletableFuture<Void> unloadNamespaceBundleAsync(String namespace,
+ String bundle, String destinationBroker) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, bundle, "unload").queryParam("destinationBroker", destinationBroker);
+ return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
+ }
+
@Override
public void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
String splitAlgorithmName) throws PulsarAdminException {
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 940e9691116..d715c6b4e06 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -442,7 +442,7 @@ public class PulsarAdminToolTest {
namespaces = new CmdNamespaces(() -> admin);
namespaces.run(split("unload myprop/clust/ns1 -b 0x80000000_0xffffffff"));
- verify(mockNamespaces).unloadNamespaceBundle("myprop/clust/ns1", "0x80000000_0xffffffff");
+ verify(mockNamespaces).unloadNamespaceBundle("myprop/clust/ns1", "0x80000000_0xffffffff", null);
namespaces.run(split("split-bundle myprop/clust/ns1 -b 0x00000000_0xffffffff"));
verify(mockNamespaces).splitNamespaceBundle("myprop/clust/ns1", "0x00000000_0xffffffff", false, null);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index b64df272b44..f673ff40b79 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -880,13 +880,17 @@ public class CmdNamespaces extends CmdBase {
@Parameter(names = { "--bundle", "-b" }, description = "{start-boundary}_{end-boundary}")
private String bundle;
+ @Parameter(names = { "--destinationBroker", "-d" },
+ description = "Target brokerWebServiceAddress to which the bundle has to be allocated to")
+ private String destinationBroker;
+
@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
if (bundle == null) {
getAdmin().namespaces().unload(namespace);
} else {
- getAdmin().namespaces().unloadNamespaceBundle(namespace, bundle);
+ getAdmin().namespaces().unloadNamespaceBundle(namespace, bundle, destinationBroker);
}
}
}