You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ti...@apache.org on 2022/12/31 12:29:34 UTC

[pulsar] branch master updated: [improve][admin,broker] Add option to unloadNamespaceBundle with bundle Affinity broker url (#18663)

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e194c017f4f [improve][admin,broker] Add option to unloadNamespaceBundle with bundle Affinity broker url (#18663)
e194c017f4f is described below

commit e194c017f4f6abbac9596c44d9aa4f21c71d2458
Author: vineeth1995 <vi...@gmail.com>
AuthorDate: Sat Dec 31 04:29:24 2022 -0800

    [improve][admin,broker] Add option to unloadNamespaceBundle with bundle Affinity broker url (#18663)
    
    Co-authored-by: Vineeth <vi...@verizonmedia.com>
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 51 ++++++++++++++
 .../apache/pulsar/broker/admin/v1/Namespaces.java  |  4 +-
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  4 +-
 .../pulsar/broker/loadbalance/LoadManager.java     |  2 +
 .../broker/loadbalance/ModularLoadManager.java     |  2 +
 .../pulsar/broker/loadbalance/NoopLoadManager.java | 13 ++++
 .../loadbalance/impl/ModularLoadManagerImpl.java   | 12 +++-
 .../impl/ModularLoadManagerWrapper.java            | 25 +++++--
 .../loadbalance/impl/SimpleLoadManagerImpl.java    | 14 ++++
 .../pulsar/broker/namespace/NamespaceService.java  |  3 +-
 .../pulsar/broker/web/PulsarWebResource.java       |  4 +-
 .../apache/pulsar/broker/admin/NamespacesTest.java |  4 +-
 .../loadbalance/ModularLoadManagerImplTest.java    | 80 +++++++++++++++++++++-
 .../org/apache/pulsar/client/admin/Namespaces.java | 27 ++++++++
 .../client/admin/internal/NamespacesImpl.java      | 14 ++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  2 +-
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |  6 +-
 17 files changed, 249 insertions(+), 18 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index b33b84e5aed..756cc9b5c85 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -38,7 +38,9 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -56,6 +58,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.loadbalance.LeaderBroker;
+import org.apache.pulsar.broker.lookup.LookupResult;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
@@ -878,6 +882,53 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
+    /** Redirects (307) the request to the leader broker's web URL when this broker is not the leader. */
+    private void validateLeaderBroker() {
+        if (!this.isLeaderBroker()) {
+            String leaderBrokerUrl = pulsar().getLeaderElectionService().getCurrentLeader()
+                    .map(LeaderBroker::getServiceUrl)
+                    .orElseThrow(() -> new RestException(Response.Status.SERVICE_UNAVAILABLE,
+                            "Leader broker is not available.")); // no leader known: fail with 503, not a bare get()
+            try {
+                LookupResult lookupResult = pulsar().getNamespaceService()
+                        .createLookupResult(leaderBrokerUrl, false, null).get(2L, TimeUnit.SECONDS);
+                String redirectUrl = isRequestHttps() ? lookupResult.getLookupData().getHttpUrlTls()
+                        : lookupResult.getLookupData().getHttpUrl();
+                if (redirectUrl == null) {
+                    log.error("Redirected broker's service url is not configured");
+                    throw new RestException(Response.Status.PRECONDITION_FAILED,
+                            "Redirected broker's service url is not configured.");
+                }
+                URL url = new URL(redirectUrl);
+                URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(url.getHost()).port(url.getPort())
+                        .replaceQueryParam("authoritative", false).build(); // same request, retargeted at the leader
+                log.debug("Redirecting the request call to leader - {}", redirect);
+                throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+            } catch (MalformedURLException exception) {
+                log.error("The leader broker url is malformed - {}", leaderBrokerUrl, exception);
+                throw new RestException(exception);
+            } catch (InterruptedException exception) {
+                Thread.currentThread().interrupt(); // keep the interrupt visible to the calling thread
+                log.error("Interrupted while resolving leader broker - {}", leaderBrokerUrl, exception);
+                throw new RestException(exception); // getCause() is normally null for an interrupt
+            } catch (ExecutionException exception) {
+                log.error("Leader broker not found - {}", leaderBrokerUrl, exception);
+                throw new RestException(exception.getCause() != null ? exception.getCause() : exception);
+            } catch (TimeoutException exception) {
+                log.error("Leader broker not found within timeout - {}", leaderBrokerUrl, exception);
+                throw new RestException(exception);
+            }
+        }
+    }
+
+    public void setNamespaceBundleAffinity(String bundleRange, String destinationBroker) {
+        if (!StringUtils.isBlank(destinationBroker)) { // blank destination: nothing to pin
+            validateSuperUserAccess(); // authorize before any load-manager state is changed
+            validateLeaderBroker(); // throws a redirect when this broker is not the leader
+            pulsar().getLoadManager().get().setNamespaceBundleAffinity(bundleRange, destinationBroker);
+        }
+    }
+
     public CompletableFuture<Void> internalUnloadNamespaceBundleAsync(String bundleRange, boolean authoritative) {
         return validateSuperUserAccessAsync()
                 .thenAccept(__ -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index ffb0e49d365..fd67de41172 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -887,8 +887,10 @@ public class Namespaces extends NamespacesBase {
     public void unloadNamespaceBundle(@Suspended final AsyncResponse asyncResponse,
             @PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("bundle") String bundleRange,
-            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @QueryParam("destinationBroker") String destinationBroker) {
         validateNamespaceName(property, cluster, namespace);
+        setNamespaceBundleAffinity(bundleRange, destinationBroker);
         internalUnloadNamespaceBundleAsync(bundleRange, authoritative)
                 .thenAccept(__ -> {
                     log.info("[{}] Successfully unloaded namespace bundle {}", clientAppId(), bundleRange);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index b6bf1f0927c..f0a4dbb01dc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -813,8 +813,10 @@ public class Namespaces extends NamespacesBase {
     public void unloadNamespaceBundle(@Suspended final AsyncResponse asyncResponse,
             @PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
             @PathParam("bundle") String bundleRange,
-            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+                                      @QueryParam("destinationBroker") String destinationBroker) {
         validateNamespaceName(tenant, namespace);
+        setNamespaceBundleAffinity(bundleRange, destinationBroker);
         internalUnloadNamespaceBundleAsync(bundleRange, authoritative)
                 .thenAccept(__ -> {
                     log.info("[{}] Successfully unloaded namespace bundle {}", clientAppId(), bundleRange);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
index e34215d1996..b4df5d31968 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
@@ -119,6 +119,8 @@ public interface LoadManager {
 
     CompletableFuture<Set<String>> getAvailableBrokersAsync();
 
+    String setNamespaceBundleAffinity(String bundle, String broker); // blank broker unpins; returns old value
+
     void stop() throws PulsarServerException;
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
index fa6895568e9..d608bd6784f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
@@ -140,4 +140,6 @@ public interface ModularLoadManager {
      * @return bundle data
      */
     BundleData getBundleDataOrDefault(String bundle);
+
+    String setNamespaceBundleAffinity(String bundle, String broker); // blank broker unpins; returns old value
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
index 1ab56b50cde..e8c0567fd0c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java
@@ -20,11 +20,14 @@ package org.apache.pulsar.broker.loadbalance;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.loadbalance.impl.PulsarResourceDescription;
@@ -43,11 +46,13 @@ public class NoopLoadManager implements LoadManager {
     private String lookupServiceAddress;
     private ResourceUnit localResourceUnit;
     private LockManager<LocalBrokerData> lockManager;
+    private Map<String, String> bundleBrokerAffinityMap;
 
     @Override
     public void initialize(PulsarService pulsar) {
         this.pulsar = pulsar;
         this.lockManager = pulsar.getCoordinationService().getLockManager(LocalBrokerData.class);
+        this.bundleBrokerAffinityMap = new ConcurrentHashMap<>();
     }
 
     @Override
@@ -142,4 +147,12 @@ public class NoopLoadManager implements LoadManager {
         }
     }
 
+    @Override
+    public String setNamespaceBundleAffinity(String bundle, String broker) {
+        if (!StringUtils.isBlank(broker)) {
+            // Pin the bundle; the broker is stored with its first "http(s)://" occurrence removed.
+            return this.bundleBrokerAffinityMap.put(bundle, broker.replaceFirst("http[s]?://", ""));
+        }
+        return this.bundleBrokerAffinityMap.remove(bundle); // blank broker: drop the pin, return the old one
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 7d1a21a8c90..6e63643a859 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -197,6 +197,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
 
     private final Lock lock = new ReentrantLock();
     private Set<String> knownBrokers = ConcurrentHashMap.newKeySet();
+    private Map<String, String> bundleBrokerAffinityMap;
 
     /**
      * Initializes fields which do not depend on PulsarService. initialize(PulsarService) should subsequently be called.
@@ -215,7 +216,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
         scheduler = Executors.newSingleThreadScheduledExecutor(
                 new ExecutorProvider.ExtendedThreadFactory("pulsar-modular-load-manager"));
         this.brokerToFailureDomainMap = new HashMap<>();
-
+        this.bundleBrokerAffinityMap = new ConcurrentHashMap<>();
         this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
             @Override
             public boolean isEnablePersistentTopics(String brokerUrl) {
@@ -1212,4 +1213,13 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
 
         return metricsCollection;
     }
+
+    @Override
+    public String setNamespaceBundleAffinity(String bundle, String broker) {
+        // Blank broker removes the pin; otherwise the broker is stored with its first "http(s)://" removed.
+        // Either way the value previously mapped to the bundle (null if none) is returned.
+        final boolean unpin = StringUtils.isBlank(broker);
+        return unpin ? this.bundleBrokerAffinityMap.remove(bundle)
+                : this.bundleBrokerAffinityMap.put(bundle, broker.replaceFirst("http[s]?://", ""));
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
index 5f7cd5b8c38..c61d39cf315 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
@@ -65,13 +66,13 @@ public class ModularLoadManagerWrapper implements LoadManager {
 
     @Override
     public Optional<ResourceUnit> getLeastLoaded(final ServiceUnitId serviceUnit) {
+        String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(serviceUnit.toString());
+        String affinityBroker = loadManager.setNamespaceBundleAffinity(bundleRange, null); // null = pop the pin
+        if (!StringUtils.isBlank(affinityBroker)) { // a one-shot pin wins over load-based selection
+            return Optional.of(buildBrokerResourceUnit(affinityBroker));
+        } // NOTE(review): pin is keyed by range only; a same-range bundle of another namespace may consume it
         Optional<String> leastLoadedBroker = loadManager.selectBrokerForAssignment(serviceUnit);
-        return leastLoadedBroker.map(s -> {
-            String webServiceUrl = getBrokerWebServiceUrl(s);
-            String brokerZnodeName = getBrokerZnodeName(s, webServiceUrl);
-            return new SimpleResourceUnit(webServiceUrl,
-                new PulsarResourceDescription(), Map.of(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME, brokerZnodeName));
-        });
+        return leastLoadedBroker.map(this::buildBrokerResourceUnit);
     }
 
     private String getBrokerWebServiceUrl(String broker) {
@@ -146,4 +147,16 @@ public class ModularLoadManagerWrapper implements LoadManager {
     public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
         return loadManager.getAvailableBrokersAsync();
     }
+
+    private SimpleResourceUnit buildBrokerResourceUnit(String broker) {
+        final String brokerWebUrl = getBrokerWebServiceUrl(broker);
+        final String znode = getBrokerZnodeName(broker, brokerWebUrl); // lookup takes the resolved web URL
+        return new SimpleResourceUnit(brokerWebUrl, new PulsarResourceDescription(),
+                Map.of(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME, znode));
+    }
+
+    @Override
+    public String setNamespaceBundleAffinity(String bundle, String broker) {
+        return loadManager.setNamespaceBundleAffinity(bundle, broker); // pure delegation to the wrapped manager
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index c2c0d1947c9..c98857b2fc4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -40,12 +40,14 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.SystemUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
@@ -186,6 +188,8 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
 
     private volatile Future<?> updateRankingHandle;
 
+    private Map<String, String> bundleBrokerAffinityMap;
+
     // Perform initializations which may be done without a PulsarService.
     public SimpleLoadManagerImpl() {
         scheduler = Executors.newSingleThreadScheduledExecutor(
@@ -251,6 +255,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
                     }
                 });
         this.pulsar = pulsar;
+        this.bundleBrokerAffinityMap = new ConcurrentHashMap<>();
     }
 
     public SimpleLoadManagerImpl(PulsarService pulsar) {
@@ -1443,6 +1448,15 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification
         }
     }
 
+    @Override
+    public String setNamespaceBundleAffinity(String bundle, String broker) {
+        final Map<String, String> pins = this.bundleBrokerAffinityMap;
+        if (StringUtils.isBlank(broker)) {
+            return pins.remove(bundle); // no broker given: forget the pin and hand back the old value
+        }
+        return pins.put(bundle, broker.replaceFirst("http[s]?://", "")); // first "http(s)://" removed
+    }
+
     @Override
     public void stop() throws PulsarServerException {
         try {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 4be64f4b7b5..27df77e815d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -601,7 +601,7 @@ public class NamespaceService implements AutoCloseable {
         }
     }
 
-    protected CompletableFuture<LookupResult> createLookupResult(String candidateBroker, boolean authoritativeRedirect,
+    public CompletableFuture<LookupResult> createLookupResult(String candidateBroker, boolean authoritativeRedirect,
                                                                  final String advertisedListenerName) {
 
         CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
@@ -692,6 +692,7 @@ public class NamespaceService implements AutoCloseable {
         String lookupAddress = leastLoadedBroker.get().getResourceId();
         String advertisedAddr = (String) leastLoadedBroker.get()
                 .getProperty(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME);
+
         if (LOG.isDebugEnabled()) {
             LOG.debug("{} : redirecting to the least loaded broker, lookup address={}",
                     pulsar.getSafeWebServiceAddress(),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 8f238614fe3..7c1d2899758 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -710,8 +710,8 @@ public abstract class PulsarWebResource {
                                     // Replace the host and port of the current request and redirect
                                     URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.get().getHost())
                                             .port(webUrl.get().getPort()).replaceQueryParam("authoritative",
-                                                    newAuthoritative).build();
-
+                                                    newAuthoritative).replaceQueryParam("destinationBroker",
+                                                    null).build();
                                     log.debug("{} is not a service unit owned", bundle);
                                     // Redirect
                                     log.debug("Redirecting the rest call to {}", redirect);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 0f0ee8df3fa..0a92e0cc8bc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -726,7 +726,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         doReturn(uri).when(uriInfo).getRequestUri();
 
         namespaces.unloadNamespaceBundle(response, this.testTenant, this.testOtherCluster,
-                this.testLocalNamespaces.get(2).getLocalName(), "0x00000000_0xffffffff", false);
+                this.testLocalNamespaces.get(2).getLocalName(), "0x00000000_0xffffffff", false, null);
         captor = ArgumentCaptor.forClass(WebApplicationException.class);
         verify(response, timeout(5000).atLeast(1)).resume(captor.capture());
         assertEquals(captor.getValue().getResponse().getStatus(), Status.TEMPORARY_REDIRECT.getStatusCode());
@@ -1053,7 +1053,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         doReturn(CompletableFuture.completedFuture(null)).when(nsSvc).unloadNamespaceBundle(testBundle);
         AsyncResponse response = mock(AsyncResponse.class);
         namespaces.unloadNamespaceBundle(response, testTenant, testLocalCluster, bundledNsLocal, "0x00000000_0x80000000",
-                false);
+                false, null);
         verify(response, timeout(5000).times(1)).resume(any(RestException.class));
 
         // cleanup
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
index 05e82484226..4b9f679f19d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.loadbalance;
 
+import static java.lang.Thread.sleep;
 import static org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl.TIME_AVERAGE_BROKER_ZPATH;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
@@ -40,6 +41,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -85,6 +87,7 @@ import org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.awaitility.Awaitility;
 import org.mockito.Mockito;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -102,6 +105,8 @@ public class ModularLoadManagerImplTest {
     private PulsarService pulsar2;
     private PulsarAdmin admin2;
 
+    private PulsarService pulsar3;
+
     private String primaryHost;
     private String secondaryHost;
 
@@ -181,6 +186,20 @@ public class ModularLoadManagerImplTest {
         pulsar2 = new PulsarService(config2);
         pulsar2.start();
 
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+        config.setLoadBalancerLoadSheddingStrategy("org.apache.pulsar.broker.loadbalance.impl.OverloadShedder");
+        config.setClusterName("use");
+        config.setWebServicePort(Optional.of(0));
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
+        config.setAdvertisedAddress("localhost");
+        config.setBrokerShutdownTimeoutMs(0L);
+        config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
+        config.setBrokerServicePort(Optional.of(0));
+        config.setBrokerServicePortTls(Optional.of(0));
+        config.setWebServicePortTls(Optional.of(0));
+        pulsar3 = new PulsarService(config);
+
         secondaryHost = String.format("%s:%d", "localhost", pulsar2.getListenPortHTTP().get());
         url2 = new URL(pulsar2.getWebServiceAddress());
         admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build();
@@ -188,7 +207,7 @@ public class ModularLoadManagerImplTest {
         primaryLoadManager = (ModularLoadManagerImpl) getField(pulsar1.getLoadManager().get(), "loadManager");
         secondaryLoadManager = (ModularLoadManagerImpl) getField(pulsar2.getLoadManager().get(), "loadManager");
         nsFactory = new NamespaceBundleFactory(pulsar1, Hashing.crc32());
-        Thread.sleep(100);
+        sleep(100);
     }
 
     @AfterMethod(alwaysRun = true)
@@ -201,6 +220,10 @@ public class ModularLoadManagerImplTest {
 
         pulsar2.close();
         pulsar1.close();
+        
+        if (pulsar3.isRunning()) {
+            pulsar3.close();
+        }
 
         bkEnsemble.stop();
     }
@@ -284,6 +307,59 @@ public class ModularLoadManagerImplTest {
         }
     }
 
+
+    
+    /**
+     * Verifies unload-with-destination: the bundle owning the topic is unloaded with an explicit
+     * destination broker, and the following lookup must resolve to that broker.
+     *
+     * <p>The request is sent to a non-leader broker so that the redirect to the leader is covered too.
+     */
+    @Test
+    public void testBrokerAffinity() throws Exception {
+        // Start broker 3
+        pulsar3.start();
+
+        final String tenant = "test";
+        final String cluster = "test";
+        final String namespace = tenant + "/" + cluster + "/" + "test";
+        final String topic = "persistent://" + namespace + "/my-topic1";
+        admin1.clusters().createCluster(cluster,
+                ClusterData.builder().serviceUrl("http://" + pulsar1.getAdvertisedAddress()).build());
+        admin1.tenants().createTenant(tenant,
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster)));
+        admin1.namespaces().createNamespace(namespace, 16);
+
+        String topicLookup = admin1.lookups().lookupTopic(topic);
+        String bundleRange = admin1.lookups().getBundleRange(topic);
+        log.debug("initial broker service url - {}", topicLookup);
+
+        // Pick a destination that differs from the current owner: broker 1 unless it already owns the
+        // bundle, in which case broker 2 or broker 3 is chosen at random.
+        PulsarService destination = pulsar1;
+        if (topicLookup.equals(pulsar1.getBrokerServiceUrl())) {
+            destination = new Random().nextInt(2) == 0 ? pulsar2 : pulsar3;
+        }
+        final String expectedServiceUrl = destination.getBrokerServiceUrl();
+        final String brokerUrl = destination.getSafeWebServiceAddress().replaceFirst("http[s]?://", "");
+        log.debug("destination broker service url - {}, broker url - {}", expectedServiceUrl, brokerUrl);
+
+        String leaderServiceUrl = admin1.brokers().getLeaderBroker().getServiceUrl();
+        log.debug("leader serviceUrl - {}, broker1 service url - {}", leaderServiceUrl,
+                pulsar1.getSafeWebServiceAddress());
+        // Make the call through a broker which is not the leader.
+        PulsarAdmin nonLeaderAdmin =
+                leaderServiceUrl.equals(pulsar1.getSafeWebServiceAddress()) ? admin2 : admin1;
+        nonLeaderAdmin.namespaces().unloadNamespaceBundle(namespace, bundleRange, brokerUrl);
+
+        // Poll instead of sleeping a fixed 2s; TestNG's assertEquals takes (actual, expected).
+        Awaitility.await().untilAsserted(() -> {
+            String topicLookupAfterUnload = admin1.lookups().lookupTopic(topic);
+            log.debug("final broker service url - {}", topicLookupAfterUnload);
+            Assert.assertEquals(topicLookupAfterUnload, expectedServiceUrl);
+        });
+    }
+
     /**
      * It verifies that once broker owns max-number of topics: load-manager doesn't allocates new bundles to that broker
      * unless all the brokers are in same state.
@@ -345,7 +421,7 @@ public class ModularLoadManagerImplTest {
         // Need to update all the bundle data for the shredder to see the spy.
         primaryLoadManager.handleDataNotification(new Notification(NotificationType.Created, LoadManager.LOADBALANCE_BROKERS_ROOT + "/broker:8080"));
 
-        Thread.sleep(100);
+        sleep(100);
         localBrokerData.setCpu(new ResourceUsage(80, 100));
         primaryLoadManager.doLoadShedding();
 
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index f4c284bb484..2690df658b7 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -2090,6 +2090,20 @@ public interface Namespaces {
      */
     void unloadNamespaceBundle(String namespace, String bundle) throws PulsarAdminException;
 
+    /**
+     * Unload a namespace bundle and request that it be assigned to the specified broker.
+     *
+     * @param namespace
+     *           Namespace name
+     * @param bundle
+     *           Range of the bundle to unload
+     * @param destinationBroker
+     *           Target broker url to which the bundle should be assigned
+     * @throws PulsarAdminException Unexpected error
+     */
+    void unloadNamespaceBundle(String namespace, String bundle, String destinationBroker) throws PulsarAdminException;
+
+
     /**
      * Unload namespace bundle asynchronously.
      *
@@ -2101,6 +2115,19 @@ public interface Namespaces {
      */
     CompletableFuture<Void> unloadNamespaceBundleAsync(String namespace, String bundle);
 
+    /**
+     * Unload a namespace bundle asynchronously and request that it be assigned to the specified broker.
+     *
+     * @param namespace
+     *           Namespace name
+     * @param bundle
+     *           Range of the bundle to unload
+     * @param destinationBroker
+     *           Target broker url to which the bundle should be assigned
+     * @return a future that can be used to track when the bundle is unloaded
+     */
+    CompletableFuture<Void> unloadNamespaceBundleAsync(String namespace, String bundle, String destinationBroker);
+
     /**
      * Split namespace bundle.
      *
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 6d4889a751d..fa3155c59d6 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -814,6 +814,12 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
         sync(() -> unloadNamespaceBundleAsync(namespace, bundle));
     }
 
+    @Override
+    public void unloadNamespaceBundle(String namespace,
+           String bundle, String destinationBroker) throws PulsarAdminException {
+        sync(() -> unloadNamespaceBundleAsync(namespace, bundle, destinationBroker));
+    }
+
     @Override
     public CompletableFuture<Void> unloadNamespaceBundleAsync(String namespace, String bundle) {
         NamespaceName ns = NamespaceName.get(namespace);
@@ -821,6 +827,14 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
         return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
     }
 
+    @Override
+    public CompletableFuture<Void> unloadNamespaceBundleAsync(String namespace,
+                                   String bundle, String destinationBroker) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, bundle, "unload").queryParam("destinationBroker", destinationBroker);
+        return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
+    }
+
     @Override
     public void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
                                      String splitAlgorithmName) throws PulsarAdminException {
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 940e9691116..d715c6b4e06 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -442,7 +442,7 @@ public class PulsarAdminToolTest {
         namespaces = new CmdNamespaces(() -> admin);
 
         namespaces.run(split("unload myprop/clust/ns1 -b 0x80000000_0xffffffff"));
-        verify(mockNamespaces).unloadNamespaceBundle("myprop/clust/ns1", "0x80000000_0xffffffff");
+        verify(mockNamespaces).unloadNamespaceBundle("myprop/clust/ns1", "0x80000000_0xffffffff", null);
 
         namespaces.run(split("split-bundle myprop/clust/ns1 -b 0x00000000_0xffffffff"));
         verify(mockNamespaces).splitNamespaceBundle("myprop/clust/ns1", "0x00000000_0xffffffff", false, null);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index b64df272b44..f673ff40b79 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -880,13 +880,17 @@ public class CmdNamespaces extends CmdBase {
         @Parameter(names = { "--bundle", "-b" }, description = "{start-boundary}_{end-boundary}")
         private String bundle;
 
+        @Parameter(names = { "--destinationBroker", "-d" },
+                description = "Target brokerWebServiceAddress to which the bundle has to be allocated to")
+        private String destinationBroker;
+
         @Override
         void run() throws PulsarAdminException {
             String namespace = validateNamespace(params);
             if (bundle == null) {
                 getAdmin().namespaces().unload(namespace);
             } else {
-                getAdmin().namespaces().unloadNamespaceBundle(namespace, bundle);
+                getAdmin().namespaces().unloadNamespaceBundle(namespace, bundle, destinationBroker);
             }
         }
     }