You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2018/01/25 02:00:00 UTC

[incubator-pulsar] branch master updated: PIP-7 Introduce Failure-domain and Anti-affinity-namespace group (#896)

This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bb9834  PIP-7 Introduce Failure-domain and Anti-affinity-namespace group (#896)
6bb9834 is described below

commit 6bb98344ca48f81aaf403372400cb38013616e9d
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Wed Jan 24 17:59:58 2018 -0800

    PIP-7 Introduce Failure-domain and Anti-affinity-namespace group (#896)
    
    * Introduce Failure-domain and Anti-affinity-namespace group
---
 conf/broker.conf                                   |   3 +
 conf/standalone.conf                               |   3 +
 .../apache/pulsar/broker/ServiceConfiguration.java |  11 +
 .../broker/cache/ConfigurationCacheService.java    |  50 ++
 .../org/apache/pulsar/broker/PulsarService.java    |   2 +-
 .../apache/pulsar/broker/admin/AdminResource.java  |  24 +-
 .../org/apache/pulsar/broker/admin/Clusters.java   | 219 ++++++++-
 .../org/apache/pulsar/broker/admin/Namespaces.java | 140 +++++-
 .../broker/loadbalance/impl/LoadManagerShared.java | 242 ++++++++-
 .../loadbalance/impl/ModularLoadManagerImpl.java   | 104 +++-
 .../loadbalance/impl/SimpleLoadManagerImpl.java    |   2 +-
 .../pulsar/broker/web/PulsarWebResource.java       |  21 +-
 .../apache/pulsar/broker/SLAMonitoringTest.java    |   2 +-
 .../apache/pulsar/broker/admin/AdminApiTest.java   |  16 +-
 .../apache/pulsar/broker/admin/AdminApiTest2.java  |  71 ++-
 .../org/apache/pulsar/broker/admin/AdminTest.java  |  11 +-
 .../apache/pulsar/broker/admin/NamespacesTest.java |   2 +-
 .../pulsar/broker/auth/AuthorizationTest.java      |   2 +-
 .../broker/auth/MockedPulsarServiceBaseTest.java   |   4 +-
 .../AntiAffinityNamespaceGroupTest.java            | 540 +++++++++++++++++++++
 .../loadbalance/ModularLoadManagerImplTest.java    |  14 +-
 .../broker/service/BacklogQuotaManagerTest.java    |   2 +-
 .../broker/service/BrokerBkEnsemblesTests.java     |   2 +-
 .../pulsar/broker/service/ReplicatorTestBase.java  |   6 +-
 .../ZooKeeperSessionExpireRecoveryTest.java        |   8 +-
 .../api/AuthenticatedProducerConsumerTest.java     |  10 +-
 .../pulsar/client/api/NonPersistentTopicTest.java  |   9 +-
 .../pulsar/client/api/TlsProducerConsumerBase.java |   5 +-
 .../service/web/DiscoveryServiceWebTest.java       |   4 +-
 .../websocket/proxy/ProxyAuthorizationTest.java    |   7 +-
 .../org/apache/pulsar/client/admin/Clusters.java   | 132 +++++
 .../org/apache/pulsar/client/admin/Namespaces.java |  72 +++
 .../pulsar/client/admin/internal/ClustersImpl.java |  46 ++
 .../client/admin/internal/NamespacesImpl.java      |  49 ++
 .../org/apache/pulsar/admin/cli/CmdClusters.java   |  85 ++++
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |  64 +++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  32 ++
 .../pulsar/common/policies/data/FailureDomain.java |  52 ++
 .../pulsar/common/policies/data/Policies.java      |   5 +-
 .../ProxyAuthenticatedProducerConsumerTest.java    |   5 +-
 40 files changed, 1975 insertions(+), 103 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index d983b3f..de8ab1a 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -46,6 +46,9 @@ advertisedAddress=
 # Name of the cluster to which this broker belongs to
 clusterName=
 
+# Enable cluster's failure-domain which can distribute brokers into logical region
+failureDomainsEnabled=false
+
 # Zookeeper session timeout in milliseconds
 zooKeeperSessionTimeoutMillis=30000
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index ec400fb..52b3174 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -39,6 +39,9 @@ advertisedAddress=
 # Name of the cluster to which this broker belongs to
 clusterName=standalone
 
+# Enable cluster's failure-domain which can distribute brokers into logical region
+failureDomainsEnabled=false
+
 # Zookeeper session timeout in milliseconds
 zooKeeperSessionTimeoutMillis=30000
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 2585f33..7f01a87 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -61,6 +61,9 @@ public class ServiceConfiguration implements PulsarConfiguration {
     // Name of the cluster to which this broker belongs to
     @FieldContext(required = true)
     private String clusterName;
+    // Enable cluster's failure-domain which can distribute brokers into logical region
+    @FieldContext(dynamic = true)
+    private boolean failureDomainsEnabled = false;
     // Zookeeper session timeout in milliseconds
     private long zooKeeperSessionTimeoutMillis = 30000;
     // Time to wait for broker graceful shutdown. After this time elapses, the
@@ -472,6 +475,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
         this.clusterName = clusterName;
     }
 
+    public boolean isFailureDomainsEnabled() {
+        return failureDomainsEnabled;
+    }
+
+    public void setFailureDomainsEnabled(boolean failureDomainsEnabled) {
+        this.failureDomainsEnabled = failureDomainsEnabled;
+    }
+
     public long getBrokerShutdownTimeoutMs() {
         return brokerShutdownTimeoutMs;
     }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
index 1c8ad80..41a8f93 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
@@ -21,8 +21,10 @@ package org.apache.pulsar.broker.cache;
 import java.util.Map;
 
 import org.apache.bookkeeper.util.ZkUtils;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
@@ -39,6 +41,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Maps;
 
 /**
  * ConfigurationCacheService maintains a local in-memory cache of all the configurations and policies stored in
@@ -53,13 +56,21 @@ public class ConfigurationCacheService {
     private ZooKeeperDataCache<Policies> policiesCache;
     private ZooKeeperDataCache<ClusterData> clustersCache;
     private ZooKeeperChildrenCache clustersListCache;
+    private ZooKeeperChildrenCache failureDomainListCache;
     private ZooKeeperDataCache<NamespaceIsolationPolicies> namespaceIsolationPoliciesCache;
+    private ZooKeeperDataCache<FailureDomain> failureDomainCache;
 
     public static final String POLICIES = "policies";
+    public static final String FAILURE_DOMAIN = "failureDomain";
+    public final String CLUSTER_FAILURE_DOMAIN_ROOT;
     public static final String POLICIES_ROOT = "/admin/policies";
     private static final String CLUSTERS_ROOT = "/admin/clusters";
 
     public ConfigurationCacheService(ZooKeeperCache cache) throws PulsarServerException {
+        this(cache, null);
+    }
+
+    public ConfigurationCacheService(ZooKeeperCache cache, String configuredClusterName) throws PulsarServerException {
         this.cache = cache;
 
         initZK();
@@ -86,6 +97,12 @@ public class ConfigurationCacheService {
         };
 
         this.clustersListCache = new ZooKeeperChildrenCache(cache, CLUSTERS_ROOT);
+        
+        CLUSTER_FAILURE_DOMAIN_ROOT = CLUSTERS_ROOT + "/" + configuredClusterName + "/" + FAILURE_DOMAIN;
+        if (isNotBlank(configuredClusterName)) {
+            createFailureDomainRoot(cache.getZooKeeper(), CLUSTER_FAILURE_DOMAIN_ROOT);
+            this.failureDomainListCache = new ZooKeeperChildrenCache(cache, CLUSTER_FAILURE_DOMAIN_ROOT);
+        }
 
         this.namespaceIsolationPoliciesCache = new ZooKeeperDataCache<NamespaceIsolationPolicies>(cache) {
             @Override
@@ -96,6 +113,31 @@ public class ConfigurationCacheService {
                         }));
             }
         };
+        
+        this.failureDomainCache = new ZooKeeperDataCache<FailureDomain>(cache) {
+            @Override
+            public FailureDomain deserialize(String path, byte[] content) throws Exception {
+                return ObjectMapperFactory.getThreadLocal().readValue(content, FailureDomain.class);
+            }
+        };
+    }
+
+    // Best-effort creation of the cluster's failure-domain root znode: failures are logged, never thrown.
+    private void createFailureDomainRoot(ZooKeeper zk, String path) {
+        try {
+            if (zk.exists(path, false) == null) {
+                // empty payload: this znode only serves as the parent of the per-domain children
+                ZkUtils.createFullPathOptimistic(zk, path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                LOG.info("Successfully created failure-domain znode at {}", path);
+            }
+        } catch (KeeperException.NodeExistsException e) {
+            // Ok: the znode was created concurrently between the exists-check and the create
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // keep the interruption visible to the caller
+            LOG.warn("Failed to create failure-domain znode {} ", path, e);
+        } catch (Exception e) {
+            LOG.warn("Failed to create failure-domain znode {} ", path, e);
+        }
+    }
 
     private void initZK() throws PulsarServerException {
@@ -138,6 +180,10 @@ public class ConfigurationCacheService {
         return this.clustersListCache;
     }
 
+    public ZooKeeperChildrenCache failureDomainListCache() {
+        return this.failureDomainListCache;
+    }
+    
     public ZooKeeper getZooKeeper() {
         return this.cache.getZooKeeper();
     }
@@ -145,4 +191,8 @@ public class ConfigurationCacheService {
     public ZooKeeperDataCache<NamespaceIsolationPolicies> namespaceIsolationPoliciesCache() {
         return this.namespaceIsolationPoliciesCache;
     }
+    
+    public ZooKeeperDataCache<FailureDomain> failureDomainCache() {
+        return this.failureDomainCache;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 2d7f99b..6cd56ab 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -417,7 +417,7 @@ public class PulsarService implements AutoCloseable {
             throw new PulsarServerException(e);
         }
 
-        this.configurationCacheService = new ConfigurationCacheService(getGlobalZkCache());
+        this.configurationCacheService = new ConfigurationCacheService(getGlobalZkCache(), this.config.getClusterName());
         this.localZkCacheService = new LocalZooKeeperCacheService(getLocalZkCache(), this.configurationCacheService);
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index f542a10..f04d15b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.broker.admin;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT;
 
 import java.net.MalformedURLException;
 import java.net.URI;
@@ -45,6 +47,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
@@ -64,7 +67,6 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
-import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 
 public abstract class AdminResource extends PulsarWebResource {
     private static final Logger log = LoggerFactory.getLogger(AdminResource.class);
@@ -202,6 +204,7 @@ public abstract class AdminResource extends PulsarWebResource {
         return namespaces;
     }
 
+    
     /**
      * Redirect the call to the specified broker
      *
@@ -286,6 +289,14 @@ public abstract class AdminResource extends PulsarWebResource {
         return pulsar().getConfigurationCache().namespaceIsolationPoliciesCache();
     }
 
+    protected ZooKeeperDataCache<FailureDomain> failureDomainCache() {
+        return pulsar().getConfigurationCache().failureDomainCache();
+    }
+
+    protected ZooKeeperChildrenCache failureDomainListCache() {
+        return pulsar().getConfigurationCache().failureDomainListCache();
+    }
+
     protected PartitionedTopicMetadata getPartitionedTopicMetadata(String property, String cluster, String namespace,
             String destination, boolean authoritative) {
         DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination);
@@ -354,5 +365,14 @@ public abstract class AdminResource extends PulsarWebResource {
         }
         return metadataFuture;
     }
-    
+
+    protected void validateClusterExists(String cluster) {
+        try {
+            if (!clustersCache().get(path("clusters", cluster)).isPresent()) {
+                throw new RestException(Status.PRECONDITION_FAILED, "Cluster " + cluster + " does not exist.");
+            }
+        } catch (Exception e) {
+            throw new RestException(e);
+        }
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Clusters.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Clusters.java
index b489f5f..612e2e2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Clusters.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Clusters.java
@@ -23,9 +23,11 @@ import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import javax.ws.rs.DELETE;
 import javax.ws.rs.GET;
@@ -38,14 +40,17 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response.Status;
 
 import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.naming.NamedEntity;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
 import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -53,12 +58,15 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.core.JsonGenerationException;
 import com.fasterxml.jackson.databind.JsonMappingException;
+import com.google.common.collect.Maps;
 
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.FAILURE_DOMAIN;
+
 @Path("/clusters")
 @Api(value = "/clusters", description = "Cluster admin apis", tags = "clusters")
 @Produces(MediaType.APPLICATION_JSON)
@@ -138,9 +146,14 @@ public class Clusters extends AdminResource {
             String clusterPath = path("clusters", cluster);
             Stat nodeStat = new Stat();
             byte[] content = globalZk().getData(clusterPath, null, nodeStat);
-            ClusterData currentClusterData = jsonMapper().readValue(content, ClusterData.class);
-            // only update cluster-url-data and not overwrite other metadata such as peerClusterNames
-            currentClusterData.update(clusterData);
+            ClusterData currentClusterData = null;
+            if (content.length > 0) {
+                currentClusterData = jsonMapper().readValue(content, ClusterData.class);
+                // only update cluster-url-data and not overwrite other metadata such as peerClusterNames
+                currentClusterData.update(clusterData);
+            } else {
+                currentClusterData = clusterData;
+            }
             // Write back the new updated ClusterData into zookeeper
             globalZk().setData(clusterPath, jsonMapper().writeValueAsBytes(currentClusterData),
                     nodeStat.getVersion());
@@ -259,6 +272,7 @@ public class Clusters extends AdminResource {
 
         try {
             String clusterPath = path("clusters", cluster);
+            deleteFailureDomain(clusterPath);
             globalZk().delete(clusterPath, -1);
             globalZkCache().invalidate(clusterPath);
             log.info("[{}] Deleted cluster {}", clientAppId(), cluster);
@@ -271,6 +285,25 @@ public class Clusters extends AdminResource {
         }
     }
 
+    // Deletes every failure-domain znode below the given cluster path, then the failure-domain root itself.
+    private void deleteFailureDomain(String clusterPath) {
+        try {
+            String failureDomain = joinPath(clusterPath, ConfigurationCacheService.FAILURE_DOMAIN);
+            if (globalZk().exists(failureDomain, false) == null) {
+                return;
+            }
+            for (String domain : globalZk().getChildren(failureDomain, false)) {
+                globalZk().delete(joinPath(failureDomain, domain), -1);
+            }
+            globalZk().delete(failureDomain, -1);
+            failureDomainCache().clear();
+            failureDomainListCache().clear();
+        } catch (Exception e) {
+            log.warn("Failed to delete failure-domain under cluster {}", clusterPath, e);
+            throw new RestException(e);
+        }
+    }
+
     @GET
     @Path("/{cluster}/namespaceIsolationPolicies")
     @ApiOperation(value = "Get the namespace isolation policies assigned in the cluster", response = NamespaceIsolationData.class, responseContainer = "Map")
@@ -296,16 +329,6 @@ public class Clusters extends AdminResource {
         }
     }
 
-    private void validateClusterExists(String cluster) {
-        try {
-            if (!clustersCache().get(path("clusters", cluster)).isPresent()) {
-                throw new RestException(Status.PRECONDITION_FAILED, "Cluster " + cluster + " does not exist.");
-            }
-        } catch (Exception e) {
-            throw new RestException(e);
-        }
-    }
-
     @GET
     @Path("/{cluster}/namespaceIsolationPolicies/{policyName}")
     @ApiOperation(value = "Get a single namespace isolation policy assigned in the cluster", response = NamespaceIsolationData.class)
@@ -356,7 +379,7 @@ public class Clusters extends AdminResource {
             NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPoliciesCache()
                     .get(nsIsolationPolicyPath).orElseGet(() -> {
                         try {
-                            this.createNamespaceIsolationPolicyNode(nsIsolationPolicyPath);
+                            this.createZnodeIfNotExist(nsIsolationPolicyPath, Optional.of(Collections.emptyMap()));
                             return new NamespaceIsolationPolicies();
                         } catch (KeeperException | InterruptedException e) {
                             throw new RestException(e);
@@ -386,23 +409,26 @@ public class Clusters extends AdminResource {
         }
     }
 
-    private void createNamespaceIsolationPolicyNode(String nsIsolationPolicyPath)
-            throws KeeperException, InterruptedException {
+    private boolean createZnodeIfNotExist(String path, Optional<Object> value) throws KeeperException, InterruptedException {
         // create persistent node on ZooKeeper
-        if (globalZk().exists(nsIsolationPolicyPath, false) == null) {
+        if (globalZk().exists(path, false) == null) {
             // create all the intermediate nodes
             try {
-                ZkUtils.createFullPathOptimistic(globalZk(), nsIsolationPolicyPath,
-                        jsonMapper().writeValueAsBytes(Collections.emptyMap()), Ids.OPEN_ACL_UNSAFE,
+                ZkUtils.createFullPathOptimistic(globalZk(), path,
+                        value.isPresent() ? jsonMapper().writeValueAsBytes(value.get()) : null, Ids.OPEN_ACL_UNSAFE,
                         CreateMode.PERSISTENT);
+                return true;
             } catch (KeeperException.NodeExistsException nee) {
-                log.debug("Other broker preempted the full path [{}] already. Continue...", nsIsolationPolicyPath);
+                if(log.isDebugEnabled()) {
+                    log.debug("Other broker preempted the full path [{}] already. Continue...", path);                    
+                }
             } catch (JsonGenerationException e) {
                 // ignore json error as it is empty hash
             } catch (JsonMappingException e) {
             } catch (IOException e) {
             }
         }
+        return false;
     }
 
     @DELETE
@@ -422,7 +448,7 @@ public class Clusters extends AdminResource {
             NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPoliciesCache()
                     .get(nsIsolationPolicyPath).orElseGet(() -> {
                         try {
-                            this.createNamespaceIsolationPolicyNode(nsIsolationPolicyPath);
+                            this.createZnodeIfNotExist(nsIsolationPolicyPath, Optional.of(Collections.emptyMap()));
                             return new NamespaceIsolationPolicies();
                         } catch (KeeperException | InterruptedException e) {
                             throw new RestException(e);
@@ -446,6 +472,157 @@ public class Clusters extends AdminResource {
         }
     }
 
+    @POST
+    @Path("/{cluster}/failureDomains/{domainName}")
+    @ApiOperation(value = "Set cluster's failure Domain")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 409, message = "Broker already exist into other domain"),
+            @ApiResponse(code = 404, message = "Domain doesn't exist"),
+            @ApiResponse(code = 412, message = "Cluster doesn't exist") })
+    public void setFailureDomain(@PathParam("cluster") String cluster, @PathParam("domainName") String domainName,
+            FailureDomain domain) throws Exception {
+        validateSuperUserAccess();
+        validateClusterExists(cluster);
+        validateBrokerExistsInOtherDomain(cluster, domainName, domain);
+        // create the domain znode when it is absent, otherwise overwrite its content
+        try {
+            String domainPath = joinPath(pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT, domainName);
+            if (this.createZnodeIfNotExist(domainPath, Optional.ofNullable(domain))) {
+                // clear domains-children cache
+                this.failureDomainListCache().clear();
+            } else {
+                globalZk().setData(domainPath, jsonMapper().writeValueAsBytes(domain), -1);
+                // make sure that the domain-cache will be refreshed for the next read access
+                failureDomainCache().invalidate(domainPath);
+            }
+        } catch (KeeperException.NoNodeException nne) {
+            log.warn("[{}] Failed to update domain {}. clusters {}  Does not exist", clientAppId(), domainName, cluster);
+            throw new RestException(Status.NOT_FOUND,
+                    "Domain " + domainName + " for cluster " + cluster + " does not exist");
+        } catch (Exception e) {
+            log.error("[{}] Failed to update clusters/{}/domainName/{}", clientAppId(), cluster, domainName, e);
+            throw new RestException(e);
+        }
+    }
+
+    @GET
+    @Path("/{cluster}/failureDomains")
+    @ApiOperation(value = "Get the cluster failure domains", response = FailureDomain.class, responseContainer = "Map")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
+    public Map<String, FailureDomain> getFailureDomains(@PathParam("cluster") String cluster) throws Exception {
+        validateSuperUserAccess();
+
+        Map<String, FailureDomain> domains = Maps.newHashMap();
+        try {
+            final String failureDomainRootPath = pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT;
+            for (String domainName : failureDomainListCache().get()) {
+                try {
+                    Optional<FailureDomain> domain = failureDomainCache()
+                            .get(joinPath(failureDomainRootPath, domainName));
+                    if (domain.isPresent()) {
+                        domains.put(domainName, domain.get());
+                    }
+                } catch (Exception e) {
+                    log.warn("Failed to get domain {}", domainName, e);
+                }
+            }
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("[{}] Failure-domain is not configured for cluster {}", clientAppId(), cluster, e);
+            return Collections.emptyMap();
+        } catch (Exception e) {
+            log.error("[{}] Failed to get failure-domains for cluster {}", clientAppId(), cluster, e);
+            throw new RestException(e);
+        }
+        return domains;
+    }
+
+    @GET
+    @Path("/{cluster}/failureDomains/{domainName}")
+    @ApiOperation(value = "Get a domain in a cluster", response = FailureDomain.class)
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Domain doesn't exist"),
+            @ApiResponse(code = 412, message = "Cluster doesn't exist") })
+    public FailureDomain getDomain(@PathParam("cluster") String cluster, @PathParam("domainName") String domainName)
+            throws Exception {
+        validateSuperUserAccess();
+        validateClusterExists(cluster);
+
+        try {
+            final String failureDomainRootPath = pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT;
+            return failureDomainCache().get(joinPath(failureDomainRootPath, domainName))
+                    .orElseThrow(() -> new RestException(Status.NOT_FOUND,
+                            "Domain " + domainName + " for cluster " + cluster + " does not exist"));
+        } catch (RestException re) {
+            throw re;
+        } catch (Exception e) {
+            log.error("[{}] Failed to get domain {} for cluster {}", clientAppId(), domainName, cluster, e);
+            throw new RestException(e);
+        }
+    }
+
+    @DELETE
+    @Path("/{cluster}/failureDomains/{domainName}")
+    @ApiOperation(value = "Delete cluster's failure domain")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission or policy is read only"),
+            @ApiResponse(code = 412, message = "Cluster doesn't exist") })
+    public void deleteFailureDomain(@PathParam("cluster") String cluster, @PathParam("domainName") String domainName)
+            throws Exception {
+        validateSuperUserAccess();
+        validateClusterExists(cluster);
+
+        try {
+            final String domainPath = joinPath(pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT, domainName);
+            globalZk().delete(domainPath, -1);
+            // drop the deleted domain and the children listing so the next read goes back to ZooKeeper
+            failureDomainCache().invalidate(domainPath);
+            failureDomainListCache().clear();
+        } catch (KeeperException.NoNodeException nne) {
+            log.warn("[{}] Domain {} does not exist in {}", clientAppId(), domainName, cluster);
+            throw new RestException(Status.NOT_FOUND,
+                    "Domain-name " + domainName + " or cluster " + cluster + " does not exist");
+        } catch (Exception e) {
+            log.error("[{}] Failed to delete domain {} in cluster {}", clientAppId(), domainName, cluster, e);
+            throw new RestException(e);
+        }
+    }
+
+    // Rejects the request (409) when a broker of inputDomain is already listed in another failure-domain.
+    private void validateBrokerExistsInOtherDomain(final String cluster, final String inputDomainName,
+            final FailureDomain inputDomain) {
+        if (inputDomain != null && inputDomain.brokers != null) {
+            try {
+                final String failureDomainRootPath = pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT;
+                for (String domainName : failureDomainListCache().get()) {
+                    if (inputDomainName.equals(domainName)) {
+                        continue;
+                    }
+                    try {
+                        Optional<FailureDomain> domain = failureDomainCache().get(joinPath(failureDomainRootPath, domainName));
+                        if (domain.isPresent() && domain.get().brokers != null) {
+                            List<String> duplicateBrokers = domain.get().brokers.stream()
+                                    .filter(inputDomain.brokers::contains).collect(Collectors.toList());
+                            if (!duplicateBrokers.isEmpty()) {
+                                throw new RestException(Status.CONFLICT,
+                                        duplicateBrokers + " already exist into " + domainName);
+                            }
+                        }
+                    } catch (RestException re) {
+                        throw re;
+                    } catch (Exception e) {
+                        log.warn("Failed to get domain {}", domainName, e);
+                    }
+                }
+            } catch (RestException re) {
+                throw re; // the conflict is an expected outcome: do not log it as an error or re-wrap it
+            } catch (KeeperException.NoNodeException e) {
+                log.debug("[{}] Domain is not configured for cluster", clientAppId(), e);
+            } catch (Exception e) {
+                log.error("[{}] Failed to get domains for cluster {}", clientAppId(), cluster, e);
+                throw new RestException(e);
+            }
+        }
+    }
+
     private static final Logger log = LoggerFactory.getLogger(Clusters.class);
 
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java
index cb8cf72..cb8f982 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java
@@ -18,8 +18,11 @@
  */
 package org.apache.pulsar.broker.admin;
 
-import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT;
 import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
 
 import java.net.URI;
@@ -91,8 +94,6 @@ import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 
-import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-
 @Path("/namespaces")
 @Produces(MediaType.APPLICATION_JSON)
 @Consumes(MediaType.APPLICATION_JSON)
@@ -669,6 +670,139 @@ public class Namespaces extends AdminResource {
     }
 
     @POST
+    @Path("/{property}/{cluster}/{namespace}/antiAffinity")
+    @ApiOperation(value = "Set anti-affinity group for a namespace")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
+            @ApiResponse(code = 412, message = "Invalid antiAffinityGroup") })
+    public void setNamespaceAntiAffinityGroup(@PathParam("property") String property, @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace, String antiAffinityGroup) {
+        validateAdminAccessOnProperty(property);
+        validatePoliciesReadOnlyAccess();
+        
+        log.info("[{}] Setting anti-affinity group {} for {}/{}/{}", clientAppId(), antiAffinityGroup, property,
+                cluster, namespace);
+
+        if (isBlank(antiAffinityGroup)) {
+            throw new RestException(Status.PRECONDITION_FAILED, "antiAffinityGroup can't be empty");
+        }
+
+        NamespaceName nsName = NamespaceName.get(property, cluster, namespace);
+        Entry<Policies, Stat> policiesNode = null;
+
+        try {
+            // Force to read the data s.t. the watch to the cache content is setup.
+            policiesNode = policiesCache().getWithStat(path(POLICIES, property, cluster, namespace))
+                    .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace " + nsName + " does not exist"));
+            policiesNode.getKey().antiAffinityGroup = antiAffinityGroup;
+
+            // Write back the new policies into zookeeper
+            globalZk().setData(path(POLICIES, property, cluster, namespace),
+                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
+            policiesCache().invalidate(path(POLICIES, property, cluster, namespace));
+
+            log.info("[{}] Successfully updated the antiAffinityGroup {} on namespace {}/{}/{}", clientAppId(),
+                    antiAffinityGroup, property, cluster, namespace);
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("[{}] Failed to update the antiAffinityGroup for namespace {}/{}/{}: does not exist", clientAppId(),
+                    property, cluster, namespace);
+            throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+        } catch (KeeperException.BadVersionException e) {
+            log.warn(
+                    "[{}] Failed to update the antiAffinityGroup on namespace {}/{}/{} expected policy node version={} : concurrent modification",
+                    clientAppId(), property, cluster, namespace, policiesNode.getValue().getVersion());
+
+            throw new RestException(Status.CONFLICT, "Concurrent modification");
+        } catch (Exception e) {
+            log.error("[{}] Failed to update the antiAffinityGroup on namespace {}/{}/{}", clientAppId(), property, cluster,
+                    namespace, e);
+            throw new RestException(e);
+        }
+    }
+
+    /**
+     * Returns the anti-affinity group stored in the policies of the given namespace.
+     *
+     * @param property
+     *            property part of the namespace name; the caller must have admin access on it
+     * @param cluster
+     *            cluster part of the namespace name
+     * @param namespace
+     *            local part of the namespace name
+     * @return the {@code antiAffinityGroup} field of the namespace policies
+     */
+    @GET
+    @Path("/{property}/{cluster}/{namespace}/antiAffinity")
+    @ApiOperation(value = "Get anti-affinity group of a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
+    public String getNamespaceAntiAffinityGroup(@PathParam("property") String property, @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace) {
+        validateAdminAccessOnProperty(property);
+        final Policies namespacePolicies = getNamespacePolicies(property, cluster, namespace);
+        return namespacePolicies.antiAffinityGroup;
+    }
+
+    /**
+     * Lists the namespaces of a cluster whose policies carry the given anti-affinity group (compared
+     * case-insensitively). The scan walks every property under the policies root in global ZooKeeper. It is
+     * best-effort: an unexpected failure is logged and the namespaces collected so far are returned.
+     *
+     * @param cluster
+     *            cluster whose namespaces are scanned
+     * @param antiAffinityGroup
+     *            group name to look for; must not be blank
+     * @param property
+     *            property used only for the admin-access check of the caller
+     * @return fully qualified names ({@code property/cluster/namespace}) of the matching namespaces
+     * @throws RestException
+     *             PRECONDITION_FAILED if the group is blank
+     */
+    @GET
+    @Path("{cluster}/antiAffinity/{group}")
+    @ApiOperation(value = "Get all namespaces that are grouped by given anti-affinity group in a given cluster. api can be only accessed by admin of any of the existing property")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 412, message = "Cluster not exist/Anti-affinity group can't be empty.") })
+    public List<String> getAntiAffinityNamespaces(@PathParam("cluster") String cluster,
+            @PathParam("group") String antiAffinityGroup, @QueryParam("property") String property) {
+        validateAdminAccessOnProperty(property);
+
+        log.info("[{}]-{} Finding namespaces for {} in {}", clientAppId(), property, antiAffinityGroup, cluster);
+
+        if (isBlank(antiAffinityGroup)) {
+            throw new RestException(Status.PRECONDITION_FAILED, "anti-affinity group can't be empty.");
+        }
+        validateClusterExists(cluster);
+        List<String> namespaces = Lists.newArrayList();
+        try {
+            for (String prop : globalZk().getChildren(POLICIES_ROOT, false)) {
+                final List<String> clusterNamespaces;
+                try {
+                    clusterNamespaces = globalZk().getChildren(path(POLICIES, prop, cluster), false);
+                } catch (KeeperException.NoNodeException e) {
+                    // This property has no node for the cluster. Skip it rather than aborting the whole scan,
+                    // otherwise matching namespaces of the remaining properties would be silently dropped.
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Property {} has no namespaces in cluster {}", clientAppId(), prop, cluster);
+                    }
+                    continue;
+                }
+                for (String namespace : clusterNamespaces) {
+                    Optional<Policies> policies = policiesCache().get(path(POLICIES, prop, cluster, namespace));
+                    if (policies.isPresent() && antiAffinityGroup.equalsIgnoreCase(policies.get().antiAffinityGroup)) {
+                        namespaces.add(String.format("%s/%s/%s", prop, cluster, namespace));
+                    }
+                }
+            }
+        } catch (Exception e) {
+            // deliberate best-effort: report what has been collected so far
+            log.warn("Failed to list of properties/namespace from global-zk", e);
+        }
+        return namespaces;
+    }
+    
+    /**
+     * Clears the anti-affinity group of a namespace: reads the policies node from global ZooKeeper, nulls the
+     * group, writes the node back with a version check and invalidates the cached policies entry.
+     *
+     * @param property
+     *            property part of the namespace name
+     * @param cluster
+     *            cluster part of the namespace name
+     * @param namespace
+     *            local part of the namespace name
+     * @throws RestException
+     *             NOT_FOUND if the policies node does not exist, CONFLICT if it was modified concurrently
+     */
+    @DELETE
+    @Path("/{property}/{cluster}/{namespace}/antiAffinity")
+    @ApiOperation(value = "Remove anti-affinity group of a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification") })
+    public void removeNamespaceAntiAffinityGroup(@PathParam("property") String property,
+            @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
+        validateAdminAccessOnProperty(property);
+        validatePoliciesReadOnlyAccess();
+
+        log.info("[{}] Deleting anti-affinity group for {}/{}/{}", clientAppId(), property, cluster, namespace);
+
+        try {
+            final Stat policiesStat = new Stat();
+            final String policiesPath = path(POLICIES, property, cluster, namespace);
+
+            // read the raw node so the write below can be guarded by the version observed here
+            final byte[] serialized = globalZk().getData(policiesPath, null, policiesStat);
+            final Policies current = jsonMapper().readValue(serialized, Policies.class);
+            current.antiAffinityGroup = null;
+
+            final byte[] updated = jsonMapper().writeValueAsBytes(current);
+            globalZk().setData(policiesPath, updated, policiesStat.getVersion());
+            policiesCache().invalidate(policiesPath);
+
+            log.info("[{}] Successfully removed anti-affinity group for a namespace={}/{}/{}", clientAppId(), property,
+                    cluster, namespace);
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("[{}] Failed to remove anti-affinity group for namespace {}/{}/{}: does not exist", clientAppId(),
+                    property, cluster, namespace);
+            throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+        } catch (KeeperException.BadVersionException e) {
+            log.warn("[{}] Failed to remove anti-affinity group for namespace {}/{}/{}: concurrent modification",
+                    clientAppId(), property, cluster, namespace);
+            throw new RestException(Status.CONFLICT, "Concurrent modification");
+        } catch (Exception e) {
+            log.error("[{}] Failed to remove anti-affinity group for namespace {}/{}/{}", clientAppId(), property,
+                    cluster, namespace, e);
+            throw new RestException(e);
+        }
+    }
+
+    @POST
     @Path("/{property}/{cluster}/{namespace}/deduplication")
     @ApiOperation(value = "Enable or disable broker side deduplication for all topics in a namespace")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index 1a2d636..ffdaac6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
@@ -19,29 +19,44 @@
 package org.apache.pulsar.broker.loadbalance.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import static org.apache.pulsar.broker.web.PulsarWebResource.path;
 
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
-
 import org.apache.pulsar.broker.BrokerData;
+import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
 import org.apache.pulsar.broker.loadbalance.LoadData;
 import org.apache.pulsar.broker.stats.metrics.JvmMetrics;
+import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
+import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.beust.jcommander.internal.Lists;
+import com.google.common.collect.Maps;
+
 /**
  * This class contains code which in shared between the two load manager implementations.
  */
@@ -60,13 +75,15 @@ public class LoadManagerShared {
     // update LoadReport at most every 5 seconds
     public static final long LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL = TimeUnit.SECONDS.toMillis(5);
 
+    private static final String DEFAULT_DOMAIN = "default";
+
     // Don't allow construction: static method namespace only.
     private LoadManagerShared() {
     }
 
     // Determines the brokers available for the given service unit according to the given policies.
     // The brokers are put into brokerCandidateCache.
-    public static synchronized void applyPolicies(final ServiceUnitId serviceUnit,
+    public static synchronized void applyNamespacePolicies(final ServiceUnitId serviceUnit,
             final SimpleResourceAllocationPolicies policies, final Set<String> brokerCandidateCache,
             final Set<String> availableBrokers,
             final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate) {
@@ -264,6 +281,227 @@ public class LoadManagerShared {
         }
     }
     
+    /**
+     * It tries to filter out brokers which own namespaces with the same anti-affinity-group as the given namespace.
+     * If all the domains own namespaces with the same anti-affinity group then it keeps the brokers of the domains
+     * that have the least number of such namespaces. Within the remaining brokers it keeps only those that own the
+     * least number of such namespaces.
+     * eg.
+     * <pre>
+     * Before:
+     * Domain-count  Brokers-count
+     * ____________  ____________
+     * d1-3          b1-2,b2-1
+     * d2-3          b3-2,b4-1
+     * d3-4          b5-2,b6-2
+     *
+     * After filtering: "candidates" brokers
+     * Domain-count  Brokers-count
+     * ____________  ____________
+     * d1-3          b2-1
+     * d2-3          b4-1
+     *
+     * "candidate" broker to own anti-affinity-namespace = b2 or b4
+     *
+     * </pre>
+     *
+     * The method is best-effort: if the ownership counts can't be obtained within 30 seconds, or any other failure
+     * occurs, the failure is logged and {@code candidates} is left as it is at that point.
+     *
+     * @param pulsar
+     *            service used to read the namespace policies and the failure-domain setting
+     * @param assignedBundleName
+     *            name of the bundle being assigned; its namespace determines the anti-affinity group
+     * @param candidates
+     *            candidate brokers; modified in place
+     * @param brokerToNamespaceToBundleRange
+     *            broker -> namespace -> bundle ranges, used to count namespaces of the group per broker
+     * @param brokerToDomainMap
+     *            broker -> failure domain; only consulted when failure domains are enabled
+     */
+    public static void filterAntiAffinityGroupOwnedBrokers(final PulsarService pulsar, final String assignedBundleName,
+            final Set<String> candidates, final Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange,
+            Map<String, String> brokerToDomainMap) {
+        if (candidates.isEmpty()) {
+            return;
+        }
+        final String namespaceName = getNamespaceNameFromBundleName(assignedBundleName);
+        try {
+            final Map<String, Integer> brokerToAntiAffinityNamespaceCount = getAntiAffinityNamespaceOwnedBrokers(pulsar,
+                    namespaceName, brokerToNamespaceToBundleRange).get(30, TimeUnit.SECONDS);
+            if (brokerToAntiAffinityNamespaceCount == null) {
+                // none of the broker owns anti-affinity-namespace so, none of the broker will be filtered
+                return;
+            }
+            if (pulsar.getConfiguration().isFailureDomainsEnabled()) {
+                // this will remove all the brokers which are part of domains that don't have least number of
+                // anti-affinity-namespaces
+                filterDomainsNotHavingLeastNumberAntiAffinityNamespaces(brokerToAntiAffinityNamespaceCount, candidates,
+                        brokerToDomainMap);
+            }
+            // now, "candidates" has list of brokers which are part of domain that can accept this namespace. now,
+            // with in these domains, remove brokers which don't have least number of namespaces. so, brokers with least
+            // number of namespace can be selected. A broker absent from the map owns no namespace of the group and
+            // therefore counts as 0.
+            int leastNamespaceCount = Integer.MAX_VALUE;
+            for (final String broker : candidates) {
+                leastNamespaceCount = Math.min(leastNamespaceCount,
+                        brokerToAntiAffinityNamespaceCount.getOrDefault(broker, 0));
+            }
+            // filter out broker based on namespace distribution
+            final int finalLeastNamespaceCount = leastNamespaceCount;
+            candidates.removeIf(
+                    broker -> brokerToAntiAffinityNamespaceCount.getOrDefault(broker, 0) != finalLeastNamespaceCount);
+        } catch (InterruptedException e) {
+            // keep the interrupt visible to the caller's thread instead of swallowing it
+            Thread.currentThread().interrupt();
+            log.error("Failed to filter anti-affinity group namespace {}", namespaceName, e);
+        } catch (Exception e) {
+            log.error("Failed to filter anti-affinity group namespace {}", namespaceName, e);
+        }
+    }
+
+    /**
+     * Sums, per failure domain, the anti-affinity namespaces owned by the candidate brokers and then drops every
+     * candidate whose domain total is not the smallest one. Brokers without a domain mapping are accounted under
+     * the default domain. Nothing is filtered when no domain mapping is available.
+     *
+     * @param brokerToAntiAffinityNamespaceCount
+     *            broker -> number of owned namespaces of the anti-affinity group (absent means 0)
+     * @param candidates
+     *            candidate brokers; modified in place
+     * @param brokerToDomainMap
+     *            broker -> failure domain name
+     */
+    private static void filterDomainsNotHavingLeastNumberAntiAffinityNamespaces(
+            Map<String, Integer> brokerToAntiAffinityNamespaceCount, Set<String> candidates,
+            Map<String, String> brokerToDomainMap) {
+
+        if (brokerToDomainMap == null || brokerToDomainMap.isEmpty()) {
+            return;
+        }
+
+        // accumulate the namespace count of every domain that has at least one candidate broker
+        final Map<String, Integer> totalPerDomain = Maps.newHashMap();
+        for (String candidate : candidates) {
+            final String domainOfCandidate = brokerToDomainMap.getOrDefault(candidate, DEFAULT_DOMAIN);
+            final int ownedByCandidate = brokerToAntiAffinityNamespaceCount.getOrDefault(candidate, 0);
+            totalPerDomain.merge(domainOfCandidate, ownedByCandidate, Integer::sum);
+        }
+
+        // smallest total among the domains
+        final int smallestTotal = totalPerDomain.values().stream().mapToInt(Integer::intValue).min()
+                .orElse(Integer.MAX_VALUE);
+
+        // only keep brokers of the domains which have the smallest total
+        candidates.removeIf(candidate -> {
+            final Integer domainTotal = totalPerDomain
+                    .get(brokerToDomainMap.getOrDefault(candidate, DEFAULT_DOMAIN));
+            return !(domainTotal == null || domainTotal == smallestTotal);
+        });
+    }
+
+    /**
+     * Asynchronously counts, for every broker, the owned namespaces that belong to the same anti-affinity group
+     * as {@code namespaceName}.
+     *
+     * @param pulsar
+     *            service whose configuration cache provides the namespace policies
+     * @param namespaceName
+     *            namespace whose anti-affinity group is looked up
+     * @param brokerToNamespaceToBundleRange
+     *            broker -> namespace -> bundle ranges; only the broker and namespace keys are used
+     * @return future holding broker -> count for brokers owning at least one namespace of the group. The future
+     *         holds {@code null} when the namespace has no policies or no anti-affinity group, or when reading
+     *         its policies fails.
+     */
+    public static CompletableFuture<Map<String, Integer>> getAntiAffinityNamespaceOwnedBrokers(
+            final PulsarService pulsar, String namespaceName,
+            Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange) {
+
+        final CompletableFuture<Map<String, Integer>> result = new CompletableFuture<>();
+        final ZooKeeperDataCache<Policies> policiesCache = pulsar.getConfigurationCache().policiesCache();
+
+        policiesCache.getAsync(path(POLICIES, namespaceName)).thenAccept(policies -> {
+            final String antiAffinityGroup = policies.isPresent() ? policies.get().antiAffinityGroup : null;
+            if (StringUtils.isBlank(antiAffinityGroup)) {
+                result.complete(null);
+                return;
+            }
+            final Map<String, Integer> countPerBroker = new ConcurrentHashMap<>();
+            final List<CompletableFuture<Void>> lookups = Lists.newArrayList();
+            for (Entry<String, Map<String, Set<String>>> owned : brokerToNamespaceToBundleRange.entrySet()) {
+                final String broker = owned.getKey();
+                for (String ns : owned.getValue().keySet()) {
+                    // a failed policies lookup simply doesn't contribute to the count
+                    lookups.add(policiesCache.getAsync(path(POLICIES, ns)).thenAccept(nsPolicies -> {
+                        if (nsPolicies.isPresent()
+                                && antiAffinityGroup.equalsIgnoreCase(nsPolicies.get().antiAffinityGroup)) {
+                            countPerBroker.merge(broker, 1, Integer::sum);
+                        }
+                    }).exceptionally(ex -> null));
+                }
+            }
+            FutureUtil.waitForAll(lookups).thenAccept(r -> result.complete(countPerBroker));
+        }).exceptionally(ex -> {
+            // namespace-policies has not been created yet
+            result.complete(null);
+            return null;
+        });
+        return result;
+    }
+
+    /**
+     * Decides whether a bundle of an anti-affinity namespace may be unloaded for load-shedding. Unloading is
+     * pointless when every candidate broker owns the same number of namespaces of the group, because the bundle
+     * would land on the broker it was unloaded from; in that case {@code false} is returned.
+     *
+     * @param namespace
+     *            namespace of the bundle
+     * @param bundle
+     *            bundle range (not used by the decision itself)
+     * @param currentBroker
+     *            broker currently owning the bundle
+     * @param pulsar
+     *            service used to read the namespace policies
+     * @param brokerToNamespaceToBundleRange
+     *            broker -> namespace -> bundle ranges
+     * @param candidateBroekrs
+     *            brokers that could take the bundle
+     * @return {@code true} if the bundle should be unloaded; also {@code true} when no broker owns a namespace of
+     *         the group or the namespace has no anti-affinity group
+     * @throws Exception
+     *             if the ownership counts can't be obtained within 10 seconds
+     */
+    public static boolean shouldAntiAffinityNamespaceUnload(String namespace, String bundle, String currentBroker,
+            final PulsarService pulsar, Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange,
+            Set<String> candidateBroekrs) throws Exception {
+
+        final Map<String, Integer> ownedPerBroker = getAntiAffinityNamespaceOwnedBrokers(pulsar, namespace,
+                brokerToNamespaceToBundleRange).get(10, TimeUnit.SECONDS);
+        if (ownedPerBroker == null || ownedPerBroker.isEmpty()) {
+            return true;
+        }
+
+        int smallestCount = Integer.MAX_VALUE;
+        int ownedByCurrent = 0;
+        for (String candidate : candidateBroekrs) {
+            final int owned = ownedPerBroker.getOrDefault(candidate, 0);
+            if (currentBroker.equals(candidate)) {
+                ownedByCurrent = owned;
+            }
+            smallestCount = Math.min(smallestCount, owned);
+        }
+
+        // some other broker owns fewer namespaces of the group: unloading can move the bundle there
+        if (smallestCount == 0 || ownedByCurrent > smallestCount) {
+            return true;
+        }
+
+        // if all candidate brokers own same-number of ns then broker can't unload
+        final int least = smallestCount;
+        final long brokersAtLeast = candidateBroekrs.stream()
+                .filter(candidate -> ownedPerBroker.getOrDefault(candidate, 0) == least).count();
+        return brokersAtLeast != candidateBroekrs.size();
+    }
+
     public interface BrokerTopicLoadingPredicate {
         boolean isEnablePersistentTopics(String brokerUrl);
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 9d5cf30..b8519d8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.broker.loadbalance.impl;
 
 import static org.apache.pulsar.broker.admin.AdminResource.jsonMapper;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import static org.apache.pulsar.broker.web.PulsarWebResource.path;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -27,6 +29,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
@@ -57,6 +60,8 @@ import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLo
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.policies.data.FailureDomain;
+import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.ResourceQuota;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
@@ -75,6 +80,8 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
+
 import io.netty.util.concurrent.DefaultThreadFactory;
 
 public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCacheListener<LocalBrokerData> {
@@ -170,7 +177,8 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
     
     // check if given broker can load persistent/non-persistent topic
     private final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate;
-
+    
+    private Map<String, String> brokerToFailureDomainMap;
 
     private static final Deserializer<LocalBrokerData> loadReportDeserializer = (key, content) -> jsonMapper()
             .readValue(content, LocalBrokerData.class);
@@ -188,6 +196,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
         loadSheddingPipeline.add(new OverloadShedder(conf));
         preallocatedBundleToBroker = new ConcurrentHashMap<>();
         scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-modular-load-manager"));
+        this.brokerToFailureDomainMap = Maps.newHashMap();
         
         this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
             @Override
@@ -214,6 +223,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
      *            The service to initialize with.
      */
     public void initialize(final PulsarService pulsar) {
+        this.pulsar = pulsar;
         availableActiveBrokers = new ZooKeeperChildrenCache(pulsar.getLocalZkCache(),
                 LoadManager.LOADBALANCE_BROKERS_ROOT);
         availableActiveBrokers.registerListener(new ZooKeeperCacheListener<Set<String>>() {
@@ -266,9 +276,15 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
         
         placementStrategy = ModularLoadManagerStrategy.create(conf);
         policies = new SimpleResourceAllocationPolicies(pulsar);
-        this.pulsar = pulsar;
         zkClient = pulsar.getZkClient();
         filterPipeline.add(new BrokerVersionFilter());
+        
+        refreshBrokerToFailureDomainMap();
+        // register listeners for domain changes
+        pulsar.getConfigurationCache().failureDomainListCache()
+                .registerListener((path, data, stat) -> scheduler.execute(() -> refreshBrokerToFailureDomainMap()));
+        pulsar.getConfigurationCache().failureDomainCache()
+                .registerListener((path, data, stat) -> scheduler.execute(() -> refreshBrokerToFailureDomainMap()));
     }
 
     /**
@@ -421,7 +437,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
     }
 
     // Update both the broker data and the bundle data.
-    private void updateAll() {
+    public void updateAll() {
         if (log.isDebugEnabled()) {
             log.debug("Updating broker and bundle data for loadreport");
         }
@@ -575,10 +591,14 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
                     for (Map.Entry<String, String> entry : bundlesToUnload.entrySet()) {
                         final String broker = entry.getKey();
                         final String bundle = entry.getValue();
-                        log.info("Unloading bundle: {}", bundle);
-                        pulsar.getAdminClient().namespaces().unloadNamespaceBundle(
-                                LoadManagerShared.getNamespaceNameFromBundleName(bundle),
-                                LoadManagerShared.getBundleRangeFromBundleName(bundle));
+                        
+                        final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
+                        final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
+                        if(!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
+                            continue;
+                        }
+                        log.info("Unloading bundle: {} from broker {}", bundle, broker);
+                        pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName, bundleRange);
                         loadData.getRecentlyUnloadedBundles().put(bundle, System.currentTimeMillis());
                     }
                 } catch (Exception e) {
@@ -589,6 +609,32 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
         }
     }
 
+    /**
+     * Checks whether the given bundle may be unloaded from {@code currentBroker}. Namespaces without an
+     * anti-affinity group can always be unloaded; otherwise the decision is delegated to
+     * {@link LoadManagerShared#shouldAntiAffinityNamespaceUnload} using the brokers allowed by the namespace
+     * policies as candidates. Any failure is logged and treated as "unload allowed".
+     *
+     * @param namespace
+     *            namespace of the bundle
+     * @param bundle
+     *            bundle range within the namespace
+     * @param currentBroker
+     *            broker currently owning the bundle
+     * @return {@code true} if the bundle should be unloaded
+     */
+    public boolean shouldAntiAffinityNamespaceUnload(String namespace, String bundle, String currentBroker) {
+        try {
+            final Optional<Policies> namespacePolicies = pulsar.getConfigurationCache().policiesCache()
+                    .get(path(POLICIES, namespace));
+            final boolean hasAntiAffinityGroup = namespacePolicies.isPresent()
+                    && !StringUtils.isBlank(namespacePolicies.get().antiAffinityGroup);
+
+            if (hasAntiAffinityGroup) {
+                // the candidate cache is shared state, so it is rebuilt and consumed under its own lock
+                synchronized (brokerCandidateCache) {
+                    brokerCandidateCache.clear();
+                    final ServiceUnitId unit = pulsar.getNamespaceService().getNamespaceBundleFactory()
+                            .getBundle(namespace, bundle);
+                    LoadManagerShared.applyNamespacePolicies(unit, policies, brokerCandidateCache,
+                            getAvailableBrokers(), brokerTopicLoadingPredicate);
+                    return LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace, bundle, currentBroker,
+                            pulsar, brokerToNamespaceToBundleRange, brokerCandidateCache);
+                }
+            }
+        } catch (Exception e) {
+            log.warn("Failed to check anti-affinity namespace ownership for {}/{}/{}, {}", namespace, bundle,
+                    currentBroker, e.getMessage());
+        }
+        return true;
+    }
+
     /**
      * As the leader broker, attempt to automatically detect and split hot namespace bundles.
      */
@@ -657,11 +703,18 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
             final BundleData data = loadData.getBundleData().computeIfAbsent(bundle,
                     key -> getBundleDataOrDefault(bundle));
             brokerCandidateCache.clear();
-            LoadManagerShared.applyPolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
+            LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
                     brokerTopicLoadingPredicate);
+
             // filter brokers which owns topic higher than threshold
             LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache, loadData,
                     conf.getLoadBalancerBrokerMaxTopics());
+
+            // distribute namespaces to domain and brokers according to anti-affinity-group
+            LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar, serviceUnit.toString(), brokerCandidateCache,
+                    brokerToNamespaceToBundleRange, brokerToFailureDomainMap);
+            // distribute bundles evenly to candidate-brokers
+
             LoadManagerShared.removeMostServicingBrokersForNamespace(serviceUnit.toString(), brokerCandidateCache,
                     brokerToNamespaceToBundleRange);
             log.info("{} brokers being considered for assignment of {}", brokerCandidateCache.size(), bundle);
@@ -673,13 +726,13 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
                 }
             } catch ( BrokerFilterException x ) {
                 // restore the list of brokers to the full set
-                LoadManagerShared.applyPolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
+                LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
                         brokerTopicLoadingPredicate);
             }
 
             if ( brokerCandidateCache.isEmpty() ) {
                 // restore the list of brokers to the full set
-                LoadManagerShared.applyPolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
+                LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
                         brokerTopicLoadingPredicate);
             }
 
@@ -693,7 +746,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
             final double maxUsage = loadData.getBrokerData().get(broker).getLocalData().getMaxResourceUsage();
             if (maxUsage > overloadThreshold) {
                 // All brokers that were in the filtered list were overloaded, so check if there is a better broker
-                LoadManagerShared.applyPolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
+                LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
                         brokerTopicLoadingPredicate);
                 broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
             }
@@ -871,4 +924,33 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
         }
         return 0;
     }
+
+    /**
+     * Rebuilds the broker-to-failure-domain mapping from the configuration cache.
+     *
+     * <p>
+     * Does nothing when failure domains are disabled in the broker configuration. Otherwise every domain listed by
+     * the failure-domain list cache is read and each of its brokers is mapped to the domain name in a fresh map,
+     * which then replaces {@code brokerToFailureDomainMap}. A domain that cannot be read is logged and skipped; a
+     * failure to read the domain list leaves the current mapping untouched.
+     */
+    private void refreshBrokerToFailureDomainMap() {
+        if (!pulsar.getConfiguration().isFailureDomainsEnabled()) {
+            return;
+        }
+        final String clusterDomainRootPath = pulsar.getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT;
+        try {
+            // NOTE(review): the monitor is the map instance that gets replaced below, so a later caller locks on a
+            // different object than a caller still inside this block -- confirm whether a dedicated lock is needed
+            synchronized (brokerToFailureDomainMap) {
+                Map<String, String> tempBrokerToFailureDomainMap = Maps.newHashMap();
+                for (String domainName : pulsar.getConfigurationCache().failureDomainListCache().get()) {
+                    try {
+                        Optional<FailureDomain> domain = pulsar.getConfigurationCache().failureDomainCache()
+                                .get(clusterDomainRootPath + "/" + domainName);
+                        if (domain.isPresent()) {
+                            for (String broker : domain.get().brokers) {
+                                tempBrokerToFailureDomainMap.put(broker, domainName);
+                            }
+                        }
+                    } catch (Exception e) {
+                        // one unreadable domain must not prevent the remaining domains from being refreshed
+                        log.warn("Failed to get domain {}", domainName, e);
+                    }
+                }
+                this.brokerToFailureDomainMap = tempBrokerToFailureDomainMap;
+            }
+            log.info("Cluster domain refreshed {}", brokerToFailureDomainMap);
+        } catch (Exception e) {
+            // fill the placeholder with the cluster name it announces and keep the stack trace of the failure
+            log.warn("Failed to get domain-list for cluster {}", pulsar.getConfiguration().getClusterName(), e);
+        }
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index 338f302..bdf0345 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -913,7 +913,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
             }
             brokerCandidateCache.clear();
             try {
-                LoadManagerShared.applyPolicies(serviceUnit, policies, brokerCandidateCache, availableBrokersCache,
+                LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, availableBrokersCache,
                         brokerTopicLoadingPredicate);
             } catch (Exception e) {
                 log.warn("Error when trying to apply policies: {}", e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index c22758d..40b7d03 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -168,25 +168,26 @@ public abstract class PulsarWebResource {
         } catch (RestException e) {
             throw e;
         } catch (Exception e) {
-            log.error("Failed to get property admin data for property");
+            log.error("Failed to get property admin data for property {}", property);
             throw new RestException(e);
         }
     }
 
     protected static void validateAdminAccessOnProperty(PulsarService pulsar, String clientAppId, String property) throws RestException, Exception{
-        PropertyAdmin propertyAdmin;
-
-        try {
-            propertyAdmin = pulsar.getConfigurationCache().propertiesCache().get(path(POLICIES, property))
-                    .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Property does not exist"));
-        } catch (KeeperException.NoNodeException e) {
-            log.warn("Failed to get property admin data for non existing property {}", property);
-            throw new RestException(Status.NOT_FOUND, "Property does not exist");
-        }
 
         if (pulsar.getConfiguration().isAuthenticationEnabled() && pulsar.getConfiguration().isAuthorizationEnabled()) {
             log.debug("check admin access on property: {} - Authenticated: {} -- role: {}", property,
                     (isClientAuthenticated(clientAppId)), clientAppId);
+            
+            PropertyAdmin propertyAdmin;
+
+            try {
+                propertyAdmin = pulsar.getConfigurationCache().propertiesCache().get(path(POLICIES, property))
+                        .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Property does not exist"));
+            } catch (KeeperException.NoNodeException e) {
+                log.warn("Failed to get property admin data for non existing property {}", property);
+                throw new RestException(Status.NOT_FOUND, "Property does not exist");
+            }
 
             if (!isClientAuthenticated(clientAppId)) {
                 throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
index e0d55a6..73071ff 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
@@ -115,7 +115,7 @@ public class SLAMonitoringTest {
             throws PulsarClientException, MalformedURLException, PulsarAdminException {
         ClusterData clusterData = new ClusterData();
         clusterData.setServiceUrl(pulsarAdmin.getServiceUrl().toString());
-        pulsarAdmins[0].clusters().createCluster("my-cluster", clusterData);
+        pulsarAdmins[0].clusters().updateCluster("my-cluster", clusterData);
         Set<String> allowedClusters = new HashSet<>();
         allowedClusters.add("my-cluster");
         PropertyAdmin adminConfig = new PropertyAdmin();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 795d807..444d224 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -172,21 +172,23 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
     public void clusters() throws Exception {
         admin.clusters().createCluster("usw",
                 new ClusterData("http://broker.messaging.use.example.com" + ":" + BROKER_WEBSERVICE_PORT));
-        assertEquals(admin.clusters().getClusters(), Lists.newArrayList("use", "usw"));
+        // "test" is the cluster of the default configuration; its znode gets created when PulsarService creates
+        // the failure-domain znode of this default cluster
+        assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test", "use", "usw"));
 
         assertEquals(admin.clusters().getCluster("use"),
                 new ClusterData("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT));
 
         admin.clusters().updateCluster("usw",
                 new ClusterData("http://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT));
-        assertEquals(admin.clusters().getClusters(), Lists.newArrayList("use", "usw"));
+        assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test", "use", "usw"));
         assertEquals(admin.clusters().getCluster("usw"),
                 new ClusterData("http://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT));
 
         admin.clusters().updateCluster("usw",
                 new ClusterData("http://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT,
                         "https://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT_TLS));
-        assertEquals(admin.clusters().getClusters(), Lists.newArrayList("use", "usw"));
+        assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test", "use", "usw"));
         assertEquals(admin.clusters().getCluster("usw"),
                 new ClusterData("http://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT,
                         "https://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT_TLS));
@@ -194,11 +196,11 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         admin.clusters().deleteCluster("usw");
         Thread.sleep(300);
 
-        assertEquals(admin.clusters().getClusters(), Lists.newArrayList("use"));
+        assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test", "use"));
 
         admin.namespaces().deleteNamespace("prop-xyz/use/ns1");
         admin.clusters().deleteCluster("use");
-        assertEquals(admin.clusters().getClusters(), Lists.newArrayList());
+        assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test"));
 
         // Check name validation
         try {
@@ -382,7 +384,9 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
 
         admin.namespaces().deleteNamespace("prop-xyz/use/ns1");
         admin.clusters().deleteCluster("use");
-        assertEquals(admin.clusters().getClusters(), Lists.newArrayList());
+        // "test" is the cluster of the default configuration; its znode gets created when PulsarService creates
+        // the failure-domain znode of this default cluster
+        assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test"));
     }
 
     /**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 2362f44..5825783 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.admin;
 
+import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
@@ -25,7 +26,9 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import java.net.URL;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -54,6 +57,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.naming.DestinationDomain;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
@@ -582,7 +586,7 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
             assertTrue(e instanceof PreconditionFailedException);
         }
     }
-    
+
     /**
      * It validates that peer-cluster can't coexist in replication-cluster list
      * 
@@ -640,4 +644,69 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
         admin.namespaces().setNamespaceReplicationClusters(namespace, clusterIds);
     }
 
+    /**
+     * Exercises the failure-domain admin API of the configured cluster: create, update, get, list and delete, plus
+     * the rejection of a second domain that reuses brokers already registered in another domain.
+     */
+    @Test
+    public void clusterFailureDomain() throws PulsarAdminException {
+
+        final String cluster = pulsar.getConfiguration().getClusterName();
+        // updateCluster (not createCluster) is used on the configured cluster
+        admin.clusters().updateCluster(cluster,
+                new ClusterData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls()));
+        // create a domain with three brokers, then update it with the same data
+        FailureDomain domain = new FailureDomain();
+        domain.setBrokers(Sets.newHashSet("b1", "b2", "b3"));
+        admin.clusters().createFailureDomain(cluster, "domain-1", domain);
+        admin.clusters().updateFailureDomain(cluster, "domain-1", domain);
+
+        // the stored domain must equal what was written
+        assertEquals(admin.clusters().getFailureDomain(cluster, "domain-1"), domain);
+
+        // listing must return exactly the one domain created above
+        Map<String, FailureDomain> domains = admin.clusters().getFailureDomains(cluster);
+        assertEquals(domains.size(), 1);
+        assertTrue(domains.containsKey("domain-1"));
+
+        try {
+            // try to create domain with already registered brokers
+            admin.clusters().createFailureDomain(cluster, "domain-2", domain);
+            fail("should have failed because of brokers are already registered");
+        } catch (PulsarAdminException.ConflictException e) {
+            // Ok
+        }
+
+        // after deleting domain-1 no domain is left
+        admin.clusters().deleteFailureDomain(cluster, "domain-1");
+        assertTrue(admin.clusters().getFailureDomains(cluster).isEmpty());
+
+        // with domain-1 gone, the same broker set can be registered under domain-2
+        admin.clusters().createFailureDomain(cluster, "domain-2", domain);
+        domains = admin.clusters().getFailureDomains(cluster);
+        assertEquals(domains.size(), 1);
+        assertTrue(domains.containsKey("domain-2"));
+    }
+
+    /**
+     * Exercises the namespace anti-affinity-group admin API: set, get and delete on a single namespace, then the
+     * lookup of all namespaces that belong to a given anti-affinity group.
+     */
+    @Test
+    public void namespaceAntiAffinity() throws PulsarAdminException {
+        final String namespace = "prop-xyz/use/ns1";
+        final String antiAffinityGroup = "group";
+        // no group is configured initially; set it, read it back, delete it, and verify it is blank again
+        assertTrue(isBlank(admin.namespaces().getNamespaceAntiAffinityGroup(namespace)));
+        admin.namespaces().setNamespaceAntiAffinityGroup(namespace, antiAffinityGroup);
+        assertEquals(admin.namespaces().getNamespaceAntiAffinityGroup(namespace), antiAffinityGroup);
+        admin.namespaces().deleteNamespaceAntiAffinityGroup(namespace);
+        assertTrue(isBlank(admin.namespaces().getNamespaceAntiAffinityGroup(namespace)));
+
+        // create three namespaces and put all of them into the same anti-affinity group
+        final String ns1 = "prop-xyz/use/antiAG1";
+        final String ns2 = "prop-xyz/use/antiAG2";
+        final String ns3 = "prop-xyz/use/antiAG3";
+        admin.namespaces().createNamespace(ns1);
+        admin.namespaces().createNamespace(ns2);
+        admin.namespaces().createNamespace(ns3);
+        admin.namespaces().setNamespaceAntiAffinityGroup(ns1, antiAffinityGroup);
+        admin.namespaces().setNamespaceAntiAffinityGroup(ns2, antiAffinityGroup);
+        admin.namespaces().setNamespaceAntiAffinityGroup(ns3, antiAffinityGroup);
+
+        // the group lookup must return exactly those three namespaces
+        // NOTE(review): the first argument is passed as "dummy"; its meaning is not visible here -- confirm
+        Set<String> namespaces = new HashSet<>(
+                admin.namespaces().getAntiAffinityNamespaces("dummy", "use", antiAffinityGroup));
+        assertEquals(namespaces.size(), 3);
+        assertTrue(namespaces.contains(ns1));
+        assertTrue(namespaces.contains(ns2));
+        assertTrue(namespaces.contains(ns3));
+
+        // a group name that no namespace uses yields an empty result
+        List<String> namespaces2 = admin.namespaces().getAntiAffinityNamespaces("dummy", "use", "invalid-group");
+        assertEquals(namespaces2.size(), 0);
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 0245b67..c587033 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -91,10 +91,11 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
 
     private Field uriField;
     private UriInfo uriInfo;
+    private final String configClusterName = "use";
 
     public AdminTest() {
         super();
-        conf.setClusterName("use");
+        conf.setClusterName(configClusterName);
     }
 
     @Override
@@ -186,10 +187,10 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
 
     @Test
     void clusters() throws Exception {
-        assertEquals(clusters.getClusters(), new ArrayList<String>());
+        assertEquals(clusters.getClusters(), Lists.newArrayList(configClusterName));
         verify(clusters, never()).validateSuperUserAccess();
 
-        clusters.createCluster("use", new ClusterData("http://broker.messaging.use.example.com"));
+        clusters.updateCluster("use", new ClusterData("http://broker.messaging.use.example.com"));
         verify(clusters, times(1)).validateSuperUserAccess();
         // ensure to read from ZooKeeper directly
         clusters.clustersListCache().clear();
@@ -447,7 +448,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         assertEquals(properties.getProperties(), Lists.newArrayList());
 
         // Create a namespace to test deleting a non-empty property
-        clusters.createCluster("use", new ClusterData());
+        clusters.updateCluster("use", new ClusterData());
         newPropertyAdmin = new PropertyAdmin(Lists.newArrayList("role1", "other-role"), Sets.newHashSet("use"));
         properties.createProperty("my-property", newPropertyAdmin);
 
@@ -474,7 +475,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
 
     @Test
     void brokers() throws Exception {
-        clusters.createCluster("use", new ClusterData("http://broker.messaging.use.example.com",
+        clusters.updateCluster("use", new ClusterData("http://broker.messaging.use.example.com",
                 "https://broker.messaging.use.example.com:4443"));
 
         URI requestUri = new URI(
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 37ec7eb..36c9ec2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -135,7 +135,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         doNothing().when(namespaces).validateAdminAccessOnProperty("other-property");
         doNothing().when(namespaces).validateAdminAccessOnProperty("new-property");
 
-        admin.clusters().createCluster("use", new ClusterData("http://broker-use.com:" + BROKER_WEBSERVICE_PORT));
+        admin.clusters().updateCluster("use", new ClusterData("http://broker-use.com:" + BROKER_WEBSERVICE_PORT));
         admin.clusters().createCluster("usw", new ClusterData("http://broker-usw.com:" + BROKER_WEBSERVICE_PORT));
         admin.clusters().createCluster("usc", new ClusterData("http://broker-usc.com:" + BROKER_WEBSERVICE_PORT));
         admin.properties().createProperty(this.testProperty,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
index 0f79e75..cee4bbe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
@@ -64,7 +64,7 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest {
 
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), false);
 
-        admin.clusters().createCluster("c1", new ClusterData());
+        admin.clusters().updateCluster("c1", new ClusterData());
         admin.properties().createProperty("p1", new PropertyAdmin(Lists.newArrayList("role1"), Sets.newHashSet("c1")));
         waitForChange();
         admin.namespaces().createNamespace("p1/c1/ns1");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 67873bc..57ed5e5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -77,6 +77,7 @@ public abstract class MockedPulsarServiceBaseTest {
     protected MockZooKeeper mockZookKeeper;
     protected NonClosableMockBookKeeper mockBookKeeper;
     protected boolean isTcpLookup = false;
+    protected final String configClusterName = "test";
 
     private SameThreadOrderedSafeExecutor sameThreadOrderedSafeExecutor;
 
@@ -90,13 +91,12 @@ public abstract class MockedPulsarServiceBaseTest {
         this.conf.setBrokerServicePortTls(BROKER_PORT_TLS);
         this.conf.setWebServicePort(BROKER_WEBSERVICE_PORT);
         this.conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
-        this.conf.setClusterName("test");
+        this.conf.setClusterName(configClusterName);
         this.conf.setAdvertisedAddress("localhost"); // there are TLS tests in here, they need to use localhost because of the certificate
         this.conf.setManagedLedgerCacheSizeMB(8);
         this.conf.setActiveConsumerFailoverDelayTimeMillis(0);
         this.conf.setDefaultNumberOfNamespaceBundles(1);
         this.conf.setZookeeperServers("localhost:2181");
-        this.conf.setClusterName("mock");
     }
 
     protected final void internalSetup() throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
new file mode 100644
index 0000000..ec0bf1f
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -0,0 +1,540 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.net.URL;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
+import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
+import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.NamespaceBundleFactory;
+import org.apache.pulsar.common.naming.NamespaceBundles;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FailureDomain;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.beust.jcommander.internal.Maps;
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
+import com.google.common.hash.Hashing;
+
+import junit.framework.Assert;
+
+public class AntiAffinityNamespaceGroupTest {
+    private LocalBookkeeperEnsemble bkEnsemble;
+
+    private URL url1;
+    private PulsarService pulsar1;
+    private PulsarAdmin admin1;
+
+    private URL url2;
+    private PulsarService pulsar2;
+    private PulsarAdmin admin2;
+
+    private String primaryHost;
+    private String secondaryHost;
+
+    private NamespaceBundleFactory nsFactory;
+
+    private ModularLoadManagerImpl primaryLoadManager;
+    private ModularLoadManagerImpl secondaryLoadManager;
+
+    private ExecutorService executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS,
+            new LinkedBlockingQueue<Runnable>());
+
+    private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
+    private final int PRIMARY_BROKER_WEBSERVICE_PORT = PortManager.nextFreePort();
+    private final int SECONDARY_BROKER_WEBSERVICE_PORT = PortManager.nextFreePort();
+    private final int PRIMARY_BROKER_PORT = PortManager.nextFreePort();
+    private final int SECONDARY_BROKER_PORT = PortManager.nextFreePort();
+    private static final Logger log = LoggerFactory.getLogger(AntiAffinityNamespaceGroupTest.class);
+
+    static {
+        System.setProperty("test.basePort", "16100");
+    }
+
+    /**
+     * Reads a field declared directly on the runtime class of {@code instance} via reflection, regardless of its
+     * access modifier.
+     *
+     * @param instance
+     *            object whose field is read
+     * @param fieldName
+     *            name of the field declared on the instance's class
+     * @return current value of that field on {@code instance}
+     * @throws Exception
+     *             if the field does not exist on that class or cannot be accessed
+     */
+    private static Object getField(final Object instance, final String fieldName) throws Exception {
+        final Class<?> ownerType = instance.getClass();
+        final Field declared = ownerType.getDeclaredField(fieldName);
+        // lift the access check so non-public fields can be read as well
+        declared.setAccessible(true);
+        final Object value = declared.get(instance);
+        return value;
+    }
+
+    /**
+     * Starts a local BookKeeper ensemble and two brokers of cluster "use", both configured with
+     * {@link ModularLoadManagerImpl} and failure domains enabled, then creates an admin client per broker and
+     * extracts each broker's {@link ModularLoadManagerImpl} instance.
+     */
+    @BeforeMethod
+    void setup() throws Exception {
+
+        // Start local bookkeeper ensemble
+        // NOTE(review): the first argument (3) is presumably the number of bookies -- confirm
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
+        bkEnsemble.start();
+
+        // Start broker 1
+        ServiceConfiguration config1 = new ServiceConfiguration();
+        config1.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+        config1.setClusterName("use");
+        config1.setWebServicePort(PRIMARY_BROKER_WEBSERVICE_PORT);
+        config1.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
+        config1.setBrokerServicePort(PRIMARY_BROKER_PORT);
+        config1.setFailureDomainsEnabled(true);
+        config1.setLoadBalancerEnabled(true);
+        // write the cluster znode before the first broker starts
+        createCluster(bkEnsemble.getZkClient(), config1);
+        pulsar1 = new PulsarService(config1);
+
+        pulsar1.start();
+
+        primaryHost = String.format("%s:%d", InetAddress.getLocalHost().getHostName(), PRIMARY_BROKER_WEBSERVICE_PORT);
+        url1 = new URL("http://127.0.0.1" + ":" + PRIMARY_BROKER_WEBSERVICE_PORT);
+        admin1 = new PulsarAdmin(url1, (Authentication) null);
+
+        // Start broker 2
+        // unlike broker 1, setLoadBalancerEnabled is not called on this configuration
+        ServiceConfiguration config2 = new ServiceConfiguration();
+        config2.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+        config2.setClusterName("use");
+        config2.setWebServicePort(SECONDARY_BROKER_WEBSERVICE_PORT);
+        config2.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
+        config2.setBrokerServicePort(SECONDARY_BROKER_PORT);
+        config2.setFailureDomainsEnabled(true);
+        pulsar2 = new PulsarService(config2);
+        secondaryHost = String.format("%s:%d", InetAddress.getLocalHost().getHostName(),
+                SECONDARY_BROKER_WEBSERVICE_PORT);
+
+        pulsar2.start();
+
+        url2 = new URL("http://127.0.0.1" + ":" + SECONDARY_BROKER_WEBSERVICE_PORT);
+        admin2 = new PulsarAdmin(url2, (Authentication) null);
+
+        // the object returned by getLoadManager().get() keeps the actual implementation in its private
+        // "loadManager" field, so it is read reflectively
+        primaryLoadManager = (ModularLoadManagerImpl) getField(pulsar1.getLoadManager().get(), "loadManager");
+        secondaryLoadManager = (ModularLoadManagerImpl) getField(pulsar2.getLoadManager().get(), "loadManager");
+        nsFactory = new NamespaceBundleFactory(pulsar1, Hashing.crc32());
+        // NOTE(review): presumably gives the freshly started brokers a moment to settle -- confirm
+        Thread.sleep(100);
+    }
+
+    /**
+     * Tears down what {@code setup()} started: shuts down the executor, closes both admin clients, closes the
+     * secondary and then the primary broker, and finally stops the BookKeeper ensemble.
+     */
+    @AfterMethod
+    void shutdown() throws Exception {
+        log.info("--- Shutting down ---");
+        // NOTE(review): the executor is created once in a field initializer but shut down after every test method --
+        // confirm no later test method on the same instance still needs it
+        executor.shutdown();
+
+        admin1.close();
+        admin2.close();
+
+        pulsar2.close();
+        pulsar1.close();
+
+        bkEnsemble.stop();
+    }
+
+    /**
+     * Writes a {@link ClusterData} entry for the configured cluster directly into ZooKeeper under
+     * {@code /admin/clusters/<cluster-name>}, using the web-service address taken from the given configuration.
+     *
+     * @param zk
+     *            ZooKeeper client used to create the node
+     * @param config
+     *            broker configuration providing cluster name, advertised address and web-service port
+     * @throws Exception
+     *             if the data cannot be serialized or the node cannot be created
+     */
+    private void createCluster(ZooKeeper zk, ServiceConfiguration config) throws Exception {
+        final String clusterPath = "/admin/clusters/" + config.getClusterName();
+        final String serviceUrl = "http://" + config.getAdvertisedAddress() + ":" + config.getWebServicePort();
+        final byte[] clusterJson = ObjectMapperFactory.getThreadLocal()
+                .writeValueAsBytes(new ClusterData(serviceUrl));
+        ZkUtils.createFullPathOptimistic(zk, clusterPath, clusterJson, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    }
+
+    // NOTE(review): empty placeholder - the body makes no calls or assertions. Presumably cluster failure-domain
+    // coverage was meant to go here; confirm whether it should be implemented or removed.
+    @Test
+    public void testClusterDomain() {
+
+    }
+
+    /**
+     * It verifies anti-affinity-namespace broker filtering with failure-domain enabled.
+     *
+     * <pre>
+     * Domain     Brokers
+     * ________   _________________
+     * domain-0   broker-0,broker-1
+     * domain-1   broker-2,broker-3
+     *
+     * Anti-affinity-namespace assignment
+     *
+     * (1) ns0 -> candidate-brokers: b0, b1, b2, b3 => selected b0
+     * (2) ns1 -> candidate-brokers: b2, b3         => selected b2
+     * (3) ns2 -> candidate-brokers: b1, b3         => selected b1
+     * (4) ns3 -> candidate-brokers: b3             => selected b3
+     * (5) ns4 -> candidate-brokers: b0, b1, b2, b3 (every broker owns one namespace, so all are candidates again)
+     * </pre>
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testAntiAffinityNamespaceFilteringWithDomain() throws Exception {
+
+        final String namespace = "my-property/use/my-ns";
+        final int totalNamespaces = 5;
+        final String namespaceAntiAffinityGroup = "my-antiaffinity";
+        final String bundle = "/0x00000000_0xffffffff";
+        final int totalBrokers = 4;
+
+        pulsar1.getConfiguration().setFailureDomainsEnabled(true);
+        admin1.properties().createProperty("my-property",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
+
+        for (int i = 0; i < totalNamespaces; i++) {
+            final String ns = namespace + i;
+            admin1.namespaces().createNamespace(ns);
+            admin1.namespaces().setNamespaceAntiAffinityGroup(ns, namespaceAntiAffinityGroup);
+        }
+
+        // "brokers" is the master set: it is only used to refill "candidate" and is never handed to the filter
+        Set<String> brokers = Sets.newHashSet();
+        Map<String, String> brokerToDomainMap = Maps.newHashMap();
+        brokers.add("brokerName-0");
+        brokerToDomainMap.put("brokerName-0", "domain-0");
+        brokers.add("brokerName-1");
+        brokerToDomainMap.put("brokerName-1", "domain-0");
+        brokers.add("brokerName-2");
+        brokerToDomainMap.put("brokerName-2", "domain-1");
+        brokers.add("brokerName-3");
+        brokerToDomainMap.put("brokerName-3", "domain-1");
+
+        Set<String> candidate = Sets.newHashSet();
+        Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange = Maps.newHashMap();
+
+        Assert.assertEquals(brokers.size(), totalBrokers);
+
+        String assignedNamespace = namespace + "0" + bundle;
+        candidate.addAll(brokers);
+
+        // for namespace-0 all brokers available
+        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
+                brokerToNamespaceToBundleRange, brokerToDomainMap);
+        Assert.assertEquals(candidate.size(), totalBrokers);
+
+        // add namespace-0 to broker-0 of domain-0 => state: n0->b0
+        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "brokerName-0", namespace + "0", assignedNamespace);
+        candidate.addAll(brokers);
+        // for namespace-1 only domain-1 brokers are available as broker-0 already owns namespace-0
+        assignedNamespace = namespace + "1" + bundle;
+        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
+                brokerToNamespaceToBundleRange, brokerToDomainMap);
+        Assert.assertEquals(candidate.size(), 2);
+        candidate.forEach(broker -> Assert.assertEquals(brokerToDomainMap.get(broker), "domain-1"));
+
+        // add namespace-1 to broker-2 of domain-1 => state: n0->b0, n1->b2
+        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "brokerName-2", namespace + "1", assignedNamespace);
+        candidate.addAll(brokers);
+        // for namespace-2 only brokers available are : broker-1 and broker-3
+        assignedNamespace = namespace + "2" + bundle;
+        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
+                brokerToNamespaceToBundleRange, brokerToDomainMap);
+        Assert.assertEquals(candidate.size(), 2);
+        Assert.assertTrue(candidate.contains("brokerName-1"));
+        Assert.assertTrue(candidate.contains("brokerName-3"));
+
+        // add namespace-2 to broker-1 of domain-0 => state: n0->b0, n1->b2, n2->b1
+        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "brokerName-1", namespace + "2", assignedNamespace);
+        candidate.addAll(brokers);
+        // for namespace-3 only brokers available are : broker-3
+        assignedNamespace = namespace + "3" + bundle;
+        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
+                brokerToNamespaceToBundleRange, brokerToDomainMap);
+        Assert.assertEquals(candidate.size(), 1);
+        Assert.assertTrue(candidate.contains("brokerName-3"));
+
+        // add namespace-3 to broker-3 of domain-1 => state: n0->b0, n1->b2, n2->b1, n3->b3
+        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "brokerName-3", namespace + "3", assignedNamespace);
+        candidate.addAll(brokers);
+        // for namespace-4 all 4 brokers are available again as each of them owns exactly one namespace
+        assignedNamespace = namespace + "4" + bundle;
+        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
+                brokerToNamespaceToBundleRange, brokerToDomainMap);
+        Assert.assertEquals(candidate.size(), 4);
+    }
+
+    /**
+     * It verifies anti-affinity-namespace broker filtering when no broker-to-domain map is supplied (the filter is
+     * called with {@code null} for it).
+     *
+     * <pre>
+     *  Anti-affinity-namespace assignment
+     *
+     * (1) ns0 -> candidate-brokers: b0, b1, b2     => selected b0
+     * (2) ns1 -> candidate-brokers: b1, b2         => selected b1
+     * (3) ns2 -> candidate-brokers: b2             => selected b2
+     * (4) ns3 -> candidate-brokers: b0, b1, b2     (every broker owns one namespace, so all are candidates again)
+     * </pre>
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testAntiAffinityNamespaceFilteringWithoutDomain() throws Exception {
+
+        final String namespace = "my-property/use/my-ns";
+        final int totalNamespaces = 5;
+        final String namespaceAntiAffinityGroup = "my-antiaffinity";
+        final String bundle = "/0x00000000_0xffffffff";
+
+        admin1.properties().createProperty("my-property",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
+
+        for (int i = 0; i < totalNamespaces; i++) {
+            final String ns = namespace + i;
+            admin1.namespaces().createNamespace(ns);
+            admin1.namespaces().setNamespaceAntiAffinityGroup(ns, namespaceAntiAffinityGroup);
+        }
+
+        // "brokers" is the master set: it is only used to refill "candidate" and is never handed to the filter
+        Set<String> brokers = Sets.newHashSet();
+        Set<String> candidate = Sets.newHashSet();
+        Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange = Maps.newHashMap();
+        brokers.add("broker-0");
+        brokers.add("broker-1");
+        brokers.add("broker-2");
+
+        String assignedNamespace = namespace + "0" + bundle;
+
+        // all brokers available so, candidate will be all 3 brokers
+        candidate.addAll(brokers);
+        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
+                brokerToNamespaceToBundleRange, null);
+        Assert.assertEquals(candidate.size(), 3);
+
+        // add ns-0 to broker-0
+        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-0", namespace + "0", assignedNamespace);
+        candidate.addAll(brokers);
+        assignedNamespace = namespace + "1" + bundle;
+        // available brokers for ns-1 => broker-1, broker-2
+        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
+                brokerToNamespaceToBundleRange, null);
+        Assert.assertEquals(candidate.size(), 2);
+        Assert.assertTrue(candidate.contains("broker-1"));
+        Assert.assertTrue(candidate.contains("broker-2"));
+
+        // add ns-1 to broker-1
+        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-1", namespace + "1", assignedNamespace);
+        candidate.addAll(brokers);
+        // available brokers for ns-2 => broker-2
+        assignedNamespace = namespace + "2" + bundle;
+        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
+                brokerToNamespaceToBundleRange, null);
+        Assert.assertEquals(candidate.size(), 1);
+        Assert.assertTrue(candidate.contains("broker-2"));
+
+        // add ns-2 to broker-2
+        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-2", namespace + "2", assignedNamespace);
+        candidate.addAll(brokers);
+        // available brokers for ns-3 => broker-0, broker-1, broker-2
+        assignedNamespace = namespace + "3" + bundle;
+        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
+                brokerToNamespaceToBundleRange, null);
+        Assert.assertEquals(candidate.size(), 3);
+    }
+
+    /**
+     * Records in the given ownership map that {@code broker} owns {@code assignedBundleName} of {@code namespace}.
+     * Any entry previously stored for the same broker is replaced.
+     *
+     * @param brokerToNamespaceToBundleRange
+     *            broker -> namespace -> bundle-names map to update
+     * @param broker
+     *            broker that becomes the owner
+     * @param namespace
+     *            namespace the bundle belongs to
+     * @param assignedBundleName
+     *            name of the bundle being assigned
+     */
+    private void selectBrokerForNamespace(Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange,
+            String broker, String namespace, String assignedBundleName) {
+        Set<String> ownedBundles = Sets.newHashSet(assignedBundleName);
+        Map<String, Set<String>> ownedNamespaces = Maps.newHashMap();
+        ownedNamespaces.put(namespace, ownedBundles);
+        brokerToNamespaceToBundleRange.put(broker, ownedNamespaces);
+    }
+
+    /**
+     * It verifies anti-affinity with failure domain enabled with 2 brokers.
+     *
+     * <pre>
+     * 1. Register brokers to domain: domain1: broker1 & domain2: broker2
+     * 2. Load-Manager receives a watch and updates brokerToDomain cache with new domain data
+     * 3. Create two namespace with anti-affinity
+     * 4. Load-manager selects broker for each namespace such that from different domains
+     *
+     * </pre>
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testBrokerSelectionForAntiAffinityGroup() throws Exception {
+
+        final String broker1 = primaryHost;
+        final String broker2 = secondaryHost;
+        final String cluster = pulsar1.getConfiguration().getClusterName();
+        final String property = "prop";
+        final String namespace1 = property + "/" + cluster + "/ns1";
+        final String namespace2 = property + "/" + cluster + "/ns2";
+        final String namespaceAntiAffinityGroup = "group";
+        final String domainName1 = "domain1";
+        final String domainName2 = "domain2";
+
+        // each broker is registered in its own failure-domain
+        FailureDomain domain1 = new FailureDomain();
+        domain1.brokers = Sets.newHashSet(broker1);
+        admin1.clusters().createFailureDomain(cluster, domainName1, domain1);
+        FailureDomain domain2 = new FailureDomain();
+        domain2.brokers = Sets.newHashSet(broker2);
+        admin1.clusters().createFailureDomain(cluster, domainName2, domain2);
+        admin1.properties().createProperty(property, new PropertyAdmin(null, Sets.newHashSet(cluster)));
+        admin1.namespaces().createNamespace(namespace1);
+        admin1.namespaces().createNamespace(namespace2);
+        admin1.namespaces().setNamespaceAntiAffinityGroup(namespace1, namespaceAntiAffinityGroup);
+        admin1.namespaces().setNamespaceAntiAffinityGroup(namespace2, namespaceAntiAffinityGroup);
+
+        // wait (at most 5 x 200ms) until both load-managers have seen both domains; stop waiting as soon as they have
+        for (int i = 0; i < 5; i++) {
+            if (hasLoadManagerCachedDomains(primaryLoadManager, domainName1, domainName2)
+                    && hasLoadManagerCachedDomains(secondaryLoadManager, domainName1, domainName2)) {
+                break;
+            }
+            Thread.sleep(200);
+        }
+        assertTrue(isLoadManagerUpdatedDomainCache(primaryLoadManager));
+        assertTrue(isLoadManagerUpdatedDomainCache(secondaryLoadManager));
+
+        ServiceUnitId serviceUnit = makeBundle(property, cluster, "ns1");
+        String selectedBroker1 = primaryLoadManager.selectBrokerForAssignment(serviceUnit);
+
+        serviceUnit = makeBundle(property, cluster, "ns2");
+        String selectedBroker2 = primaryLoadManager.selectBrokerForAssignment(serviceUnit);
+
+        assertNotEquals(selectedBroker1, selectedBroker2);
+
+    }
+
+    /**
+     * Reads the load-manager's private {@code brokerToFailureDomainMap} via reflection and reports whether every
+     * given failure-domain name already appears among its values.
+     *
+     * NOTE(review): assumes the map values are failure-domain names - confirm against ModularLoadManagerImpl.
+     */
+    @SuppressWarnings("unchecked")
+    private boolean hasLoadManagerCachedDomains(ModularLoadManagerImpl loadManager, String... domainNames)
+            throws Exception {
+        Field mapField = ModularLoadManagerImpl.class.getDeclaredField("brokerToFailureDomainMap");
+        mapField.setAccessible(true);
+        Map<String, String> brokerToDomain = (Map<String, String>) mapField.get(loadManager);
+        return brokerToDomain.values().containsAll(Sets.newHashSet(domainNames));
+    }
+
+    /**
+     * It verifies that load-shedding task should unload namespace only if there is a broker available which doesn't
+     * cause uneven anti-affinity namespace distribution.
+     *
+     * <pre>
+     * 1. broker-0 owns ns-0 => broker-0 can unload ns-0
+     * 2. broker-1 owns ns-1 => broker-0 can unload ns-0
+     * 3. broker-2 owns ns-2 => broker-0 can't unload ns-0 as all brokers own the same number of namespaces
+     * </pre>
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testLoadSheddingUtilWithAntiAffinityNamespace() throws Exception {
+
+        final String namespace = "my-property/use/my-ns";
+        final int totalNamespaces = 5;
+        final String namespaceAntiAffinityGroup = "my-antiaffinity";
+        final String bundle = "/0x00000000_0xffffffff";
+
+        admin1.properties().createProperty("my-property",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
+
+        for (int i = 0; i < totalNamespaces; i++) {
+            final String ns = namespace + i;
+            admin1.namespaces().createNamespace(ns);
+            admin1.namespaces().setNamespaceAntiAffinityGroup(ns, namespaceAntiAffinityGroup);
+        }
+
+        Set<String> brokers = Sets.newHashSet();
+        Set<String> candidate = Sets.newHashSet();
+        Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange = Maps.newHashMap();
+        brokers.add("broker-0");
+        brokers.add("broker-1");
+        brokers.add("broker-2");
+
+        // all brokers available so, candidate will be all 3 brokers
+        candidate.addAll(brokers);
+        // add ns-0 to broker-0; every namespace is registered under its own bundle name
+        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-0", namespace + "0",
+                namespace + "0" + bundle);
+        String currentBroker = "broker-0";
+        boolean shouldUnload = LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace + "0", bundle,
+                currentBroker, pulsar1, brokerToNamespaceToBundleRange, candidate);
+        assertTrue(shouldUnload);
+        // add ns-1 to broker-1
+        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-1", namespace + "1",
+                namespace + "1" + bundle);
+        shouldUnload = LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace + "0", bundle, currentBroker,
+                pulsar1, brokerToNamespaceToBundleRange, candidate);
+        assertTrue(shouldUnload);
+        // add ns-2 to broker-2
+        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-2", namespace + "2",
+                namespace + "2" + bundle);
+        shouldUnload = LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace + "0", bundle, currentBroker,
+                pulsar1, brokerToNamespaceToBundleRange, candidate);
+        assertFalse(shouldUnload);
+
+    }
+
+    /**
+     * It exercises {@code ModularLoadManagerImpl#shouldAntiAffinityNamespaceUnload} against the running primary
+     * broker: after creating the anti-affinity namespaces, attaching a producer to a topic of the first namespace and
+     * refreshing the load data, the load-manager is expected to allow unloading that namespace's bundle from the
+     * primary broker.
+     *
+     * <p>
+     * The client and producer are closed in {@code finally} blocks so they are released even when the assertion or a
+     * load-manager call fails.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testLoadSheddingWithAntiAffinityNamespace() throws Exception {
+
+        final String namespace = "my-property/use/my-ns";
+        final int totalNamespaces = 5;
+        final String namespaceAntiAffinityGroup = "my-antiaffinity";
+        final String bundle = "0x00000000_0xffffffff";
+
+        admin1.properties().createProperty("my-property",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
+
+        for (int i = 0; i < totalNamespaces; i++) {
+            final String ns = namespace + i;
+            admin1.namespaces().createNamespace(ns);
+            admin1.namespaces().setNamespaceAntiAffinityGroup(ns, namespaceAntiAffinityGroup);
+        }
+
+        ModularLoadManagerImpl loadManager = (ModularLoadManagerImpl) ((ModularLoadManagerWrapper) pulsar1
+                .getLoadManager().get()).getLoadManager();
+
+        PulsarClient pulsarClient = PulsarClient.create(pulsar1.getWebServiceAddress());
+        try {
+            Producer producer = pulsarClient.createProducer("persistent://" + namespace + "0/my-topic1");
+            try {
+                // refresh broker stats and load-manager data before asking for the unload decision
+                pulsar1.getBrokerService().updateRates();
+                loadManager.updateAll();
+
+                assertTrue(loadManager.shouldAntiAffinityNamespaceUnload(namespace + "0", bundle, primaryHost));
+            } finally {
+                producer.close();
+            }
+        } finally {
+            pulsarClient.close();
+        }
+    }
+
+    /**
+     * Reads the private {@code brokerToFailureDomainMap} field of the given load-manager via reflection and reports
+     * whether it holds at least one entry.
+     *
+     * @param loadManager
+     *            load-manager whose domain cache is inspected
+     * @return {@code true} if the map is not empty
+     * @throws Exception
+     *             if the field cannot be found or read
+     */
+    private boolean isLoadManagerUpdatedDomainCache(ModularLoadManagerImpl loadManager) throws Exception {
+        final Field domainMapField = ModularLoadManagerImpl.class.getDeclaredField("brokerToFailureDomainMap");
+        domainMapField.setAccessible(true);
+        final Object domainCache = domainMapField.get(loadManager);
+        final boolean cacheIsEmpty = ((Map<?, ?>) domainCache).isEmpty();
+        return !cacheIsEmpty;
+    }
+
+    /**
+     * Builds the bundle that spans the namespace's full range, from {@code NamespaceBundles.FULL_LOWER_BOUND} to
+     * {@code NamespaceBundles.FULL_UPPER_BOUND}, both ends closed.
+     *
+     * @param property
+     *            property of the namespace
+     * @param cluster
+     *            cluster of the namespace
+     * @param namespace
+     *            local namespace name
+     * @return bundle obtained from {@code nsFactory} for that namespace and range
+     */
+    private NamespaceBundle makeBundle(final String property, final String cluster, final String namespace) {
+        final NamespaceName nsName = NamespaceName.get(property, cluster, namespace);
+        return nsFactory.getBundle(nsName, Range.range(NamespaceBundles.FULL_LOWER_BOUND, BoundType.CLOSED,
+                NamespaceBundles.FULL_UPPER_BOUND, BoundType.CLOSED));
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
index cc4c082..fce825c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
@@ -499,7 +499,7 @@ public class ModularLoadManagerImplTest {
         final String broker1Address = pulsar1.getAdvertisedAddress() + "0";
         final String broker2Address = pulsar2.getAdvertisedAddress() + "1";
         final String sharedBroker = "broker3";
-        admin1.clusters().createCluster(cluster, new ClusterData("http://" + pulsar1.getAdvertisedAddress()));
+        admin1.clusters().updateCluster(cluster, new ClusterData("http://" + pulsar1.getAdvertisedAddress()));
         admin1.properties().createProperty(property,
                 new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet(cluster)));
         admin1.namespaces().createNamespace(property + "/" + cluster + "/" + namespace);
@@ -535,7 +535,7 @@ public class ModularLoadManagerImplTest {
         // test1: shared=1, primary=1, secondary=1 => It should return 1 primary broker only
         Set<String> brokerCandidateCache = Sets.newHashSet();
         Set<String> availableBrokers = Sets.newHashSet(sharedBroker, broker1Address, broker2Address);
-        LoadManagerShared.applyPolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
+        LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
                 availableBrokers, brokerTopicLoadingPredicate);
         assertEquals(brokerCandidateCache.size(), 1);
         assertTrue(brokerCandidateCache.contains(broker1Address));
@@ -543,7 +543,7 @@ public class ModularLoadManagerImplTest {
         // test2: shared=1, primary=0, secondary=1 => It should return 1 secondary broker only
         brokerCandidateCache = Sets.newHashSet();
         availableBrokers = Sets.newHashSet(sharedBroker, broker2Address);
-        LoadManagerShared.applyPolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
+        LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
                 availableBrokers, brokerTopicLoadingPredicate);
         assertEquals(brokerCandidateCache.size(), 1);
         assertTrue(brokerCandidateCache.contains(broker2Address));
@@ -551,7 +551,7 @@ public class ModularLoadManagerImplTest {
         // test3: shared=1, primary=0, secondary=0 => It should return 0 broker
         brokerCandidateCache = Sets.newHashSet();
         availableBrokers = Sets.newHashSet(sharedBroker);
-        LoadManagerShared.applyPolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
+        LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
                 availableBrokers, brokerTopicLoadingPredicate);
         assertEquals(brokerCandidateCache.size(), 0);
 
@@ -565,7 +565,7 @@ public class ModularLoadManagerImplTest {
         // test1: shared=1, primary=1, secondary=1 => It should return primary + secondary
         brokerCandidateCache = Sets.newHashSet();
         availableBrokers = Sets.newHashSet(sharedBroker, broker1Address, broker2Address);
-        LoadManagerShared.applyPolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
+        LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
                 availableBrokers, brokerTopicLoadingPredicate);
         assertEquals(brokerCandidateCache.size(), 2);
         assertTrue(brokerCandidateCache.contains(broker1Address));
@@ -574,7 +574,7 @@ public class ModularLoadManagerImplTest {
         // test2: shared=1, secondary=1 => It should return secondary
         brokerCandidateCache = Sets.newHashSet();
         availableBrokers = Sets.newHashSet(sharedBroker, broker2Address);
-        LoadManagerShared.applyPolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
+        LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
                 availableBrokers, brokerTopicLoadingPredicate);
         assertEquals(brokerCandidateCache.size(), 1);
         assertTrue(brokerCandidateCache.contains(broker2Address));
@@ -582,7 +582,7 @@ public class ModularLoadManagerImplTest {
         // test3: shared=1, => It should return 0 broker
         brokerCandidateCache = Sets.newHashSet();
         availableBrokers = Sets.newHashSet(sharedBroker);
-        LoadManagerShared.applyPolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
+        LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
                 availableBrokers, brokerTopicLoadingPredicate);
         assertEquals(brokerCandidateCache.size(), 0);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 720b48e..286a700 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -95,7 +95,7 @@ public class BacklogQuotaManagerTest {
             adminUrl = new URL("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT);
             admin = new PulsarAdmin(adminUrl, (Authentication) null);
 
-            admin.clusters().createCluster("usc", new ClusterData(adminUrl.toString()));
+            admin.clusters().updateCluster("usc", new ClusterData(adminUrl.toString()));
             admin.properties().createProperty("prop",
                     new PropertyAdmin(Lists.newArrayList("appid1"), Sets.newHashSet("usc")));
             admin.namespaces().createNamespace("prop/usc/ns-quota");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 8e8f31d..38afc78 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -102,7 +102,7 @@ public class BrokerBkEnsemblesTests {
             adminUrl = new URL("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT);
             admin = new PulsarAdmin(adminUrl, (Authentication) null);
 
-            admin.clusters().createCluster("usc", new ClusterData(adminUrl.toString()));
+            admin.clusters().updateCluster("usc", new ClusterData(adminUrl.toString()));
             admin.properties().createProperty("prop",
                     new PropertyAdmin(Lists.newArrayList("appid1"), Sets.newHashSet("usc")));
         } catch (Throwable t) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 44372e7..e487f12 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -203,11 +203,11 @@ public class ReplicatorTestBase {
         admin3 = new PulsarAdmin(url3, (Authentication) null);
 
         // Provision the global namespace
-        admin1.clusters().createCluster("r1", new ClusterData(url1.toString(), urlTls1.toString(),
+        admin1.clusters().updateCluster("r1", new ClusterData(url1.toString(), urlTls1.toString(),
                 pulsar1.getBrokerServiceUrl(), pulsar1.getBrokerServiceUrlTls()));
-        admin1.clusters().createCluster("r2", new ClusterData(url2.toString(), urlTls2.toString(),
+        admin1.clusters().updateCluster("r2", new ClusterData(url2.toString(), urlTls2.toString(),
                 pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls()));
-        admin1.clusters().createCluster("r3", new ClusterData(url3.toString(), urlTls3.toString(),
+        admin1.clusters().updateCluster("r3", new ClusterData(url3.toString(), urlTls3.toString(),
                 pulsar3.getBrokerServiceUrl(), pulsar3.getBrokerServiceUrlTls()));
 
         admin1.clusters().createCluster("global", new ClusterData("http://global:8080", "https://global:8443"));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperSessionExpireRecoveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperSessionExpireRecoveryTest.java
index d9a7c0f..0ac89e4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperSessionExpireRecoveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperSessionExpireRecoveryTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.broker.zookeeper;
 
-import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -29,7 +29,7 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 public class ZooKeeperSessionExpireRecoveryTest extends MockedPulsarServiceBaseTest {
 
@@ -52,11 +52,11 @@ public class ZooKeeperSessionExpireRecoveryTest extends MockedPulsarServiceBaseT
     public void testSessionExpired() throws Exception {
         admin.clusters().createCluster("my-cluster", new ClusterData("test-url"));
 
-        assertEquals(admin.clusters().getClusters(), Lists.newArrayList("my-cluster"));
+        assertTrue(Sets.newHashSet(admin.clusters().getClusters()).contains("my-cluster"));
 
         mockZookKeeper.failNow(Code.SESSIONEXPIRED);
 
-        assertEquals(admin.clusters().getClusters(), Lists.newArrayList("my-cluster"));
+        assertTrue(Sets.newHashSet(admin.clusters().getClusters()).contains("my-cluster"));
 
         try {
             admin.clusters().createCluster("my-cluster-2", new ClusterData("test-url"));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
index 7c7cff4..83b15f5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
@@ -167,7 +167,7 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
         authTls.configure(authParams);
         internalSetup(authTls);
 
-        admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
+        admin.clusters().updateCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
                 "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
         admin.properties().createProperty("my-property",
                 new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
@@ -189,7 +189,7 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
         authTls.configure(authParams);
         internalSetup(authTls);
 
-        admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
+        admin.clusters().updateCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
                 "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
         admin.properties().createProperty("my-property",
                 new PropertyAdmin(Lists.newArrayList("anonymousUser"), Sets.newHashSet("use")));
@@ -243,7 +243,7 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
         // this will cause NPE and it should throw 500
         doReturn(null).when(pulsar).getGlobalZkCache();
         try {
-            admin.clusters().createCluster(cluster, clusterData);
+            admin.clusters().updateCluster(cluster, clusterData);
         } catch (PulsarAdminException e) {
             Assert.assertTrue(e.getCause() instanceof InternalServerErrorException);
         }
@@ -268,7 +268,7 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
         authTls.configure(authParams);
         internalSetup(authTls);
 
-        admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
+        admin.clusters().updateCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
                 "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
         admin.properties().createProperty("my-property",
                 new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
@@ -292,4 +292,4 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
 
     }
 
-}
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 4a8d8cf..0d1b8c3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -73,6 +73,7 @@ import com.google.common.collect.Sets;
 
 public class NonPersistentTopicTest extends ProducerConsumerBase {
     private static final Logger log = LoggerFactory.getLogger(NonPersistentTopicTest.class);
+    private final String configClusterName = "r1";
 
     @DataProvider(name = "subscriptionType")
     public Object[][] getSubscriptionType() {
@@ -835,7 +836,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
             // completely
             // independent config objects instead of referring to the same properties object
             ServiceConfiguration config1 = new ServiceConfiguration();
-            config1.setClusterName("r1");
+            config1.setClusterName(configClusterName);
             config1.setWebServicePort(webServicePort1);
             config1.setZookeeperServers("127.0.0.1:" + zkPort1);
             config1.setGlobalZookeeperServers("127.0.0.1:" + globalZKPort + "/foo");
@@ -901,11 +902,11 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
             admin3 = new PulsarAdmin(url3, (Authentication) null);
 
             // Provision the global namespace
-            admin1.clusters().createCluster("r1", new ClusterData(url1.toString(), null, pulsar1.getBrokerServiceUrl(),
+            admin1.clusters().updateCluster("r1", new ClusterData(url1.toString(), null, pulsar1.getBrokerServiceUrl(),
                     pulsar1.getBrokerServiceUrlTls()));
-            admin1.clusters().createCluster("r2", new ClusterData(url2.toString(), null, pulsar2.getBrokerServiceUrl(),
+            admin1.clusters().updateCluster("r2", new ClusterData(url2.toString(), null, pulsar2.getBrokerServiceUrl(),
                     pulsar1.getBrokerServiceUrlTls()));
-            admin1.clusters().createCluster("r3", new ClusterData(url3.toString(), null, pulsar3.getBrokerServiceUrl(),
+            admin1.clusters().updateCluster("r3", new ClusterData(url3.toString(), null, pulsar3.getBrokerServiceUrl(),
                     pulsar1.getBrokerServiceUrlTls()));
 
             admin1.clusters().createCluster("global", new ClusterData("http://global:8080"));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java
index b8c1cde..7b8e083 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java
@@ -40,6 +40,7 @@ public class TlsProducerConsumerBase extends ProducerConsumerBase {
     protected final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
     protected final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
     protected final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";
+    private final String clusterName = "use";
 
     @BeforeMethod
     @Override
@@ -62,7 +63,7 @@ public class TlsProducerConsumerBase extends ProducerConsumerBase {
         conf.setTlsEnabled(true);
         conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
         conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
-        conf.setClusterName("use");
+        conf.setClusterName(clusterName);
     }
 
     protected void internalSetUpForClient() throws Exception {
@@ -78,7 +79,7 @@ public class TlsProducerConsumerBase extends ProducerConsumerBase {
         clientConf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
         clientConf.setUseTls(true);
         admin = spy(new PulsarAdmin(brokerUrlTls, clientConf));
-        admin.clusters().createCluster("use",
+        admin.clusters().updateCluster(clusterName,
                 new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(), "pulsar://localhost:" + BROKER_PORT,
                         "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
         admin.properties().createProperty("my-property",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java
index 3bac504..8398ea2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java
@@ -76,7 +76,7 @@ public class DiscoveryServiceWebTest extends ProducerConsumerBase {
      * @throws Exception
      */
     @Test
-    public void testRiderectUrlWithServerStarted() throws Exception {
+    public void testRedirectUrlWithServerStarted() throws Exception {
         // 1. start server
         int port = PortManager.nextFreePort();
         ServiceConfig config = new ServiceConfig();
@@ -100,7 +100,7 @@ public class DiscoveryServiceWebTest extends ProducerConsumerBase {
          **/
 
         assertEquals(hitBrokerService(HttpMethod.POST, postRequestUrl, Lists.newArrayList("use")),
-                "Property does not exist");
+                "Cannot set replication on a non-global namespace");
         assertEquals(hitBrokerService(HttpMethod.PUT, putRequestUrl, new BundlesData(1)), "Property does not exist");
         assertEquals(hitBrokerService(HttpMethod.GET, getRequestUrl, null), "Property does not exist");
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
index c996c9f..302f2e4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
@@ -44,7 +44,8 @@ import com.google.common.collect.Sets;
 
 public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest {
     private WebSocketService service;
-    private static final int TEST_PORT = PortManager.nextFreePort();;
+    private static final int TEST_PORT = PortManager.nextFreePort();
+    private final String configClusterName = "c1";
 
     public ProxyAuthorizationTest() {
         super();
@@ -53,7 +54,7 @@ public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest {
     @BeforeClass
     @Override
     protected void setup() throws Exception {
-        conf.setClusterName("c1");
+        conf.setClusterName(configClusterName);
         internalSetup();
 
         WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
@@ -81,7 +82,7 @@ public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest {
 
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), false);
 
-        admin.clusters().createCluster("c1", new ClusterData());
+        admin.clusters().updateCluster(configClusterName, new ClusterData());
         admin.properties().createProperty("p1", new PropertyAdmin(Lists.newArrayList("role1"), Sets.newHashSet("c1")));
         waitForChange();
         admin.namespaces().createNamespace("p1/c1/ns1");
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java
index 1984e2a..779e7b1 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedExceptio
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
 import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
 
 /**
@@ -284,4 +285,135 @@ public interface Clusters {
      */
     NamespaceIsolationData getNamespaceIsolationPolicy(String cluster, String policyName) throws PulsarAdminException;
 
+    /**
+     * Create a failure domain in a cluster.
+     *
+     * @param cluster
+     *            Cluster name
+     * @param domainName
+     *            Name of the failure domain
+     * @param domain
+     *            Failure-domain configuration
+     *
+     * @throws NotAuthorizedException
+     *             You don't have admin permission
+     * @throws ConflictException
+     *             A broker already exists in another domain
+     * @throws NotFoundException
+     *             Cluster doesn't exist
+     * @throws PreconditionFailedException
+     *             Cluster doesn't exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void createFailureDomain(String cluster, String domainName, FailureDomain domain)
+            throws PulsarAdminException;
+    
+    
+    /**
+     * Update a failure domain in a cluster.
+     *
+     * @param cluster
+     *            Cluster name
+     * @param domainName
+     *            Name of the failure domain
+     * @param domain
+     *            Failure-domain configuration
+     *
+     * @throws NotAuthorizedException
+     *             You don't have admin permission
+     * @throws ConflictException
+     *             A broker already exists in another domain
+     * @throws NotFoundException
+     *             Cluster doesn't exist
+     * @throws PreconditionFailedException
+     *             Cluster doesn't exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void updateFailureDomain(String cluster, String domainName, FailureDomain domain)
+            throws PulsarAdminException;
+    
+    
+    /**
+     * Delete a failure domain from a cluster.
+     *
+     * @param cluster
+     *            Cluster name
+     * @param domainName
+     *            Name of the failure domain
+     *
+     * @throws NotAuthorizedException
+     *             You don't have admin permission
+     * @throws NotFoundException
+     *             Cluster doesn't exist
+     * @throws PreconditionFailedException
+     *             Cluster doesn't exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+
+    void deleteFailureDomain(String cluster, String domainName) throws PulsarAdminException;
+
+    /**
+     * Get all failure domains registered in a cluster.
+     *
+     * @param cluster
+     *            Cluster name
+     * @return the registered failure domains as a map (presumably keyed by domain name; confirm against the
+     *         broker endpoint)
+     * @throws NotAuthorizedException
+     *             You don't have admin permission
+     * @throws NotFoundException
+     *             Cluster doesn't exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    Map<String, FailureDomain> getFailureDomains(String cluster) throws PulsarAdminException;
+    
+    /**
+     * Get a single failure domain registered in a cluster.
+     *
+     * @param cluster
+     *            Cluster name
+     * @param domainName
+     *            Name of the failure domain
+     * @return the failure domain registered under {@code domainName}
+     * @throws NotAuthorizedException
+     *             You don't have admin permission
+     * @throws NotFoundException
+     *             Domain doesn't exist
+     * @throws PreconditionFailedException
+     *             Cluster doesn't exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    FailureDomain getFailureDomain(String cluster, String domainName) throws PulsarAdminException;
+    
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index ebe754c..3c8d3d5 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -420,6 +420,78 @@ public interface Namespaces {
      *             Unexpected error
      */
     void setNamespaceMessageTTL(String namespace, int ttlInSeconds) throws PulsarAdminException;
+    
+    
+    /**
+     * Set the anti-affinity group name for a namespace.
+     *
+     * @param namespace
+     *            Namespace name
+     * @param namespaceAntiAffinityGroup
+     *            Anti-affinity group name to assign to the namespace
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setNamespaceAntiAffinityGroup(String namespace, String namespaceAntiAffinityGroup) throws PulsarAdminException;
+    
+    /**
+     * Get all namespaces that are grouped under the given anti-affinity group.
+     *
+     * @param property
+     *            property is only used for authorization. Client has to be admin of any property to access this
+     *            API.
+     * @param cluster
+     *            Cluster name
+     * @param namespaceAntiAffinityGroup
+     *            Anti-affinity group name
+     * @return list of namespaces grouped under the given anti-affinity group
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    List<String> getAntiAffinityNamespaces(String property, String cluster, String namespaceAntiAffinityGroup)
+            throws PulsarAdminException;
+
+    /**
+     * Get the anti-affinity group name for a namespace.
+     *
+     * @param namespace
+     *            Namespace name
+     * @return the anti-affinity group name of the namespace
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    String getNamespaceAntiAffinityGroup(String namespace) throws PulsarAdminException;
+
+    /**
+     * Delete the anti-affinity group name of a namespace.
+     *
+     * @param namespace
+     *            Namespace name
+     *
+     * @throws NotAuthorizedException
+     *             You don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void deleteNamespaceAntiAffinityGroup(String namespace) throws PulsarAdminException;
 
     /**
      * Set the deduplication status for all topics within a namespace.
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
index 2fcf44a..666a2e8 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.client.admin.Clusters;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
 
@@ -150,4 +151,49 @@ public class ClustersImpl extends BaseResource implements Clusters {
             throw getApiException(e);
         }
     }
+
+    @Override
+    public void createFailureDomain(String cluster, String domainName, FailureDomain domain) throws PulsarAdminException {
+        // Create and update share one code path: both delegate to setDomain, which POSTs the domain.
+        setDomain(cluster, domainName, domain);
+    }
+
+    @Override
+    public void updateFailureDomain(String cluster, String domainName, FailureDomain domain) throws PulsarAdminException {
+        // Same request as createFailureDomain: setDomain POSTs the domain to the failureDomains resource.
+        setDomain(cluster, domainName, domain);
+    }
+
+    /**
+     * Deletes the given failure domain by issuing a DELETE on
+     * {@code {cluster}/failureDomains/{domainName}} under the clusters resource.
+     * <p>
+     * Any failure of the REST call is translated through {@code getApiException}, like every other
+     * failure-domain call in this class, so callers receive a {@link PulsarAdminException} rather than
+     * whatever exception the HTTP client raised.
+     *
+     * @param cluster
+     *            Cluster name
+     * @param domainName
+     *            Name of the failure domain to delete
+     * @throws PulsarAdminException
+     *             if the request fails
+     */
+    @Override
+    public void deleteFailureDomain(String cluster, String domainName) throws PulsarAdminException {
+        try {
+            request(clusters.path(cluster).path("failureDomains").path(domainName)).delete(ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public Map<String, FailureDomain> getFailureDomains(String cluster) throws PulsarAdminException {
+        try {
+            // GET {cluster}/failureDomains; the anonymous GenericType carries the Map's type arguments
+            // so the response can be decoded as Map<String, FailureDomain>.
+            return request(clusters.path(cluster).path("failureDomains"))
+                    .get(new GenericType<Map<String, FailureDomain>>() {
+                    });
+        } catch (Exception e) {
+            // Translate any failure of the call through getApiException before rethrowing.
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public FailureDomain getFailureDomain(String cluster, String domainName) throws PulsarAdminException {
+        try {
+            // GET {cluster}/failureDomains/{domainName}, decoded as a single FailureDomain.
+            return request(clusters.path(cluster).path("failureDomains").path(domainName)).get(FailureDomain.class);
+        } catch (Exception e) {
+            // Translate any failure of the call through getApiException before rethrowing.
+            throw getApiException(e);
+        }
+    }
+    
+    /**
+     * Shared implementation behind createFailureDomain and updateFailureDomain: POSTs the domain as JSON to
+     * {@code {cluster}/failureDomains/{domainName}} under the clusters resource.
+     *
+     * @param cluster
+     *            Cluster name
+     * @param domainName
+     *            Name of the failure domain
+     * @param domain
+     *            Failure-domain configuration sent as the request entity
+     * @throws PulsarAdminException
+     *             if the request fails (any exception is translated through getApiException)
+     */
+    private void setDomain(String cluster, String domainName,
+            FailureDomain domain) throws PulsarAdminException {
+        try {
+            request(clusters.path(cluster).path("failureDomains").path(domainName)).post(
+                    Entity.entity(domain, MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 79aa162..3f0c7e7 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -229,6 +229,55 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
     }
 
     @Override
+    public void setNamespaceAntiAffinityGroup(String namespace, String namespaceAntiAffinityGroup)
+            throws PulsarAdminException {
+        try {
+            // The namespace name is resolved into its property, cluster and local-name parts,
+            // which form the REST path: {property}/{cluster}/{localName}/antiAffinity.
+            NamespaceName ns = NamespaceName.get(namespace);
+            // The group name itself is POSTed as the JSON request entity.
+            request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName())
+                    .path("antiAffinity")).post(Entity.entity(namespaceAntiAffinityGroup, MediaType.APPLICATION_JSON),
+                            ErrorData.class);
+        } catch (Exception e) {
+            // Translate any failure (including name parsing) through getApiException.
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public String getNamespaceAntiAffinityGroup(String namespace) throws PulsarAdminException {
+        try {
+            // REST path: {property}/{cluster}/{localName}/antiAffinity, built from the parsed namespace name.
+            NamespaceName ns = NamespaceName.get(namespace);
+            // GET the resource and decode the response body as a String.
+            return request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName())
+                    .path("antiAffinity")).get(new GenericType<String>() {
+                    });
+
+        } catch (Exception e) {
+            // Translate any failure (including name parsing) through getApiException.
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public List<String> getAntiAffinityNamespaces(String property, String cluster, String namespaceAntiAffinityGroup)
+            throws PulsarAdminException {
+        try {
+            // Unlike the per-namespace calls, the path starts at the cluster:
+            // {cluster}/antiAffinity/{group}. The property is only sent as the "property" query parameter.
+            return request(namespaces.path(cluster).path("antiAffinity").path(namespaceAntiAffinityGroup)
+                    .queryParam("property", property)).get(new GenericType<List<String>>() {
+                    });
+        } catch (Exception e) {
+            // Translate any failure of the call through getApiException before rethrowing.
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void deleteNamespaceAntiAffinityGroup(String namespace) throws PulsarAdminException {
+        try {
+            // REST path: {property}/{cluster}/{localName}/antiAffinity, built from the parsed namespace name.
+            NamespaceName ns = NamespaceName.get(namespace);
+            // DELETE on that resource removes the namespace's anti-affinity group setting.
+            request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName())
+                    .path("antiAffinity")).delete(ErrorData.class);
+        } catch (Exception e) {
+            // Translate any failure (including name parsing) through getApiException.
+            throw getApiException(e);
+        }
+    }
+
+    @Override
     public void setDeduplicationStatus(String namespace, boolean enableDeduplication) throws PulsarAdminException {
         try {
             NamespaceName ns = NamespaceName.get(namespace);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
index 84bced0..5ba6c10 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
@@ -21,9 +21,11 @@ package org.apache.pulsar.admin.cli;
 import java.util.Arrays;
 
 import org.apache.commons.lang3.StringUtils;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FailureDomain;
 
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
@@ -125,6 +127,84 @@ public class CmdClusters extends CmdBase {
         }
     }
 
+    /**
+     * Converts the raw {@code --broker-list} value into a set of broker names.
+     * <p>
+     * The value is split on commas; surrounding whitespace is trimmed and empty entries are dropped, so
+     * {@code "b1, b2"} yields {@code b1} and {@code b2} rather than {@code b1} and {@code " b2"}.
+     *
+     * @param brokerList
+     *            comma separated broker list, may be {@code null} or blank
+     * @return the broker names, or {@code null} when no list was given (the value these commands have
+     *         always passed to {@code FailureDomain.setBrokers} in that case)
+     */
+    private static java.util.Set<String> parseBrokerList(String brokerList) {
+        if (!isNotBlank(brokerList)) {
+            return null;
+        }
+        java.util.Set<String> brokers = new java.util.HashSet<>();
+        for (String broker : brokerList.split(",")) {
+            String trimmed = broker.trim();
+            if (!trimmed.isEmpty()) {
+                brokers.add(trimmed);
+            }
+        }
+        return brokers;
+    }
+
+    /** CLI command that sends a failure-domain definition through {@code createFailureDomain}. */
+    @Parameters(commandDescription = "Create a new failure-domain for a cluster. updates it if already created.")
+    private class CreateFailureDomain extends CliCommand {
+        @Parameter(description = "cluster-name\n", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = "--domain-name", description = "domain-name", required = true)
+        private String domainName;
+
+        @Parameter(names = "--broker-list", description = "Comma separated broker list", required = false)
+        private String brokerList;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String cluster = getOneArgument(params);
+            FailureDomain domain = new FailureDomain();
+            domain.setBrokers(parseBrokerList(brokerList));
+            admin.clusters().createFailureDomain(cluster, domainName, domain);
+        }
+    }
+
+    /** CLI command that sends a failure-domain definition through {@code updateFailureDomain}. */
+    @Parameters(commandDescription = "Update failure-domain for a cluster. Creates a new one if not exist.")
+    private class UpdateFailureDomain extends CliCommand {
+        @Parameter(description = "cluster-name\n", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = "--domain-name", description = "domain-name", required = true)
+        private String domainName;
+
+        @Parameter(names = "--broker-list", description = "Comma separated broker list", required = false)
+        private String brokerList;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String cluster = getOneArgument(params);
+            FailureDomain domain = new FailureDomain();
+            domain.setBrokers(parseBrokerList(brokerList));
+            admin.clusters().updateFailureDomain(cluster, domainName, domain);
+        }
+    }
+    
+    /** CLI command that deletes one failure-domain, identified by cluster and {@code --domain-name}. */
+    @Parameters(commandDescription = "Deletes an existing failure-domain")
+    private class DeleteFailureDomain extends CliCommand {
+        @Parameter(description = "cluster-name\n", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = "--domain-name", description = "domain-name", required = true)
+        private String domainName;
+
+        @Override
+        void run() throws PulsarAdminException {
+            // The single positional argument is the cluster name.
+            final String clusterName = getOneArgument(params);
+            admin.clusters().deleteFailureDomain(clusterName, domainName);
+        }
+    }
+
+    /** CLI command that prints every failure-domain returned for a cluster. */
+    @Parameters(commandDescription = "List the existing failure-domains for a cluster")
+    private class ListFailureDomains extends CliCommand {
+        @Parameter(description = "cluster-name\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            // The single positional argument is the cluster name.
+            final String clusterName = getOneArgument(params);
+            final java.util.Map<String, FailureDomain> domains = admin.clusters().getFailureDomains(clusterName);
+            print(domains);
+        }
+    }
+
+    /** CLI command that prints one failure-domain, identified by cluster and {@code --domain-name}. */
+    @Parameters(commandDescription = "Get the configuration brokers of a failure-domain")
+    private class GetFailureDomain extends CliCommand {
+        @Parameter(description = "cluster-name\n", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = "--domain-name", description = "domain-name", required = true)
+        private String domainName;
+
+        @Override
+        void run() throws PulsarAdminException {
+            // The single positional argument is the cluster name.
+            final String clusterName = getOneArgument(params);
+            final FailureDomain domain = admin.clusters().getFailureDomain(clusterName, domainName);
+            print(domain);
+        }
+    }
+    
     public CmdClusters(PulsarAdmin admin) {
         super("clusters", admin);
         jcommander.addCommand("get", new Get());
@@ -133,6 +213,11 @@ public class CmdClusters extends CmdBase {
         jcommander.addCommand("delete", new Delete());
         jcommander.addCommand("list", new List());
         jcommander.addCommand("update-peer-clusters", new UpdatePeerClusters());
+        jcommander.addCommand("get-failure-domain", new GetFailureDomain());
+        jcommander.addCommand("create-failure-domain", new CreateFailureDomain());
+        jcommander.addCommand("update-failure-domain", new UpdateFailureDomain());
+        jcommander.addCommand("delete-failure-domain", new DeleteFailureDomain());
+        jcommander.addCommand("list-failure-domains", new ListFailureDomains());
     }
 
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 37d7306..e04eef7 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -213,6 +213,65 @@ public class CmdNamespaces extends CmdBase {
         }
     }
 
+    /** CLI command that assigns the {@code --group} anti-affinity group name to a namespace. */
+    @Parameters(commandDescription = "Set Anti-affinity group name for a namespace")
+    private class SetAntiAffinityGroup extends CliCommand {
+        @Parameter(description = "property/cluster/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--group", "-g" }, description = "Anti-affinity group name", required = true)
+        private String antiAffinityGroup;
+
+        @Override
+        void run() throws PulsarAdminException {
+            // validateNamespace turns the positional arguments into the namespace string.
+            String namespace = validateNamespace(params);
+            admin.namespaces().setNamespaceAntiAffinityGroup(namespace, antiAffinityGroup);
+        }
+    }
+
+    /** CLI command that prints the anti-affinity group name of a namespace. */
+    @Parameters(commandDescription = "Get Anti-affinity group name for a namespace")
+    private class GetAntiAffinityGroup extends CliCommand {
+        @Parameter(description = "property/cluster/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            // validateNamespace turns the positional arguments into the namespace string.
+            String namespace = validateNamespace(params);
+            print(admin.namespaces().getNamespaceAntiAffinityGroup(namespace));
+        }
+    }
+
+    /**
+     * CLI command that prints the namespaces grouped under an anti-affinity group in a cluster.
+     * {@code --property} is optional and is passed through as given.
+     */
+    @Parameters(commandDescription = "Get Anti-affinity namespaces grouped with the given anti-affinity group name")
+    private class GetAntiAffinityNamespaces extends CliCommand {
+
+        @Parameter(
+                names = { "--property", "-p" },
+                description = "property is only used for authorization. Client has to be admin of any of the property to access this api",
+                required = false)
+        private String property;
+
+        @Parameter(names = { "--cluster", "-c" }, description = "Cluster name", required = true)
+        private String cluster;
+
+        @Parameter(names = { "--group", "-g" }, description = "Anti-affinity group name", required = true)
+        private String antiAffinityGroup;
+
+        @Override
+        void run() throws PulsarAdminException {
+            final java.util.List<String> groupedNamespaces = admin.namespaces()
+                    .getAntiAffinityNamespaces(property, cluster, antiAffinityGroup);
+            print(groupedNamespaces);
+        }
+    }
+
+    /** CLI command that removes the anti-affinity group name from a namespace. */
+    @Parameters(commandDescription = "Remove Anti-affinity group name for a namespace")
+    private class DeleteAntiAffinityGroup extends CliCommand {
+        @Parameter(description = "property/cluster/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            // validateNamespace turns the positional arguments into the namespace string.
+            String namespace = validateNamespace(params);
+            admin.namespaces().deleteNamespaceAntiAffinityGroup(namespace);
+        }
+    }
+    
+
     @Parameters(commandDescription = "Enable or disable deduplication for a namespace")
     private class SetDeduplication extends CliCommand {
         @Parameter(description = "property/cluster/namespace", required = true)
@@ -621,6 +680,11 @@ public class CmdNamespaces extends CmdBase {
 
         jcommander.addCommand("get-message-ttl", new GetMessageTTL());
         jcommander.addCommand("set-message-ttl", new SetMessageTTL());
+        
+        jcommander.addCommand("get-anti-affinity-group", new GetAntiAffinityGroup());
+        jcommander.addCommand("set-anti-affinity-group", new SetAntiAffinityGroup());
+        jcommander.addCommand("get-anti-affinity-namespaces", new GetAntiAffinityNamespaces());
+        jcommander.addCommand("delete-anti-affinity-group", new DeleteAntiAffinityGroup());
 
         jcommander.addCommand("set-deduplication", new SetDeduplication());
 
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 69d57da..ba12db7 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
 import org.apache.pulsar.common.policies.data.ResourceQuota;
@@ -131,6 +132,24 @@ public class PulsarAdminToolTest {
 
         clusters.run(split("delete use"));
         verify(mockClusters).deleteCluster("use");
+        
+        clusters.run(split("list-failure-domains use"));
+        verify(mockClusters).getFailureDomains("use");
+
+        clusters.run(split("get-failure-domain use --domain-name domain"));
+        verify(mockClusters).getFailureDomain("use", "domain");
+
+        clusters.run(split("create-failure-domain use --domain-name domain --broker-list b1"));
+        FailureDomain domain = new FailureDomain();
+        domain.setBrokers(Sets.newHashSet("b1"));
+        verify(mockClusters).createFailureDomain("use", "domain", domain);
+
+        clusters.run(split("update-failure-domain use --domain-name domain --broker-list b1"));
+        verify(mockClusters).updateFailureDomain("use", "domain", domain);
+
+        clusters.run(split("delete-failure-domain use --domain-name domain"));
+        verify(mockClusters).deleteFailureDomain("use", "domain");
+
 
         // Re-create CmdClusters to avoid a issue.
         // See https://github.com/cbeust/jcommander/issues/271
@@ -284,6 +303,19 @@ public class PulsarAdminToolTest {
 
         namespaces.run(split("get-message-ttl myprop/clust/ns1"));
         verify(mockNamespaces).getNamespaceMessageTTL("myprop/clust/ns1");
+        
+        namespaces.run(split("set-anti-affinity-group myprop/clust/ns1 -g group"));
+        verify(mockNamespaces).setNamespaceAntiAffinityGroup("myprop/clust/ns1", "group");
+
+        namespaces.run(split("get-anti-affinity-group myprop/clust/ns1"));
+        verify(mockNamespaces).getNamespaceAntiAffinityGroup("myprop/clust/ns1");
+        
+        namespaces.run(split("get-anti-affinity-namespaces -p dummy -c cluster -g group"));
+        verify(mockNamespaces).getAntiAffinityNamespaces("dummy", "cluster", "group");
+
+        namespaces.run(split("delete-anti-affinity-group myprop/clust/ns1 "));
+        verify(mockNamespaces).deleteNamespaceAntiAffinityGroup("myprop/clust/ns1");
+        
 
         namespaces.run(split("set-retention myprop/clust/ns1 -t 1h -s 1M"));
         verify(mockNamespaces).setRetention("myprop/clust/ns1", new RetentionPolicies(60, 1));
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FailureDomain.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FailureDomain.java
new file mode 100644
index 0000000..dd1d09b
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FailureDomain.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.common.base.Objects;
+
+/**
+ * Policy data object describing a failure domain as a set of broker identifiers.
+ *
+ * <p>Two instances are equal when their broker sets are equal, and {@link #hashCode()} is derived from the same
+ * field so that equal instances behave correctly in hash-based collections.
+ */
+public class FailureDomain {
+
+    /** Brokers that belong to this domain; starts out as an empty, mutable set. */
+    public Set<String> brokers = new HashSet<String>();
+
+    /**
+     * @return the broker set currently held by this domain (the stored reference, not a copy)
+     */
+    public Set<String> getBrokers() {
+        return brokers;
+    }
+
+    /**
+     * Replaces the broker set of this domain.
+     *
+     * @param brokers
+     *            new broker set; stored as-is without copying or validation
+     */
+    public void setBrokers(Set<String> brokers) {
+        this.brokers = brokers;
+    }
+
+    @Override
+    public int hashCode() {
+        // Must stay in sync with equals(): both depend only on the broker set (null-safe).
+        return Objects.hashCode(brokers);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof FailureDomain) {
+            FailureDomain other = (FailureDomain) obj;
+            return Objects.equal(brokers, other.brokers);
+        }
+
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return Objects.toStringHelper(this).add("brokers", brokers).toString();
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index a46b5fe..f3486b8 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -42,6 +42,7 @@ public class Policies {
     public int message_ttl_in_seconds = 0;
     public RetentionPolicies retention_policies = null;
     public boolean deleted = false;
+    public String antiAffinityGroup;
 
     public static final String FIRST_BOUNDARY = "0x00000000";
     public static final String LAST_BOUNDARY = "0xffffffff";
@@ -63,7 +64,8 @@ public class Policies {
                     && message_ttl_in_seconds == other.message_ttl_in_seconds
                     && Objects.equals(retention_policies, other.retention_policies)
                     && Objects.equals(encryption_required, other.encryption_required)
-                    && Objects.equals(subscription_auth_mode, other.subscription_auth_mode);
+                    && Objects.equals(subscription_auth_mode, other.subscription_auth_mode)
+                    && Objects.equals(antiAffinityGroup, other.antiAffinityGroup);
         }
 
         return false;
@@ -86,6 +88,7 @@ public class Policies {
                 .add("deduplicationEnabled", deduplicationEnabled)
                 .add("clusterDispatchRate", clusterDispatchRate)
                 .add("latency_stats_sample_rate", latency_stats_sample_rate)
+                .add("antiAffinityGroup", antiAffinityGroup)
                 .add("message_ttl_in_seconds", message_ttl_in_seconds).add("retention_policies", retention_policies)
                 .add("deleted", deleted)
                 .add("encryption_required", encryption_required)
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
index 06bd055..556a8b5 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
@@ -64,6 +64,7 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
 
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private final String configClusterName = "use";
 
     @BeforeMethod
     @Override
@@ -92,7 +93,7 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
         providers.add(AuthenticationProviderTls.class.getName());
         conf.setAuthenticationProviders(providers);
 
-        conf.setClusterName("use");
+        conf.setClusterName(configClusterName);
 
         super.init();
 
@@ -157,7 +158,7 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
         // create a client which connects to proxy over tls and pass authData
         PulsarClient proxyClient = createPulsarClient(authTls, proxyServiceUrl);
 
-        admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
+        admin.clusters().updateCluster(configClusterName, new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
                 "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
         admin.properties().createProperty("my-property",
                 new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));

-- 
To stop receiving notification emails like this one, please contact
rdhabalia@apache.org.