You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/01/25 02:00:05 UTC

[GitHub] rdhabalia closed pull request #896: PIP-7 Introduce Failure-domain and Anti-affinity-namespace group

rdhabalia closed pull request #896: PIP-7 Introduce Failure-domain and Anti-affinity-namespace group
URL: https://github.com/apache/incubator-pulsar/pull/896
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/conf/broker.conf b/conf/broker.conf
index 20da0a910..28e4296bd 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -46,6 +46,9 @@ advertisedAddress=
 # Name of the cluster to which this broker belongs to
 clusterName=
 
+# Enable cluster's failure-domain which can distribute brokers into logical region
+failureDomainsEnabled=false
+
 # Zookeeper session timeout in milliseconds
 zooKeeperSessionTimeoutMillis=30000
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index ce35f2ebd..3dbd8bbce 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -39,6 +39,9 @@ advertisedAddress=
 # Name of the cluster to which this broker belongs to
 clusterName=standalone
 
+# Enable cluster's failure-domain which can distribute brokers into logical region
+failureDomainsEnabled=false
+
 # Zookeeper session timeout in milliseconds
 zooKeeperSessionTimeoutMillis=30000
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 069933cf1..e4d6c2fe9 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -61,6 +61,9 @@
     // Name of the cluster to which this broker belongs to
     @FieldContext(required = true)
     private String clusterName;
+    // Enable cluster's failure-domain which can distribute brokers into logical region
+    @FieldContext(dynamic = true)
+    private boolean failureDomainsEnabled = false;
     // Zookeeper session timeout in milliseconds
     private long zooKeeperSessionTimeoutMillis = 30000;
     // Time to wait for broker graceful shutdown. After this time elapses, the
@@ -468,6 +471,14 @@ public void setClusterName(String clusterName) {
         this.clusterName = clusterName;
     }
 
+    public boolean isFailureDomainsEnabled() {
+        return failureDomainsEnabled;
+    }
+
+    public void setFailureDomainsEnabled(boolean failureDomainsEnabled) {
+        this.failureDomainsEnabled = failureDomainsEnabled;
+    }
+
     public long getBrokerShutdownTimeoutMs() {
         return brokerShutdownTimeoutMs;
     }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
index 1c8ad8099..41a8f932c 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
@@ -21,8 +21,10 @@
 import java.util.Map;
 
 import org.apache.bookkeeper.util.ZkUtils;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
@@ -39,6 +41,7 @@
 import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Maps;
 
 /**
  * ConfigurationCacheService maintains a local in-memory cache of all the configurations and policies stored in
@@ -53,13 +56,21 @@
     private ZooKeeperDataCache<Policies> policiesCache;
     private ZooKeeperDataCache<ClusterData> clustersCache;
     private ZooKeeperChildrenCache clustersListCache;
+    private ZooKeeperChildrenCache failureDomainListCache;
     private ZooKeeperDataCache<NamespaceIsolationPolicies> namespaceIsolationPoliciesCache;
+    private ZooKeeperDataCache<FailureDomain> failureDomainCache;
 
     public static final String POLICIES = "policies";
+    public static final String FAILURE_DOMAIN = "failureDomain";
+    public final String CLUSTER_FAILURE_DOMAIN_ROOT;
     public static final String POLICIES_ROOT = "/admin/policies";
     private static final String CLUSTERS_ROOT = "/admin/clusters";
 
     public ConfigurationCacheService(ZooKeeperCache cache) throws PulsarServerException {
+        this(cache, null);
+    }
+
+    public ConfigurationCacheService(ZooKeeperCache cache, String configuredClusterName) throws PulsarServerException {
         this.cache = cache;
 
         initZK();
@@ -86,6 +97,12 @@ public ClusterData deserialize(String path, byte[] content) throws Exception {
         };
 
         this.clustersListCache = new ZooKeeperChildrenCache(cache, CLUSTERS_ROOT);
+        
+        CLUSTER_FAILURE_DOMAIN_ROOT = CLUSTERS_ROOT + "/" + configuredClusterName + "/" + FAILURE_DOMAIN;
+        if (isNotBlank(configuredClusterName)) {
+            createFailureDomainRoot(cache.getZooKeeper(), CLUSTER_FAILURE_DOMAIN_ROOT);
+            this.failureDomainListCache = new ZooKeeperChildrenCache(cache, CLUSTER_FAILURE_DOMAIN_ROOT);
+        }
 
         this.namespaceIsolationPoliciesCache = new ZooKeeperDataCache<NamespaceIsolationPolicies>(cache) {
             @Override
@@ -96,6 +113,31 @@ public NamespaceIsolationPolicies deserialize(String path, byte[] content) throw
                         }));
             }
         };
+        
+        this.failureDomainCache = new ZooKeeperDataCache<FailureDomain>(cache) {
+            @Override
+            public FailureDomain deserialize(String path, byte[] content) throws Exception {
+                return ObjectMapperFactory.getThreadLocal().readValue(content, FailureDomain.class);
+            }
+        };
+    }
+
+    private void createFailureDomainRoot(ZooKeeper zk, String path) {
+        try {
+            if (zk.exists(path, false) == null) {
+                try {
+                    byte[] data = "".getBytes();
+                    ZkUtils.createFullPathOptimistic(zk, path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                    LOG.info("Successfully created failure-domain znode at {}", path);
+                } catch (KeeperException.NodeExistsException e) {
+                    // Ok
+                }
+            }
+        } catch (KeeperException.NodeExistsException e) {
+            // Ok
+        } catch (Exception e) {
+            LOG.warn("Failed to create failure-domain znode {} ", path, e);
+        }
     }
 
     private void initZK() throws PulsarServerException {
@@ -138,6 +180,10 @@ public ZooKeeperChildrenCache clustersListCache() {
         return this.clustersListCache;
     }
 
+    public ZooKeeperChildrenCache failureDomainListCache() {
+        return this.failureDomainListCache;
+    }
+    
     public ZooKeeper getZooKeeper() {
         return this.cache.getZooKeeper();
     }
@@ -145,4 +191,8 @@ public ZooKeeper getZooKeeper() {
     public ZooKeeperDataCache<NamespaceIsolationPolicies> namespaceIsolationPoliciesCache() {
         return this.namespaceIsolationPoliciesCache;
     }
+    
+    public ZooKeeperDataCache<FailureDomain> failureDomainCache() {
+        return this.failureDomainCache;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 2d7f99b6f..6cd56abcc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -417,7 +417,7 @@ private void startZkCacheService() throws PulsarServerException {
             throw new PulsarServerException(e);
         }
 
-        this.configurationCacheService = new ConfigurationCacheService(getGlobalZkCache());
+        this.configurationCacheService = new ConfigurationCacheService(getGlobalZkCache(), this.config.getClusterName());
         this.localZkCacheService = new LocalZooKeeperCacheService(getLocalZkCache(), this.configurationCacheService);
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index f542a10f0..f04d15bb6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.broker.admin;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT;
 
 import java.net.MalformedURLException;
 import java.net.URI;
@@ -45,6 +47,7 @@
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
@@ -64,7 +67,6 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
-import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 
 public abstract class AdminResource extends PulsarWebResource {
     private static final Logger log = LoggerFactory.getLogger(AdminResource.class);
@@ -202,6 +204,7 @@ public void validatePoliciesReadOnlyAccess() {
         return namespaces;
     }
 
+    
     /**
      * Redirect the call to the specified broker
      *
@@ -286,6 +289,14 @@ protected void setServletContext(ServletContext servletContext) {
         return pulsar().getConfigurationCache().namespaceIsolationPoliciesCache();
     }
 
+    protected ZooKeeperDataCache<FailureDomain> failureDomainCache() {
+        return pulsar().getConfigurationCache().failureDomainCache();
+    }
+
+    protected ZooKeeperChildrenCache failureDomainListCache() {
+        return pulsar().getConfigurationCache().failureDomainListCache();
+    }
+
     protected PartitionedTopicMetadata getPartitionedTopicMetadata(String property, String cluster, String namespace,
             String destination, boolean authoritative) {
         DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination);
@@ -354,5 +365,14 @@ public PartitionedTopicMetadata deserialize(String key, byte[] content) throws E
         }
         return metadataFuture;
     }
-    
+
+    protected void validateClusterExists(String cluster) {
+        try {
+            if (!clustersCache().get(path("clusters", cluster)).isPresent()) {
+                throw new RestException(Status.PRECONDITION_FAILED, "Cluster " + cluster + " does not exist.");
+            }
+        } catch (Exception e) {
+            throw new RestException(e);
+        }
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Clusters.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Clusters.java
index b489f5ff9..612e2e254 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Clusters.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Clusters.java
@@ -23,9 +23,11 @@
 import java.io.IOException;
 import java.util.Collections;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import javax.ws.rs.DELETE;
 import javax.ws.rs.GET;
@@ -38,14 +40,17 @@
 import javax.ws.rs.core.Response.Status;
 
 import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.naming.NamedEntity;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
 import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -53,12 +58,15 @@
 
 import com.fasterxml.jackson.core.JsonGenerationException;
 import com.fasterxml.jackson.databind.JsonMappingException;
+import com.google.common.collect.Maps;
 
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.FAILURE_DOMAIN;
+
 @Path("/clusters")
 @Api(value = "/clusters", description = "Cluster admin apis", tags = "clusters")
 @Produces(MediaType.APPLICATION_JSON)
@@ -138,9 +146,14 @@ public void updateCluster(@PathParam("cluster") String cluster, ClusterData clus
             String clusterPath = path("clusters", cluster);
             Stat nodeStat = new Stat();
             byte[] content = globalZk().getData(clusterPath, null, nodeStat);
-            ClusterData currentClusterData = jsonMapper().readValue(content, ClusterData.class);
-            // only update cluster-url-data and not overwrite other metadata such as peerClusterNames
-            currentClusterData.update(clusterData);
+            ClusterData currentClusterData = null;
+            if (content.length > 0) {
+                currentClusterData = jsonMapper().readValue(content, ClusterData.class);
+                // only update cluster-url-data and not overwrite other metadata such as peerClusterNames
+                currentClusterData.update(clusterData);
+            } else {
+                currentClusterData = clusterData;
+            }
             // Write back the new updated ClusterData into zookeeper
             globalZk().setData(clusterPath, jsonMapper().writeValueAsBytes(currentClusterData),
                     nodeStat.getVersion());
@@ -259,6 +272,7 @@ public void deleteCluster(@PathParam("cluster") String cluster) {
 
         try {
             String clusterPath = path("clusters", cluster);
+            deleteFailureDomain(clusterPath);
             globalZk().delete(clusterPath, -1);
             globalZkCache().invalidate(clusterPath);
             log.info("[{}] Deleted cluster {}", clientAppId(), cluster);
@@ -271,6 +285,25 @@ public void deleteCluster(@PathParam("cluster") String cluster) {
         }
     }
 
+    private void deleteFailureDomain(String clusterPath) {
+        try {
+            String failureDomain = joinPath(clusterPath, ConfigurationCacheService.FAILURE_DOMAIN);
+            if (globalZk().exists(failureDomain, false) == null) {
+                return;
+            }
+            for (String domain : globalZk().getChildren(failureDomain, false)) {
+                String domainPath = joinPath(failureDomain, domain);
+                globalZk().delete(domainPath, -1);
+            }
+            globalZk().delete(failureDomain, -1);
+            failureDomainCache().clear();
+            failureDomainListCache().clear();
+        } catch (Exception e) {
+            log.warn("Failed to delete failure-domain under cluster {}", clusterPath);
+            throw new RestException(e);
+        }
+    }
+
     @GET
     @Path("/{cluster}/namespaceIsolationPolicies")
     @ApiOperation(value = "Get the namespace isolation policies assigned in the cluster", response = NamespaceIsolationData.class, responseContainer = "Map")
@@ -296,16 +329,6 @@ public void deleteCluster(@PathParam("cluster") String cluster) {
         }
     }
 
-    private void validateClusterExists(String cluster) {
-        try {
-            if (!clustersCache().get(path("clusters", cluster)).isPresent()) {
-                throw new RestException(Status.PRECONDITION_FAILED, "Cluster " + cluster + " does not exist.");
-            }
-        } catch (Exception e) {
-            throw new RestException(e);
-        }
-    }
-
     @GET
     @Path("/{cluster}/namespaceIsolationPolicies/{policyName}")
     @ApiOperation(value = "Get a single namespace isolation policy assigned in the cluster", response = NamespaceIsolationData.class)
@@ -356,7 +379,7 @@ public void setNamespaceIsolationPolicy(@PathParam("cluster") String cluster,
             NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPoliciesCache()
                     .get(nsIsolationPolicyPath).orElseGet(() -> {
                         try {
-                            this.createNamespaceIsolationPolicyNode(nsIsolationPolicyPath);
+                            this.createZnodeIfNotExist(nsIsolationPolicyPath, Optional.of(Collections.emptyMap()));
                             return new NamespaceIsolationPolicies();
                         } catch (KeeperException | InterruptedException e) {
                             throw new RestException(e);
@@ -386,23 +409,26 @@ public void setNamespaceIsolationPolicy(@PathParam("cluster") String cluster,
         }
     }
 
-    private void createNamespaceIsolationPolicyNode(String nsIsolationPolicyPath)
-            throws KeeperException, InterruptedException {
+    private boolean createZnodeIfNotExist(String path, Optional<Object> value) throws KeeperException, InterruptedException {
         // create persistent node on ZooKeeper
-        if (globalZk().exists(nsIsolationPolicyPath, false) == null) {
+        if (globalZk().exists(path, false) == null) {
             // create all the intermediate nodes
             try {
-                ZkUtils.createFullPathOptimistic(globalZk(), nsIsolationPolicyPath,
-                        jsonMapper().writeValueAsBytes(Collections.emptyMap()), Ids.OPEN_ACL_UNSAFE,
+                ZkUtils.createFullPathOptimistic(globalZk(), path,
+                        value.isPresent() ? jsonMapper().writeValueAsBytes(value.get()) : null, Ids.OPEN_ACL_UNSAFE,
                         CreateMode.PERSISTENT);
+                return true;
             } catch (KeeperException.NodeExistsException nee) {
-                log.debug("Other broker preempted the full path [{}] already. Continue...", nsIsolationPolicyPath);
+                if(log.isDebugEnabled()) {
+                    log.debug("Other broker preempted the full path [{}] already. Continue...", path);                    
+                }
             } catch (JsonGenerationException e) {
                 // ignore json error as it is empty hash
             } catch (JsonMappingException e) {
             } catch (IOException e) {
             }
         }
+        return false;
     }
 
     @DELETE
@@ -422,7 +448,7 @@ public void deleteNamespaceIsolationPolicy(@PathParam("cluster") String cluster,
             NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPoliciesCache()
                     .get(nsIsolationPolicyPath).orElseGet(() -> {
                         try {
-                            this.createNamespaceIsolationPolicyNode(nsIsolationPolicyPath);
+                            this.createZnodeIfNotExist(nsIsolationPolicyPath, Optional.of(Collections.emptyMap()));
                             return new NamespaceIsolationPolicies();
                         } catch (KeeperException | InterruptedException e) {
                             throw new RestException(e);
@@ -446,6 +472,157 @@ public void deleteNamespaceIsolationPolicy(@PathParam("cluster") String cluster,
         }
     }
 
+    @POST
+    @Path("/{cluster}/failureDomains/{domainName}")
+    @ApiOperation(value = "Set cluster's failure Domain")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 409, message = "Broker already exist into other domain"),
+            @ApiResponse(code = 404, message = "Cluster doesn't exist") })
+    public void setFailureDomain(@PathParam("cluster") String cluster, @PathParam("domainName") String domainName,
+            FailureDomain domain) throws Exception {
+        validateSuperUserAccess();
+        validateClusterExists(cluster);
+        validateBrokerExistsInOtherDomain(cluster, domainName, domain);
+
+        try {
+            String domainPath = joinPath(pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT, domainName);
+            if (this.createZnodeIfNotExist(domainPath, Optional.ofNullable(domain))) {
+                // clear domains-children cache
+                this.failureDomainListCache().clear();
+            } else {
+                globalZk().setData(domainPath, jsonMapper().writeValueAsBytes(domain), -1);
+                // make sure that the domain-cache will be refreshed for the next read access
+                failureDomainCache().invalidate(domainPath);
+            }
+        } catch (KeeperException.NoNodeException nne) {
+            log.warn("[{}] Failed to update domain {}. clusters {}  Does not exist", clientAppId(), cluster,
+                    domainName);
+            throw new RestException(Status.NOT_FOUND,
+                    "Domain " + domainName + " for cluster " + cluster + " does not exist");
+        } catch (Exception e) {
+            log.error("[{}] Failed to update clusters/{}/domainName/{}", clientAppId(), cluster, domainName, e);
+            throw new RestException(e);
+        }
+    }
+
+    @GET
+    @Path("/{cluster}/failureDomains")
+    @ApiOperation(value = "Get the cluster failure domains", response = FailureDomain.class, responseContainer = "Map")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
+    public Map<String, FailureDomain> getFailureDomains(@PathParam("cluster") String cluster) throws Exception {
+        validateSuperUserAccess();
+
+        Map<String, FailureDomain> domains = Maps.newHashMap();
+        try {
+            final String failureDomainRootPath = pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT;
+            for (String domainName : failureDomainListCache().get()) {
+                try {
+                    Optional<FailureDomain> domain = failureDomainCache()
+                            .get(joinPath(failureDomainRootPath, domainName));
+                    if (domain.isPresent()) {
+                        domains.put(domainName, domain.get());
+                    }
+                } catch (Exception e) {
+                    log.warn("Failed to get domain {}", domainName, e);
+                }
+            }
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("[{}] Failure-domain is not configured for cluster {}", clientAppId(), cluster, e);
+            return Collections.emptyMap();
+        } catch (Exception e) {
+            log.error("[{}] Failed to get failure-domains for cluster {}", clientAppId(), cluster, e);
+            throw new RestException(e);
+        }
+        return domains;
+    }
+
+    @GET
+    @Path("/{cluster}/failureDomains/{domainName}")
+    @ApiOperation(value = "Get a domain in a cluster", response = FailureDomain.class)
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Domain doesn't exist"),
+            @ApiResponse(code = 412, message = "Cluster doesn't exist") })
+    public FailureDomain getDomain(@PathParam("cluster") String cluster, @PathParam("domainName") String domainName)
+            throws Exception {
+        validateSuperUserAccess();
+        validateClusterExists(cluster);
+
+        try {
+            final String failureDomainRootPath = pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT;
+            return failureDomainCache().get(joinPath(failureDomainRootPath, domainName))
+                    .orElseThrow(() -> new RestException(Status.NOT_FOUND,
+                            "Domain " + domainName + " for cluster " + cluster + " does not exist"));
+        } catch (RestException re) {
+            throw re;
+        } catch (Exception e) {
+            log.error("[{}] Failed to get domain {} for cluster {}", clientAppId(), domainName, cluster, e);
+            throw new RestException(e);
+        }
+    }
+
+    @DELETE
+    @Path("/{cluster}/failureDomains/{domainName}")
+    @ApiOperation(value = "Delete cluster's failure omain")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission or plicy is read only"),
+            @ApiResponse(code = 412, message = "Cluster doesn't exist") })
+    public void deleteFailureDomain(@PathParam("cluster") String cluster, @PathParam("domainName") String domainName)
+            throws Exception {
+        validateSuperUserAccess();
+        validateClusterExists(cluster);
+
+        try {
+            final String domainPath = joinPath(pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT, domainName);
+            globalZk().delete(domainPath, -1);
+            // clear domain cache
+            failureDomainCache().invalidate(domainPath);
+            failureDomainListCache().clear();
+        } catch (KeeperException.NoNodeException nne) {
+            log.warn("[{}] Domain {} does not exist in {}", clientAppId(), domainName, cluster);
+            throw new RestException(Status.NOT_FOUND,
+                    "Domain-name " + domainName + " or cluster " + cluster + " does not exist");
+        } catch (Exception e) {
+            log.error("[{}] Failed to delete domain {} in cluster {}", clientAppId(), domainName, cluster, e);
+            throw new RestException(e);
+        }
+    }
+
+    private void validateBrokerExistsInOtherDomain(final String cluster, final String inputDomainName,
+            final FailureDomain inputDomain) {
+        if (inputDomain != null && inputDomain.brokers != null) {
+            try {
+                final String failureDomainRootPath = pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT;
+                for (String domainName : failureDomainListCache().get()) {
+                    if (inputDomainName.equals(domainName)) {
+                        continue;
+                    }
+                    try {
+                        Optional<FailureDomain> domain = failureDomainCache().get(joinPath(failureDomainRootPath, domainName));
+                        if (domain.isPresent() && domain.get().brokers != null) {
+                            List<String> duplicateBrokers = domain.get().brokers.stream().parallel()
+                                    .filter(inputDomain.brokers::contains).collect(Collectors.toList());
+                            if (!duplicateBrokers.isEmpty()) {
+                                throw new RestException(Status.CONFLICT,
+                                        duplicateBrokers + " already exist into " + domainName);
+                            }
+                        }
+                    } catch (Exception e) {
+                        if (e instanceof RestException) {
+                            throw e;
+                        }
+                        log.warn("Failed to get domain {}", domainName, e);
+                    }
+                }
+            } catch (KeeperException.NoNodeException e) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Domain is not configured for cluster", clientAppId(), e);
+                }
+            } catch (Exception e) {
+                log.error("[{}] Failed to get domains for cluster {}", clientAppId(), e);
+                throw new RestException(e);
+            }
+        }
+    }
+
     private static final Logger log = LoggerFactory.getLogger(Clusters.class);
 
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java
index cb8cf72a8..cb8f982d8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java
@@ -18,8 +18,11 @@
  */
 package org.apache.pulsar.broker.admin;
 
-import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT;
 import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
 
 import java.net.URI;
@@ -91,8 +94,6 @@
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 
-import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-
 @Path("/namespaces")
 @Produces(MediaType.APPLICATION_JSON)
 @Consumes(MediaType.APPLICATION_JSON)
@@ -668,6 +669,139 @@ public void setNamespaceMessageTTL(@PathParam("property") String property, @Path
         }
     }
 
+    @POST
+    @Path("/{property}/{cluster}/{namespace}/antiAffinity")
+    @ApiOperation(value = "Set anti-affinity group for a namespace")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
+            @ApiResponse(code = 412, message = "Invalid antiAffinityGroup") })
+    public void setNamespaceAntiAffinityGroup(@PathParam("property") String property, @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace, String antiAffinityGroup) {
+        validateAdminAccessOnProperty(property);
+        validatePoliciesReadOnlyAccess();
+        
+        log.info("[{}] Setting anti-affinity group {} for {}/{}/{}", clientAppId(), antiAffinityGroup, property,
+                cluster, namespace);
+
+        if (isBlank(antiAffinityGroup)) {
+            throw new RestException(Status.PRECONDITION_FAILED, "antiAffinityGroup can't be empty");
+        }
+
+        NamespaceName nsName = NamespaceName.get(property, cluster, namespace);
+        Entry<Policies, Stat> policiesNode = null;
+
+        try {
+            // Force to read the data s.t. the watch to the cache content is setup.
+            policiesNode = policiesCache().getWithStat(path(POLICIES, property, cluster, namespace))
+                    .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace " + nsName + " does not exist"));
+            policiesNode.getKey().antiAffinityGroup = antiAffinityGroup;
+
+            // Write back the new policies into zookeeper
+            globalZk().setData(path(POLICIES, property, cluster, namespace),
+                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
+            policiesCache().invalidate(path(POLICIES, property, cluster, namespace));
+
+            log.info("[{}] Successfully updated the antiAffinityGroup {} on namespace {}/{}/{}", clientAppId(),
+                    antiAffinityGroup, property, cluster, namespace);
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("[{}] Failed to update the antiAffinityGroup for namespace {}/{}/{}: does not exist", clientAppId(),
+                    property, cluster, namespace);
+            throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+        } catch (KeeperException.BadVersionException e) {
+            log.warn(
+                    "[{}] Failed to update the antiAffinityGroup on namespace {}/{}/{} expected policy node version={} : concurrent modification",
+                    clientAppId(), property, cluster, namespace, policiesNode.getValue().getVersion());
+
+            throw new RestException(Status.CONFLICT, "Concurrent modification");
+        } catch (Exception e) {
+            log.error("[{}] Failed to update the antiAffinityGroup on namespace {}/{}/{}", clientAppId(), property, cluster,
+                    namespace, e);
+            throw new RestException(e);
+        }
+    }
+
+    @GET
+    @Path("/{property}/{cluster}/{namespace}/antiAffinity")
+    @ApiOperation(value = "Get anti-affinity group of a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
+    public String getNamespaceAntiAffinityGroup(@PathParam("property") String property, @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace) {
+        validateAdminAccessOnProperty(property);
+        return getNamespacePolicies(property, cluster, namespace).antiAffinityGroup;
+    }
+
+    @GET
+    @Path("{cluster}/antiAffinity/{group}")
+    @ApiOperation(value = "Get all namespaces that are grouped by given anti-affinity group in a given cluster. api can be only accessed by admin of any of the existing property")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 412, message = "Cluster not exist/Anti-affinity group can't be empty.") })
+    public List<String> getAntiAffinityNamespaces(@PathParam("cluster") String cluster,
+            @PathParam("group") String antiAffinityGroup, @QueryParam("property") String property) {
+        validateAdminAccessOnProperty(property);
+
+        log.info("[{}]-{} Finding namespaces for {} in {}", clientAppId(), property, antiAffinityGroup, cluster);
+
+        if (isBlank(antiAffinityGroup)) {
+            throw new RestException(Status.PRECONDITION_FAILED, "anti-affinity group can't be empty.");
+        }
+        validateClusterExists(cluster);
+        List<String> namespaces = Lists.newArrayList();
+        try {
+            for (String prop : globalZk().getChildren(POLICIES_ROOT, false)) {
+                for (String namespace : globalZk().getChildren(path(POLICIES, prop, cluster), false)) {
+                    Optional<Policies> policies = policiesCache()
+                            .get(AdminResource.path(POLICIES, prop, cluster, namespace));
+                    if (policies.isPresent() && antiAffinityGroup.equalsIgnoreCase(policies.get().antiAffinityGroup)) {
+                        namespaces.add(String.format("%s/%s/%s", prop, cluster, namespace));
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.warn("Failed to list of properties/namespace from global-zk", e);
+        }
+        return namespaces;
+    }
+    
+    @DELETE
+    @Path("/{property}/{cluster}/{namespace}/antiAffinity")
+    @ApiOperation(value = "Remove anti-affinity group of a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification") })
+    public void removeNamespaceAntiAffinityGroup(@PathParam("property") String property,
+            @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
+        validateAdminAccessOnProperty(property);
+        validatePoliciesReadOnlyAccess();
+
+        log.info("[{}] Deleting anti-affinity group for {}/{}/{}", clientAppId(), property, cluster, namespace);
+        
+        try {
+            Stat nodeStat = new Stat();
+            final String path = path(POLICIES, property, cluster, namespace);
+            byte[] content = globalZk().getData(path, null, nodeStat);
+            Policies policies = jsonMapper().readValue(content, Policies.class);
+            policies.antiAffinityGroup = null;
+            globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
+            policiesCache().invalidate(path(POLICIES, property, cluster, namespace));
+            log.info("[{}] Successfully removed anti-affinity group for a namespace={}/{}/{}", clientAppId(), property,
+                    cluster, namespace);
+
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("[{}] Failed to remove anti-affinity group for namespace {}/{}/{}: does not exist", clientAppId(),
+                    property, cluster, namespace);
+            throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+        } catch (KeeperException.BadVersionException e) {
+            log.warn("[{}] Failed to remove anti-affinity group for namespace {}/{}/{}: concurrent modification",
+                    clientAppId(), property, cluster, namespace);
+            throw new RestException(Status.CONFLICT, "Concurrent modification");
+        } catch (Exception e) {
+            log.error("[{}] Failed to remove anti-affinity group for namespace {}/{}/{}", clientAppId(), property,
+                    cluster, namespace, e);
+            throw new RestException(e);
+        }
+    }
+
     @POST
     @Path("/{property}/{cluster}/{namespace}/deduplication")
     @ApiOperation(value = "Enable or disable broker side deduplication for all topics in a namespace")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index 1a2d636b8..ffdaac64f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
@@ -19,29 +19,44 @@
 package org.apache.pulsar.broker.loadbalance.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import static org.apache.pulsar.broker.web.PulsarWebResource.path;
 
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
-
 import org.apache.pulsar.broker.BrokerData;
+import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
 import org.apache.pulsar.broker.loadbalance.LoadData;
 import org.apache.pulsar.broker.stats.metrics.JvmMetrics;
+import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
+import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.beust.jcommander.internal.Lists;
+import com.google.common.collect.Maps;
+
 /**
  * This class contains code which in shared between the two load manager implementations.
  */
@@ -60,13 +75,15 @@
     // update LoadReport at most every 5 seconds
     public static final long LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL = TimeUnit.SECONDS.toMillis(5);
 
+    private static final String DEFAULT_DOMAIN = "default";
+
     // Don't allow construction: static method namespace only.
     private LoadManagerShared() {
     }
 
     // Determines the brokers available for the given service unit according to the given policies.
     // The brokers are put into brokerCandidateCache.
-    public static synchronized void applyPolicies(final ServiceUnitId serviceUnit,
+    public static synchronized void applyNamespacePolicies(final ServiceUnitId serviceUnit,
             final SimpleResourceAllocationPolicies policies, final Set<String> brokerCandidateCache,
             final Set<String> availableBrokers,
             final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate) {
@@ -264,6 +281,227 @@ public static void removeMostServicingBrokersForNamespace(final String assignedB
         }
     }
     
+    /**
+     * It tries to filter out brokers which own namespace with same anti-affinity-group as given namespace. If all the
+     * domains own namespace with same anti-affinity group then it will try to keep brokers with domain that has least
+     * number of namespaces. It also tries to keep brokers which has least number of namespace with in domain.
+     * eg.
+     * <pre>
+     * Before:
+     * Domain-count  Brokers-count
+     * ____________  ____________
+     * d1-3          b1-2,b2-1
+     * d2-3          b3-2,b4-1
+     * d3-4          b5-2,b6-2
+     * 
+     * After filtering: "candidates" brokers
+     * Domain-count  Brokers-count
+     * ____________  ____________
+     * d1-3          b2-1
+     * d2-3          b4-1
+     * 
+     * "candidate" broker to own anti-affinity-namespace = b2 or b4
+     * 
+     * </pre>
+     * 
+     * @param pulsar
+     * @param assignedBundleName
+     * @param candidates
+     * @param brokerToNamespaceToBundleRange
+     */
+    public static void filterAntiAffinityGroupOwnedBrokers(final PulsarService pulsar, final String assignedBundleName,
+            final Set<String> candidates, final Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange,
+            Map<String, String> brokerToDomainMap) {
+        if (candidates.isEmpty()) {
+            return;
+        }
+        final String namespaceName = getNamespaceNameFromBundleName(assignedBundleName);
+        try {
+            final Map<String, Integer> brokerToAntiAffinityNamespaceCount = getAntiAffinityNamespaceOwnedBrokers(pulsar,
+                    namespaceName, brokerToNamespaceToBundleRange).get(30, TimeUnit.SECONDS);
+            if (brokerToAntiAffinityNamespaceCount == null) {
+                // none of the broker owns anti-affinity-namespace so, none of the broker will be filtered
+                return;
+            }
+            if (pulsar.getConfiguration().isFailureDomainsEnabled()) {
+                // this will remove all the brokers which are part of domains that don't have least number of
+                // anti-affinity-namespaces
+                filterDomainsNotHavingLeastNumberAntiAffinityNamespaces(brokerToAntiAffinityNamespaceCount, candidates,
+                        brokerToDomainMap);
+            }
+            // now, "candidates" has list of brokers which are part of domain that can accept this namespace. now,
+            // with in these domains, remove brokers which don't have least number of namespaces. so, brokers with least
+            // number of namespace can be selected
+            int leastNamaespaceCount = Integer.MAX_VALUE;
+            for (final String broker : candidates) {
+                if (brokerToAntiAffinityNamespaceCount.containsKey(broker)) {
+                    Integer namespaceCount = brokerToAntiAffinityNamespaceCount.get(broker);
+                    if (namespaceCount == null) {
+                        // Assume that when the namespace is absent, there are no namespace assigned to
+                        // that broker.
+                        leastNamaespaceCount = 0;
+                        break;
+                    }
+                    leastNamaespaceCount = Math.min(leastNamaespaceCount, namespaceCount);
+                } else {
+                    // Assume non-present brokers have 0 bundles.
+                    leastNamaespaceCount = 0;
+                    break;
+                }
+            }
+            // filter out broker based on namespace distribution
+            if (leastNamaespaceCount == 0) {
+                candidates.removeIf(broker -> brokerToAntiAffinityNamespaceCount.containsKey(broker)
+                        && brokerToAntiAffinityNamespaceCount.get(broker) > 0);
+            } else {
+                final int finalLeastNamespaceCount = leastNamaespaceCount;
+                candidates
+                        .removeIf(broker -> brokerToAntiAffinityNamespaceCount.get(broker) != finalLeastNamespaceCount);
+            }
+        } catch (Exception e) {
+            log.error("Failed to filter anti-affinity group namespace {}", e.getMessage());
+        }
+    }
+
+    /**
+     * It computes least number of namespace owned by any of the domain and then it filters out all the domains that own
+     * namespaces more than this count.
+     * 
+     * @param brokerToAntiAffinityNamespaceCount
+     * @param candidates
+     * @param brokerToDomainMap
+     */
+    private static void filterDomainsNotHavingLeastNumberAntiAffinityNamespaces(
+            Map<String, Integer> brokerToAntiAffinityNamespaceCount, Set<String> candidates,
+            Map<String, String> brokerToDomainMap) {
+
+        if (brokerToDomainMap == null || brokerToDomainMap.isEmpty()) {
+            return;
+        }
+
+        final Map<String, Integer> domainNamespaceCount = Maps.newHashMap();
+        int leastNamespaceCount = Integer.MAX_VALUE;
+        candidates.forEach(broker -> {
+            final String domain = brokerToDomainMap.getOrDefault(broker, DEFAULT_DOMAIN);
+            final int count = brokerToAntiAffinityNamespaceCount.getOrDefault(broker, 0);
+            domainNamespaceCount.compute(domain, (domainName, nsCount) -> nsCount == null ? count : nsCount + count);
+        });
+        // find leastNameSpaceCount
+        for (Entry<String, Integer> domainNsCountEntry : domainNamespaceCount.entrySet()) {
+            if (domainNsCountEntry.getValue() < leastNamespaceCount) {
+                leastNamespaceCount = domainNsCountEntry.getValue();
+            }
+        }
+        final int finalLeastNamespaceCount = leastNamespaceCount;
+        // only keep domain brokers which has leastNamespaceCount
+        candidates.removeIf(broker -> {
+            Integer nsCount = domainNamespaceCount.get(brokerToDomainMap.getOrDefault(broker, DEFAULT_DOMAIN));
+            return nsCount != null && nsCount != finalLeastNamespaceCount;
+        });
+    }
+
+    /**
+     * It returns map of broker and count of namespace that are belong to the same anti-affinity group as given
+     * {@param namespaceName}
+     * 
+     * @param pulsar
+     * @param namespaceName
+     * @param brokerToNamespaceToBundleRange
+     * @return
+     */
+    public static CompletableFuture<Map<String, Integer>> getAntiAffinityNamespaceOwnedBrokers(
+            final PulsarService pulsar, String namespaceName,
+            Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange) {
+
+        CompletableFuture<Map<String, Integer>> antiAffinityNsBrokersResult = new CompletableFuture<>();
+        ZooKeeperDataCache<Policies> policiesCache = pulsar.getConfigurationCache().policiesCache();
+
+        policiesCache.getAsync(path(POLICIES, namespaceName)).thenAccept(policies -> {
+            if (!policies.isPresent() || StringUtils.isBlank(policies.get().antiAffinityGroup)) {
+                antiAffinityNsBrokersResult.complete(null);
+                return;
+            }
+            final String antiAffinityGroup = policies.get().antiAffinityGroup;
+            final Map<String, Integer> brokerToAntiAffinityNamespaceCount = new ConcurrentHashMap<>();
+            final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+            brokerToNamespaceToBundleRange.forEach((broker, nsToBundleRange) -> {
+                nsToBundleRange.forEach((ns, bundleRange) -> {
+                    CompletableFuture<Void> future = new CompletableFuture<>();
+                    futures.add(future);
+                    policiesCache.getAsync(path(POLICIES, ns)).thenAccept(nsPolicies -> {
+                        if (nsPolicies.isPresent() && antiAffinityGroup.equalsIgnoreCase(nsPolicies.get().antiAffinityGroup)) {
+                            brokerToAntiAffinityNamespaceCount.compute(broker,
+                                    (brokerName, count) -> count == null ? 1 : count + 1);
+                        }
+                        future.complete(null);
+                    }).exceptionally(ex -> {
+                        future.complete(null);
+                        return null;
+                    });
+                });
+            });
+            FutureUtil.waitForAll(futures)
+                    .thenAccept(r -> antiAffinityNsBrokersResult.complete(brokerToAntiAffinityNamespaceCount));
+        }).exceptionally(ex -> {
+            // namespace-policies has not been created yet
+            antiAffinityNsBrokersResult.complete(null);
+            return null;
+        });
+        return antiAffinityNsBrokersResult;
+    }
+
+    /**
+     * 
+     * It checks if given anti-affinity namespace should be unloaded by broker due to load-shedding. If all the brokers
+     * are owning same number of anti-affinity namespaces then unloading this namespace again ends up at the same broker
+     * from which it was unloaded. So, this util checks that given namespace should be unloaded only if it can be loaded
+     * by different broker.
+     * 
+     * @param namespace
+     * @param bundle
+     * @param currentBroker
+     * @param pulsar
+     * @param brokerToNamespaceToBundleRange
+     * @param candidateBroekrs
+     * @return
+     * @throws Exception
+     */
+    public static boolean shouldAntiAffinityNamespaceUnload(String namespace, String bundle, String currentBroker,
+            final PulsarService pulsar, Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange,
+            Set<String> candidateBroekrs) throws Exception {
+
+        Map<String, Integer> brokerNamespaceCount = getAntiAffinityNamespaceOwnedBrokers(pulsar, namespace,
+                brokerToNamespaceToBundleRange).get(10, TimeUnit.SECONDS);
+        if (brokerNamespaceCount != null && !brokerNamespaceCount.isEmpty()) {
+            int leastNsCount = Integer.MAX_VALUE;
+            int currentBrokerNsCount = 0;
+
+            for (String broker : candidateBroekrs) {
+                int nsCount = brokerNamespaceCount.getOrDefault(broker, 0);
+                if (currentBroker.equals(broker)) {
+                    currentBrokerNsCount = nsCount;
+                }
+                if (leastNsCount > nsCount) {
+                    leastNsCount = nsCount;
+                }
+            }
+            // check if there is any other broker has less number of ns
+            if (leastNsCount == 0 || currentBrokerNsCount > leastNsCount) {
+                return true;
+            }
+            // check if all the brokers having same number of ns-count then broker can't unload
+            int leastNsOwnerBrokers = 0;
+            for (String broker : candidateBroekrs) {
+                if (leastNsCount == brokerNamespaceCount.getOrDefault(broker, 0)) {
+                    leastNsOwnerBrokers++;
+                }
+            }
+            // if all candidate brokers own same-number of ns then broker can't unload
+            return candidateBroekrs.size() != leastNsOwnerBrokers;
+        }
+        return true;
+    }
+
     public interface BrokerTopicLoadingPredicate {
         boolean isEnablePersistentTopics(String brokerUrl);
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 9d5cf3065..b8519d85b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.broker.loadbalance.impl;
 
 import static org.apache.pulsar.broker.admin.AdminResource.jsonMapper;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import static org.apache.pulsar.broker.web.PulsarWebResource.path;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -27,6 +29,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
@@ -57,6 +60,8 @@
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.policies.data.FailureDomain;
+import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.ResourceQuota;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
@@ -75,6 +80,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
+
 import io.netty.util.concurrent.DefaultThreadFactory;
 
 public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCacheListener<LocalBrokerData> {
@@ -170,7 +177,8 @@
     
     // check if given broker can load persistent/non-persistent topic
     private final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate;
-
+    
+    private Map<String, String> brokerToFailureDomainMap;
 
     private static final Deserializer<LocalBrokerData> loadReportDeserializer = (key, content) -> jsonMapper()
             .readValue(content, LocalBrokerData.class);
@@ -188,6 +196,7 @@ public ModularLoadManagerImpl() {
         loadSheddingPipeline.add(new OverloadShedder(conf));
         preallocatedBundleToBroker = new ConcurrentHashMap<>();
         scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-modular-load-manager"));
+        this.brokerToFailureDomainMap = Maps.newHashMap();
         
         this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
             @Override
@@ -214,6 +223,7 @@ public boolean isEnableNonPersistentTopics(String brokerUrl) {
      *            The service to initialize with.
      */
     public void initialize(final PulsarService pulsar) {
+        this.pulsar = pulsar;
         availableActiveBrokers = new ZooKeeperChildrenCache(pulsar.getLocalZkCache(),
                 LoadManager.LOADBALANCE_BROKERS_ROOT);
         availableActiveBrokers.registerListener(new ZooKeeperCacheListener<Set<String>>() {
@@ -266,9 +276,15 @@ public LocalBrokerData deserialize(String key, byte[] content) throws Exception
         
         placementStrategy = ModularLoadManagerStrategy.create(conf);
         policies = new SimpleResourceAllocationPolicies(pulsar);
-        this.pulsar = pulsar;
         zkClient = pulsar.getZkClient();
         filterPipeline.add(new BrokerVersionFilter());
+        
+        refreshBrokerToFailureDomainMap();
+        // register listeners for domain changes
+        pulsar.getConfigurationCache().failureDomainListCache()
+                .registerListener((path, data, stat) -> scheduler.execute(() -> refreshBrokerToFailureDomainMap()));
+        pulsar.getConfigurationCache().failureDomainCache()
+                .registerListener((path, data, stat) -> scheduler.execute(() -> refreshBrokerToFailureDomainMap()));
     }
 
     /**
@@ -421,7 +437,7 @@ private boolean needBrokerDataUpdate() {
     }
 
     // Update both the broker data and the bundle data.
-    private void updateAll() {
+    public void updateAll() {
         if (log.isDebugEnabled()) {
             log.debug("Updating broker and bundle data for loadreport");
         }
@@ -575,10 +591,14 @@ public synchronized void doLoadShedding() {
                     for (Map.Entry<String, String> entry : bundlesToUnload.entrySet()) {
                         final String broker = entry.getKey();
                         final String bundle = entry.getValue();
-                        log.info("Unloading bundle: {}", bundle);
-                        pulsar.getAdminClient().namespaces().unloadNamespaceBundle(
-                                LoadManagerShared.getNamespaceNameFromBundleName(bundle),
-                                LoadManagerShared.getBundleRangeFromBundleName(bundle));
+                        
+                        final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
+                        final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
+                        if(!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
+                            continue;
+                        }
+                        log.info("Unloading bundle: {} from broker {}", bundle, broker);
+                        pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName, bundleRange);
                         loadData.getRecentlyUnloadedBundles().put(bundle, System.currentTimeMillis());
                     }
                 } catch (Exception e) {
@@ -589,6 +609,32 @@ public synchronized void doLoadShedding() {
         }
     }
 
+    public boolean shouldAntiAffinityNamespaceUnload(String namespace, String bundle, String currentBroker) {
+        try {
+            Optional<Policies> nsPolicies = pulsar.getConfigurationCache().policiesCache()
+                    .get(path(POLICIES, namespace));
+            if (!nsPolicies.isPresent() || StringUtils.isBlank(nsPolicies.get().antiAffinityGroup)) {
+                return true;
+            }
+
+            synchronized (brokerCandidateCache) {
+                brokerCandidateCache.clear();
+                ServiceUnitId serviceUnit = pulsar.getNamespaceService().getNamespaceBundleFactory()
+                        .getBundle(namespace, bundle);
+                LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
+                        getAvailableBrokers(), brokerTopicLoadingPredicate);
+                return LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace, bundle, currentBroker, pulsar,
+                        brokerToNamespaceToBundleRange, brokerCandidateCache);
+            }
+
+        } catch (Exception e) {
+            log.warn("Failed to check anti-affinity namespace ownership for {}/{}/{}, {}", namespace, bundle,
+                    currentBroker, e.getMessage());
+
+        }
+        return true;
+    }
+
     /**
      * As the leader broker, attempt to automatically detect and split hot namespace bundles.
      */
@@ -657,11 +703,18 @@ public String selectBrokerForAssignment(final ServiceUnitId serviceUnit) {
             final BundleData data = loadData.getBundleData().computeIfAbsent(bundle,
                     key -> getBundleDataOrDefault(bundle));
             brokerCandidateCache.clear();
-            LoadManagerShared.applyPolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
+            LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
                     brokerTopicLoadingPredicate);
+
             // filter brokers which owns topic higher than threshold
             LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache, loadData,
                     conf.getLoadBalancerBrokerMaxTopics());
+
+            // distribute namespaces to domain and brokers according to anti-affinity-group
+            LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar, serviceUnit.toString(), brokerCandidateCache,
+                    brokerToNamespaceToBundleRange, brokerToFailureDomainMap);
+            // distribute bundles evenly to candidate-brokers
+
             LoadManagerShared.removeMostServicingBrokersForNamespace(serviceUnit.toString(), brokerCandidateCache,
                     brokerToNamespaceToBundleRange);
             log.info("{} brokers being considered for assignment of {}", brokerCandidateCache.size(), bundle);
@@ -673,13 +726,13 @@ public String selectBrokerForAssignment(final ServiceUnitId serviceUnit) {
                 }
             } catch ( BrokerFilterException x ) {
                 // restore the list of brokers to the full set
-                LoadManagerShared.applyPolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
+                LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
                         brokerTopicLoadingPredicate);
             }
 
             if ( brokerCandidateCache.isEmpty() ) {
                 // restore the list of brokers to the full set
-                LoadManagerShared.applyPolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
+                LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
                         brokerTopicLoadingPredicate);
             }
 
@@ -693,7 +746,7 @@ public String selectBrokerForAssignment(final ServiceUnitId serviceUnit) {
             final double maxUsage = loadData.getBrokerData().get(broker).getLocalData().getMaxResourceUsage();
             if (maxUsage > overloadThreshold) {
                 // All brokers that were in the filtered list were overloaded, so check if there is a better broker
-                LoadManagerShared.applyPolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
+                LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
                         brokerTopicLoadingPredicate);
                 broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
             }
@@ -871,4 +924,33 @@ private long getBrokerZnodeOwner() {
         }
         return 0;
     }
+
+    private void refreshBrokerToFailureDomainMap() {
+        if (!pulsar.getConfiguration().isFailureDomainsEnabled()) {
+            return;
+        }
+        final String clusterDomainRootPath = pulsar.getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT;
+        try {
+            synchronized (brokerToFailureDomainMap) {
+                Map<String, String> tempBrokerToFailureDomainMap = Maps.newHashMap();
+                for (String domainName : pulsar.getConfigurationCache().failureDomainListCache().get()) {
+                    try {
+                        Optional<FailureDomain> domain = pulsar.getConfigurationCache().failureDomainCache()
+                                .get(clusterDomainRootPath + "/" + domainName);
+                        if (domain.isPresent()) {
+                            for (String broker : domain.get().brokers) {
+                                tempBrokerToFailureDomainMap.put(broker, domainName);
+                            }
+                        }
+                    } catch (Exception e) {
+                        log.warn("Failed to get domain {}", domainName, e);
+                    }
+                }
+                this.brokerToFailureDomainMap = tempBrokerToFailureDomainMap;
+            }
+            log.info("Cluster domain refreshed {}", brokerToFailureDomainMap);
+        } catch (Exception e) {
+            log.warn("Failed to get domain-list for cluster {}", e.getMessage());
+        }
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index 338f302dd..bdf034528 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -913,7 +913,7 @@ private synchronized ResourceUnit findBrokerForPlacement(Multimap<Long, Resource
             }
             brokerCandidateCache.clear();
             try {
-                LoadManagerShared.applyPolicies(serviceUnit, policies, brokerCandidateCache, availableBrokersCache,
+                LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, availableBrokersCache,
                         brokerTopicLoadingPredicate);
             } catch (Exception e) {
                 log.warn("Error when trying to apply policies: {}", e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index c22758dd1..40b7d03f2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -168,25 +168,26 @@ protected void validateAdminAccessOnProperty(String property) {
         } catch (RestException e) {
             throw e;
         } catch (Exception e) {
-            log.error("Failed to get property admin data for property");
+            log.error("Failed to get property admin data for property {}", property);
             throw new RestException(e);
         }
     }
 
     protected static void validateAdminAccessOnProperty(PulsarService pulsar, String clientAppId, String property) throws RestException, Exception{
-        PropertyAdmin propertyAdmin;
-
-        try {
-            propertyAdmin = pulsar.getConfigurationCache().propertiesCache().get(path(POLICIES, property))
-                    .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Property does not exist"));
-        } catch (KeeperException.NoNodeException e) {
-            log.warn("Failed to get property admin data for non existing property {}", property);
-            throw new RestException(Status.NOT_FOUND, "Property does not exist");
-        }
 
         if (pulsar.getConfiguration().isAuthenticationEnabled() && pulsar.getConfiguration().isAuthorizationEnabled()) {
             log.debug("check admin access on property: {} - Authenticated: {} -- role: {}", property,
                     (isClientAuthenticated(clientAppId)), clientAppId);
+            
+            PropertyAdmin propertyAdmin;
+
+            try {
+                propertyAdmin = pulsar.getConfigurationCache().propertiesCache().get(path(POLICIES, property))
+                        .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Property does not exist"));
+            } catch (KeeperException.NoNodeException e) {
+                log.warn("Failed to get property admin data for non existing property {}", property);
+                throw new RestException(Status.NOT_FOUND, "Property does not exist");
+            }
 
             if (!isClientAuthenticated(clientAppId)) {
                 throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
index e0d55a661..73071ff17 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
@@ -115,7 +115,7 @@ private void createProperty(PulsarAdmin pulsarAdmin)
             throws PulsarClientException, MalformedURLException, PulsarAdminException {
         ClusterData clusterData = new ClusterData();
         clusterData.setServiceUrl(pulsarAdmin.getServiceUrl().toString());
-        pulsarAdmins[0].clusters().createCluster("my-cluster", clusterData);
+        pulsarAdmins[0].clusters().updateCluster("my-cluster", clusterData);
         Set<String> allowedClusters = new HashSet<>();
         allowedClusters.add("my-cluster");
         PropertyAdmin adminConfig = new PropertyAdmin();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 795d8071a..444d224a3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -172,21 +172,23 @@ public void cleanup() throws Exception {
     public void clusters() throws Exception {
         admin.clusters().createCluster("usw",
                 new ClusterData("http://broker.messaging.use.example.com" + ":" + BROKER_WEBSERVICE_PORT));
-        assertEquals(admin.clusters().getClusters(), Lists.newArrayList("use", "usw"));
+        // "test" cluster is part of config-default cluster and it's znode gets created when PulsarService creates
+        // failure-domain znode of this default cluster
+        assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test", "use", "usw"));
 
         assertEquals(admin.clusters().getCluster("use"),
                 new ClusterData("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT));
 
         admin.clusters().updateCluster("usw",
                 new ClusterData("http://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT));
-        assertEquals(admin.clusters().getClusters(), Lists.newArrayList("use", "usw"));
+        assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test", "use", "usw"));
         assertEquals(admin.clusters().getCluster("usw"),
                 new ClusterData("http://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT));
 
         admin.clusters().updateCluster("usw",
                 new ClusterData("http://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT,
                         "https://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT_TLS));
-        assertEquals(admin.clusters().getClusters(), Lists.newArrayList("use", "usw"));
+        assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test", "use", "usw"));
         assertEquals(admin.clusters().getCluster("usw"),
                 new ClusterData("http://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT,
                         "https://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT_TLS));
@@ -194,11 +196,11 @@ public void clusters() throws Exception {
         admin.clusters().deleteCluster("usw");
         Thread.sleep(300);
 
-        assertEquals(admin.clusters().getClusters(), Lists.newArrayList("use"));
+        assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test", "use"));
 
         admin.namespaces().deleteNamespace("prop-xyz/use/ns1");
         admin.clusters().deleteCluster("use");
-        assertEquals(admin.clusters().getClusters(), Lists.newArrayList());
+        assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test"));
 
         // Check name validation
         try {
@@ -382,7 +384,9 @@ public void brokers() throws Exception {
 
         admin.namespaces().deleteNamespace("prop-xyz/use/ns1");
         admin.clusters().deleteCluster("use");
-        assertEquals(admin.clusters().getClusters(), Lists.newArrayList());
+        // "test" cluster is part of config-default cluster and it's znode gets created when PulsarService creates
+        // failure-domain znode of this default cluster
+        assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test"));
     }
 
     /**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 2362f44f4..582578356 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.admin;
 
+import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
@@ -25,7 +26,9 @@
 import static org.testng.Assert.fail;
 
 import java.net.URL;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -54,6 +57,7 @@
 import org.apache.pulsar.common.naming.DestinationDomain;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
@@ -582,7 +586,7 @@ public void testPeerCluster() throws Exception {
             assertTrue(e instanceof PreconditionFailedException);
         }
     }
-    
+
     /**
      * It validates that peer-cluster can't coexist in replication-cluster list
      * 
@@ -640,4 +644,69 @@ public void testReplicationPeerCluster() throws Exception {
         admin.namespaces().setNamespaceReplicationClusters(namespace, clusterIds);
     }
 
+    @Test
+    public void clusterFailureDomain() throws PulsarAdminException {
+
+        final String cluster = pulsar.getConfiguration().getClusterName();
+        admin.clusters().updateCluster(cluster,
+                new ClusterData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls()));
+        // create
+        FailureDomain domain = new FailureDomain();
+        domain.setBrokers(Sets.newHashSet("b1", "b2", "b3"));
+        admin.clusters().createFailureDomain(cluster, "domain-1", domain);
+        admin.clusters().updateFailureDomain(cluster, "domain-1", domain);
+
+        assertEquals(admin.clusters().getFailureDomain(cluster, "domain-1"), domain);
+
+        Map<String, FailureDomain> domains = admin.clusters().getFailureDomains(cluster);
+        assertEquals(domains.size(), 1);
+        assertTrue(domains.containsKey("domain-1"));
+
+        try {
+            // try to create domain with already registered brokers
+            admin.clusters().createFailureDomain(cluster, "domain-2", domain);
+            fail("should have failed because of brokers are already registered");
+        } catch (PulsarAdminException.ConflictException e) {
+            // Ok
+        }
+
+        admin.clusters().deleteFailureDomain(cluster, "domain-1");
+        assertTrue(admin.clusters().getFailureDomains(cluster).isEmpty());
+
+        admin.clusters().createFailureDomain(cluster, "domain-2", domain);
+        domains = admin.clusters().getFailureDomains(cluster);
+        assertEquals(domains.size(), 1);
+        assertTrue(domains.containsKey("domain-2"));
+    }
+
+    @Test
+    public void namespaceAntiAffinity() throws PulsarAdminException {
+        final String namespace = "prop-xyz/use/ns1";
+        final String antiAffinityGroup = "group";
+        assertTrue(isBlank(admin.namespaces().getNamespaceAntiAffinityGroup(namespace)));
+        admin.namespaces().setNamespaceAntiAffinityGroup(namespace, antiAffinityGroup);
+        assertEquals(admin.namespaces().getNamespaceAntiAffinityGroup(namespace), antiAffinityGroup);
+        admin.namespaces().deleteNamespaceAntiAffinityGroup(namespace);
+        assertTrue(isBlank(admin.namespaces().getNamespaceAntiAffinityGroup(namespace)));
+
+        final String ns1 = "prop-xyz/use/antiAG1";
+        final String ns2 = "prop-xyz/use/antiAG2";
+        final String ns3 = "prop-xyz/use/antiAG3";
+        admin.namespaces().createNamespace(ns1);
+        admin.namespaces().createNamespace(ns2);
+        admin.namespaces().createNamespace(ns3);
+        admin.namespaces().setNamespaceAntiAffinityGroup(ns1, antiAffinityGroup);
+        admin.namespaces().setNamespaceAntiAffinityGroup(ns2, antiAffinityGroup);
+        admin.namespaces().setNamespaceAntiAffinityGroup(ns3, antiAffinityGroup);
+
+        Set<String> namespaces = new HashSet<>(
+                admin.namespaces().getAntiAffinityNamespaces("dummy", "use", antiAffinityGroup));
+        assertEquals(namespaces.size(), 3);
+        assertTrue(namespaces.contains(ns1));
+        assertTrue(namespaces.contains(ns2));
+        assertTrue(namespaces.contains(ns3));
+
+        List<String> namespaces2 = admin.namespaces().getAntiAffinityNamespaces("dummy", "use", "invalid-group");
+        assertEquals(namespaces2.size(), 0);
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 0245b6721..c587033af 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -91,10 +91,11 @@
 
     private Field uriField;
     private UriInfo uriInfo;
+    private final String configClusterName = "use";
 
     public AdminTest() {
         super();
-        conf.setClusterName("use");
+        conf.setClusterName(configClusterName);
     }
 
     @Override
@@ -186,10 +187,10 @@ public void cleanup() throws Exception {
 
     @Test
     void clusters() throws Exception {
-        assertEquals(clusters.getClusters(), new ArrayList<String>());
+        assertEquals(clusters.getClusters(), Lists.newArrayList(configClusterName));
         verify(clusters, never()).validateSuperUserAccess();
 
-        clusters.createCluster("use", new ClusterData("http://broker.messaging.use.example.com"));
+        clusters.updateCluster("use", new ClusterData("http://broker.messaging.use.example.com"));
         verify(clusters, times(1)).validateSuperUserAccess();
         // ensure to read from ZooKeeper directly
         clusters.clustersListCache().clear();
@@ -447,7 +448,7 @@ void properties() throws Exception {
         assertEquals(properties.getProperties(), Lists.newArrayList());
 
         // Create a namespace to test deleting a non-empty property
-        clusters.createCluster("use", new ClusterData());
+        clusters.updateCluster("use", new ClusterData());
         newPropertyAdmin = new PropertyAdmin(Lists.newArrayList("role1", "other-role"), Sets.newHashSet("use"));
         properties.createProperty("my-property", newPropertyAdmin);
 
@@ -474,7 +475,7 @@ void properties() throws Exception {
 
     @Test
     void brokers() throws Exception {
-        clusters.createCluster("use", new ClusterData("http://broker.messaging.use.example.com",
+        clusters.updateCluster("use", new ClusterData("http://broker.messaging.use.example.com",
                 "https://broker.messaging.use.example.com:4443"));
 
         URI requestUri = new URI(
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 37ec7ebc8..36c9ec2b7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -135,7 +135,7 @@ public void setup() throws Exception {
         doNothing().when(namespaces).validateAdminAccessOnProperty("other-property");
         doNothing().when(namespaces).validateAdminAccessOnProperty("new-property");
 
-        admin.clusters().createCluster("use", new ClusterData("http://broker-use.com:" + BROKER_WEBSERVICE_PORT));
+        admin.clusters().updateCluster("use", new ClusterData("http://broker-use.com:" + BROKER_WEBSERVICE_PORT));
         admin.clusters().createCluster("usw", new ClusterData("http://broker-usw.com:" + BROKER_WEBSERVICE_PORT));
         admin.clusters().createCluster("usc", new ClusterData("http://broker-usc.com:" + BROKER_WEBSERVICE_PORT));
         admin.properties().createProperty(this.testProperty,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
index 0f79e7509..cee4bbe33 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
@@ -64,7 +64,7 @@ void simple() throws Exception {
 
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), false);
 
-        admin.clusters().createCluster("c1", new ClusterData());
+        admin.clusters().updateCluster("c1", new ClusterData());
         admin.properties().createProperty("p1", new PropertyAdmin(Lists.newArrayList("role1"), Sets.newHashSet("c1")));
         waitForChange();
         admin.namespaces().createNamespace("p1/c1/ns1");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 67873bc6c..57ed5e5e4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -77,6 +77,7 @@
     protected MockZooKeeper mockZookKeeper;
     protected NonClosableMockBookKeeper mockBookKeeper;
     protected boolean isTcpLookup = false;
+    protected final String configClusterName = "test";
 
     private SameThreadOrderedSafeExecutor sameThreadOrderedSafeExecutor;
 
@@ -90,13 +91,12 @@ protected void resetConfig() {
         this.conf.setBrokerServicePortTls(BROKER_PORT_TLS);
         this.conf.setWebServicePort(BROKER_WEBSERVICE_PORT);
         this.conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
-        this.conf.setClusterName("test");
+        this.conf.setClusterName(configClusterName);
         this.conf.setAdvertisedAddress("localhost"); // there are TLS tests in here, they need to use localhost because of the certificate
         this.conf.setManagedLedgerCacheSizeMB(8);
         this.conf.setActiveConsumerFailoverDelayTimeMillis(0);
         this.conf.setDefaultNumberOfNamespaceBundles(1);
         this.conf.setZookeeperServers("localhost:2181");
-        this.conf.setClusterName("mock");
     }
 
     protected final void internalSetup() throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
new file mode 100644
index 000000000..ec0bf1f54
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -0,0 +1,540 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.net.URL;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
+import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
+import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.NamespaceBundleFactory;
+import org.apache.pulsar.common.naming.NamespaceBundles;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FailureDomain;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.beust.jcommander.internal.Maps;
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
+import com.google.common.hash.Hashing;
+
+import junit.framework.Assert;
+
+public class AntiAffinityNamespaceGroupTest {
+    private LocalBookkeeperEnsemble bkEnsemble;
+
+    private URL url1;
+    private PulsarService pulsar1;
+    private PulsarAdmin admin1;
+
+    private URL url2;
+    private PulsarService pulsar2;
+    private PulsarAdmin admin2;
+
+    private String primaryHost;
+    private String secondaryHost;
+
+    private NamespaceBundleFactory nsFactory;
+
+    private ModularLoadManagerImpl primaryLoadManager;
+    private ModularLoadManagerImpl secondaryLoadManager;
+
+    private ExecutorService executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS,
+            new LinkedBlockingQueue<Runnable>());
+
+    private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
+    private final int PRIMARY_BROKER_WEBSERVICE_PORT = PortManager.nextFreePort();
+    private final int SECONDARY_BROKER_WEBSERVICE_PORT = PortManager.nextFreePort();
+    private final int PRIMARY_BROKER_PORT = PortManager.nextFreePort();
+    private final int SECONDARY_BROKER_PORT = PortManager.nextFreePort();
+    private static final Logger log = LoggerFactory.getLogger(AntiAffinityNamespaceGroupTest.class);
+
+    static {
+        System.setProperty("test.basePort", "16100");
+    }
+
+    private static Object getField(final Object instance, final String fieldName) throws Exception {
+        final Field field = instance.getClass().getDeclaredField(fieldName);
+        field.setAccessible(true);
+        return field.get(instance);
+    }
+
+    @BeforeMethod
+    void setup() throws Exception {
+
+        // Start local bookkeeper ensemble
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
+        bkEnsemble.start();
+
+        // Start broker 1
+        ServiceConfiguration config1 = new ServiceConfiguration();
+        config1.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+        config1.setClusterName("use");
+        config1.setWebServicePort(PRIMARY_BROKER_WEBSERVICE_PORT);
+        config1.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
+        config1.setBrokerServicePort(PRIMARY_BROKER_PORT);
+        config1.setFailureDomainsEnabled(true);
+        config1.setLoadBalancerEnabled(true);
+        createCluster(bkEnsemble.getZkClient(), config1);
+        pulsar1 = new PulsarService(config1);
+
+        pulsar1.start();
+
+        primaryHost = String.format("%s:%d", InetAddress.getLocalHost().getHostName(), PRIMARY_BROKER_WEBSERVICE_PORT);
+        url1 = new URL("http://127.0.0.1" + ":" + PRIMARY_BROKER_WEBSERVICE_PORT);
+        admin1 = new PulsarAdmin(url1, (Authentication) null);
+
+        // Start broker 2
+        ServiceConfiguration config2 = new ServiceConfiguration();
+        config2.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+        config2.setClusterName("use");
+        config2.setWebServicePort(SECONDARY_BROKER_WEBSERVICE_PORT);
+        config2.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
+        config2.setBrokerServicePort(SECONDARY_BROKER_PORT);
+        config2.setFailureDomainsEnabled(true);
+        pulsar2 = new PulsarService(config2);
+        secondaryHost = String.format("%s:%d", InetAddress.getLocalHost().getHostName(),
+                SECONDARY_BROKER_WEBSERVICE_PORT);
+
+        pulsar2.start();
+
+        url2 = new URL("http://127.0.0.1" + ":" + SECONDARY_BROKER_WEBSERVICE_PORT);
+        admin2 = new PulsarAdmin(url2, (Authentication) null);
+
+        primaryLoadManager = (ModularLoadManagerImpl) getField(pulsar1.getLoadManager().get(), "loadManager");
+        secondaryLoadManager = (ModularLoadManagerImpl) getField(pulsar2.getLoadManager().get(), "loadManager");
+        nsFactory = new NamespaceBundleFactory(pulsar1, Hashing.crc32());
+        Thread.sleep(100);
+    }
+
+    @AfterMethod
+    void shutdown() throws Exception {
+        log.info("--- Shutting down ---");
+        executor.shutdown();
+
+        admin1.close();
+        admin2.close();
+
+        pulsar2.close();
+        pulsar1.close();
+
+        bkEnsemble.stop();
+    }
+
+    private void createCluster(ZooKeeper zk, ServiceConfiguration config) throws Exception {
+        ZkUtils.createFullPathOptimistic(zk, "/admin/clusters/" + config.getClusterName(),
+                ObjectMapperFactory.getThreadLocal().writeValueAsBytes(
+                        new ClusterData("http://" + config.getAdvertisedAddress() + ":" + config.getWebServicePort())),
+                Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    }
+
+    @Test
+    public void testClusterDomain() {
+
+    }
+
+    /**
+     * 
+     * It verifies anti-affinity-namespace assignment with failure-domain
+     * 
+     * <pre>
+     * Domain     Brokers-count
+     * ________  ____________
+     * domain-0   broker-0,broker-1
+     * domain-1   broker-2,broker-3
+     * 
+     * Anti-affinity-namespace assignment
+     * 
+     * (1) ns0 -> candidate-brokers: b0, b1, b2, b3 => selected b0
+     * (2) ns1 -> candidate-brokers: b2, b3         => selected b2
+     * (3) ns2 -> candidate-brokers: b1, b3         => selected b1
+     * (4) ns3 -> candidate-brokers: b3             => selected b3
+     * (5) ns4 -> candidate-brokers: b0, b1, b2, b3 => selected b0
+     * 
+     * "candidate" broker to own anti-affinity-namespace = b2 or b4
+     * 
+     * </pre>
+     * 
+     */
+    @Test
+    public void testAntiAffinityNamespaceFilteringWithDomain() throws Exception {
+
+        final String namespace = "my-property/use/my-ns";
+        final int totalNamespaces = 5;
+        final String namespaceAntiAffinityGroup = "my-antiaffinity";
+        final String bundle = "/0x00000000_0xffffffff";
+        final int totalBrokers = 4;
+
+        pulsar1.getConfiguration().setFailureDomainsEnabled(true);
+        admin1.properties().createProperty("my-property",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
+
+        for (int i = 0; i < totalNamespaces; i++) {
+            final String ns = namespace + i;
+            admin1.namespaces().createNamespace(ns);
+            admin1.namespaces().setNamespaceAntiAffinityGroup(ns, namespaceAntiAffinityGroup);
+        }
+
+        Set<String> brokers = Sets.newHashSet();
+        Map<String, String> brokerToDomainMap = Maps.newHashMap();
+        brokers.add("brokerName-0");
+        brokerToDomainMap.put("brokerName-0", "domain-0");
+        brokers.add("brokerName-1");
+        brokerToDomainMap.put("brokerName-1", "domain-0");
+        brokers.add("brokerName-2");
+        brokerToDomainMap.put("brokerName-2", "domain-1");
+        brokers.add("brokerName-3");
+        brokerToDomainMap.put("brokerName-3", "domain-1");
+
+        Set<String> candidate = Sets.newHashSet();
+        Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange = Maps.newHashMap();
+
+        Assert.assertEquals(brokers.size(), totalBrokers);
+
+        String assignedNamespace = namespace + "0" + bundle;
+        candidate.addAll(brokers);
+
+        // for namespace-0 all brokers available
+        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, brokers,
+                brokerToNamespaceToBundleRange, brokerToDomainMap);
+        Assert.assertEquals(brokers.size(), totalBrokers);
+
+        // add namespace-0 to broker-0 of domain-0 => state: n0->b0
+        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "brokerName-0", namespace + "0", assignedNamespace);
+        candidate.addAll(brokers);
+        // for namespace-1 only domain-1 brokers are available as broker-0 already owns namespace-0
+        assignedNamespace = namespace + "1" + bundle;
+        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
+                brokerToNamespaceToBundleRange, brokerToDomainMap);
+        Assert.assertEquals(candidate.size(), 2);
+        candidate.forEach(broker -> Assert.assertEquals(brokerToDomainMap.get(broker), "domain-1"));
+
+        // add namespace-1 to broker-2 of domain-1 => state: n0->b0, n1->b2
+        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "brokerName-2", namespace + "1", assignedNamespace);
+        candidate.addAll(brokers);
+        // for namespace-2 only brokers available are : broker-1 and broker-3
+        assignedNamespace = namespace + "2" + bundle;
+        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
+                brokerToNamespaceToBundleRange, brokerToDomainMap);
+        Assert.assertEquals(candidate.size(), 2);
+        Assert.assertTrue(candidate.contains("brokerName-1"));
+        Assert.assertTrue(candidate.contains("brokerName-3"));
+
+        // add namespace-2 to broker-1 of domain-0 => state: n0->b0, n1->b2, n2->b1
+        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "brokerName-1", namespace + "2", assignedNamespace);
+        candidate.addAll(brokers);
+        // for namespace-3 only brokers available are : broker-3
+        assignedNamespace = namespace + "3" + bundle;
+        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
+                brokerToNamespaceToBundleRange, brokerToDomainMap);
+        Assert.assertEquals(candidate.size(), 1);
+        Assert.assertTrue(candidate.contains("brokerName-3"));
+        // add namespace-3 to broker-3 of domain-1 => state: n0->b0, n1->b2, n2->b1, n3->b3
+        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "brokerName-3", namespace + "3", assignedNamespace);
+        candidate.addAll(brokers);
+        // for namespace-4 only brokers available are : all 4 brokers
+        assignedNamespace = namespace + "4" + bundle;
+        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
+                brokerToNamespaceToBundleRange, brokerToDomainMap);
+        Assert.assertEquals(candidate.size(), 4);
+    }
+
+    /**
+     * It verifies anti-affinity-namespace assignment without failure-domain enabled
+     * 
+     * <pre>
+     *  Anti-affinity-namespace assignment
+     * 
+     * (1) ns0 -> candidate-brokers: b0, b1, b2     => selected b0
+     * (2) ns1 -> candidate-brokers: b1, b2         => selected b1
+     * (3) ns2 -> candidate-brokers: b2             => selected b2
+     * (5) ns3 -> candidate-brokers: b0, b1, b2     => selected b0
+     * </pre>
+     * 
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testAntiAffinityNamespaceFilteringWithoutDomain() throws Exception {
+
+        final String namespace = "my-property/use/my-ns";
+        final int totalNamespaces = 5;
+        final String namespaceAntiAffinityGroup = "my-antiaffinity";
+        final String bundle = "/0x00000000_0xffffffff";
+
+        admin1.properties().createProperty("my-property",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
+
+        for (int i = 0; i < totalNamespaces; i++) {
+            final String ns = namespace + i;
+            admin1.namespaces().createNamespace(ns);
+            admin1.namespaces().setNamespaceAntiAffinityGroup(ns, namespaceAntiAffinityGroup);
+        }
+
+        Set<String> brokers = Sets.newHashSet();
+        Set<String> candidate = Sets.newHashSet();
+        Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange = Maps.newHashMap();
+        brokers.add("broker-0");
+        brokers.add("broker-1");
+        brokers.add("broker-2");
+
+        String assignedNamespace = namespace + "0" + bundle;
+
+        // all brokers available so, candidate will be all 3 brokers
+        candidate.addAll(brokers);
+        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, brokers,
+                brokerToNamespaceToBundleRange, null);
+        Assert.assertEquals(brokers.size(), 3);
+
+        // add ns-0 to broker-0
+        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-0", namespace + "0", assignedNamespace);
+        candidate.addAll(brokers);
+        assignedNamespace = namespace + "1" + bundle;
+        // available brokers for ns-1 => broker-1, broker-2
+        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
+                brokerToNamespaceToBundleRange, null);
+        Assert.assertEquals(candidate.size(), 2);
+        Assert.assertTrue(candidate.contains("broker-1"));
+        Assert.assertTrue(candidate.contains("broker-2"));
+
+        // add ns-1 to broker-1
+        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-1", namespace + "1", assignedNamespace);
+        candidate.addAll(brokers);
+        // available brokers for ns-2 => broker-2
+        assignedNamespace = namespace + "2" + bundle;
+        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
+                brokerToNamespaceToBundleRange, null);
+        Assert.assertEquals(candidate.size(), 1);
+        Assert.assertTrue(candidate.contains("broker-2"));
+
+        // add ns-2 to broker-2
+        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-2", namespace + "2", assignedNamespace);
+        candidate.addAll(brokers);
+        // available brokers for ns-3 => broker-0, broker-1, broker-2
+        assignedNamespace = namespace + "3" + bundle;
+        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
+                brokerToNamespaceToBundleRange, null);
+        Assert.assertEquals(candidate.size(), 3);
+    }
+
+    private void selectBrokerForNamespace(Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange,
+            String broker, String namespace, String assignedBundleName) {
+        Map<String, Set<String>> nsToBundleMap = Maps.newHashMap();
+        nsToBundleMap.put(namespace, Sets.newHashSet(assignedBundleName));
+        brokerToNamespaceToBundleRange.put(broker, nsToBundleMap);
+    }
+
+    /**
+     * It verifies anti-affinity with failure domain enabled with 2 brokers.
+     * 
+     * <pre>
+     * 1. Register brokers to domain: domain-1: broker1 & domain-2: broker2
+     * 2. Load-Manager receives a watch and updates brokerToDomain cache with new domain data
+     * 3. Create two namespace with anti-affinity
+     * 4. Load-manager selects broker for each namespace such that from different domains
+     * 
+     * </pre>
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testBrokerSelectionForAntiAffinityGroup() throws Exception {
+
+        final String broker1 = primaryHost;
+        final String broker2 = secondaryHost;
+        final String cluster = pulsar1.getConfiguration().getClusterName();
+        final String property = "prop";
+        final String namespace1 = property + "/" + cluster + "/ns1";
+        final String namespace2 = property + "/" + cluster + "/ns2";
+        final String namespaceAntiAffinityGroup = "group";
+        FailureDomain domain = new FailureDomain();
+        domain.brokers = Sets.newHashSet(broker1);
+        admin1.clusters().createFailureDomain(cluster, "domain1", domain);
+        domain.brokers = Sets.newHashSet(broker2);
+        admin1.clusters().createFailureDomain(cluster, "domain1", domain);
+        admin1.properties().createProperty(property, new PropertyAdmin(null, Sets.newHashSet(cluster)));
+        admin1.namespaces().createNamespace(namespace1);
+        admin1.namespaces().createNamespace(namespace2);
+        admin1.namespaces().setNamespaceAntiAffinityGroup(namespace1, namespaceAntiAffinityGroup);
+        admin1.namespaces().setNamespaceAntiAffinityGroup(namespace2, namespaceAntiAffinityGroup);
+
+        // validate strategically if brokerToDomainCache updated
+        for (int i = 0; i < 5; i++) {
+            if (!isLoadManagerUpdatedDomainCache(primaryLoadManager)
+                    || !isLoadManagerUpdatedDomainCache(secondaryLoadManager) || i != 4) {
+                Thread.sleep(200);
+            }
+        }
+        assertTrue(isLoadManagerUpdatedDomainCache(primaryLoadManager));
+        assertTrue(isLoadManagerUpdatedDomainCache(secondaryLoadManager));
+
+        ServiceUnitId serviceUnit = makeBundle(property, cluster, "ns1");
+        String selectedBroker1 = primaryLoadManager.selectBrokerForAssignment(serviceUnit);
+
+        serviceUnit = makeBundle(property, cluster, "ns2");
+        String selectedBroker2 = primaryLoadManager.selectBrokerForAssignment(serviceUnit);
+
+        assertNotEquals(selectedBroker1, selectedBroker2);
+
+    }
+
+    /**
+     * It verifies that load-shedding task should unload namespace only if there is a broker available which doesn't
+     * cause uneven anti-affinitiy namespace distribution.
+     * 
+     * <pre>
+     * 1. broker1 owns ns-0 => broker1 can unload ns-0
+     * 1. broker2 owns ns-1 => broker1 can unload ns-0
+     * 1. broker3 owns ns-2 => broker1 can't unload ns-0 as all brokers have same no NS
+     * </pre>
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testLoadSheddingUtilWithAntiAffinityNamespace() throws Exception {
+
+        final String namespace = "my-property/use/my-ns";
+        final int totalNamespaces = 5;
+        final String namespaceAntiAffinityGroup = "my-antiaffinity";
+        final String bundle = "/0x00000000_0xffffffff";
+
+        admin1.properties().createProperty("my-property",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
+
+        for (int i = 0; i < totalNamespaces; i++) {
+            final String ns = namespace + i;
+            admin1.namespaces().createNamespace(ns);
+            admin1.namespaces().setNamespaceAntiAffinityGroup(ns, namespaceAntiAffinityGroup);
+        }
+
+        Set<String> brokers = Sets.newHashSet();
+        Set<String> candidate = Sets.newHashSet();
+        Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange = Maps.newHashMap();
+        brokers.add("broker-0");
+        brokers.add("broker-1");
+        brokers.add("broker-2");
+
+        String assignedNamespace = namespace + "0" + bundle;
+
+        // all brokers available so, candidate will be all 3 brokers
+        candidate.addAll(brokers);
+        // add ns-0 to broker-0
+        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-0", namespace + "0", assignedNamespace);
+        String currentBroker = "broker-0";
+        boolean shouldUnload = LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace + "0", bundle,
+                currentBroker, pulsar1, brokerToNamespaceToBundleRange, candidate);
+        assertTrue(shouldUnload);
+        // add ns-1 to broker-1
+        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-1", namespace + "1", assignedNamespace);
+        shouldUnload = LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace + "0", bundle, currentBroker,
+                pulsar1, brokerToNamespaceToBundleRange, candidate);
+        assertTrue(shouldUnload);
+        // add ns-2 to broker-2
+        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-2", namespace + "2", assignedNamespace);
+        shouldUnload = LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace + "0", bundle, currentBroker,
+                pulsar1, brokerToNamespaceToBundleRange, candidate);
+        assertFalse(shouldUnload);
+
+    }
+
+    /**
+     * It verifies that load-manager::shouldAntiAffinityNamespaceUnload checks that unloading should only happen if all
+     * brokers have same number of anti-affinity namespaces
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testLoadSheddingWithAntiAffinityNamespace() throws Exception {
+
+        final String namespace = "my-property/use/my-ns";
+        final int totalNamespaces = 5;
+        final String namespaceAntiAffinityGroup = "my-antiaffinity";
+        final String bundle = "0x00000000_0xffffffff";
+
+        admin1.properties().createProperty("my-property",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
+
+        for (int i = 0; i < totalNamespaces; i++) {
+            final String ns = namespace + i;
+            admin1.namespaces().createNamespace(ns);
+            admin1.namespaces().setNamespaceAntiAffinityGroup(ns, namespaceAntiAffinityGroup);
+        }
+
+        PulsarClient pulsarClient = PulsarClient.create(pulsar1.getWebServiceAddress());
+        Producer producer = pulsarClient.createProducer("persistent://" + namespace + "0/my-topic1");
+        ModularLoadManagerImpl loadManager = (ModularLoadManagerImpl) ((ModularLoadManagerWrapper) pulsar1
+                .getLoadManager().get()).getLoadManager();
+
+        pulsar1.getBrokerService().updateRates();
+        loadManager.updateAll();
+
+        assertTrue(loadManager.shouldAntiAffinityNamespaceUnload(namespace + "0", bundle, primaryHost));
+        producer.close();
+        pulsarClient.close();
+    }
+
+    private boolean isLoadManagerUpdatedDomainCache(ModularLoadManagerImpl loadManager) throws Exception {
+        Field mapField = ModularLoadManagerImpl.class.getDeclaredField("brokerToFailureDomainMap");
+        mapField.setAccessible(true);
+        Map<String, String> map = (Map<String, String>) mapField.get(loadManager);
+        return !map.isEmpty();
+    }
+
+    private NamespaceBundle makeBundle(final String property, final String cluster, final String namespace) {
+        return nsFactory.getBundle(NamespaceName.get(property, cluster, namespace),
+                Range.range(NamespaceBundles.FULL_LOWER_BOUND, BoundType.CLOSED, NamespaceBundles.FULL_UPPER_BOUND,
+                        BoundType.CLOSED));
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
index cc4c0823c..fce825c02 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
@@ -499,7 +499,7 @@ public void testNamespaceIsolationPoliciesForPrimaryAndSecondaryBrokers() throws
         final String broker1Address = pulsar1.getAdvertisedAddress() + "0";
         final String broker2Address = pulsar2.getAdvertisedAddress() + "1";
         final String sharedBroker = "broker3";
-        admin1.clusters().createCluster(cluster, new ClusterData("http://" + pulsar1.getAdvertisedAddress()));
+        admin1.clusters().updateCluster(cluster, new ClusterData("http://" + pulsar1.getAdvertisedAddress()));
         admin1.properties().createProperty(property,
                 new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet(cluster)));
         admin1.namespaces().createNamespace(property + "/" + cluster + "/" + namespace);
@@ -535,7 +535,7 @@ public boolean isEnableNonPersistentTopics(String brokerUrl) {
         // test1: shared=1, primary=1, secondary=1 => It should return 1 primary broker only
         Set<String> brokerCandidateCache = Sets.newHashSet();
         Set<String> availableBrokers = Sets.newHashSet(sharedBroker, broker1Address, broker2Address);
-        LoadManagerShared.applyPolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
+        LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
                 availableBrokers, brokerTopicLoadingPredicate);
         assertEquals(brokerCandidateCache.size(), 1);
         assertTrue(brokerCandidateCache.contains(broker1Address));
@@ -543,7 +543,7 @@ public boolean isEnableNonPersistentTopics(String brokerUrl) {
         // test2: shared=1, primary=0, secondary=1 => It should return 1 secondary broker only
         brokerCandidateCache = Sets.newHashSet();
         availableBrokers = Sets.newHashSet(sharedBroker, broker2Address);
-        LoadManagerShared.applyPolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
+        LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
                 availableBrokers, brokerTopicLoadingPredicate);
         assertEquals(brokerCandidateCache.size(), 1);
         assertTrue(brokerCandidateCache.contains(broker2Address));
@@ -551,7 +551,7 @@ public boolean isEnableNonPersistentTopics(String brokerUrl) {
         // test3: shared=1, primary=0, secondary=0 => It should return 0 broker
         brokerCandidateCache = Sets.newHashSet();
         availableBrokers = Sets.newHashSet(sharedBroker);
-        LoadManagerShared.applyPolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
+        LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
                 availableBrokers, brokerTopicLoadingPredicate);
         assertEquals(brokerCandidateCache.size(), 0);
 
@@ -565,7 +565,7 @@ public boolean isEnableNonPersistentTopics(String brokerUrl) {
         // test1: shared=1, primary=1, secondary=1 => It should return primary + secondary
         brokerCandidateCache = Sets.newHashSet();
         availableBrokers = Sets.newHashSet(sharedBroker, broker1Address, broker2Address);
-        LoadManagerShared.applyPolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
+        LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
                 availableBrokers, brokerTopicLoadingPredicate);
         assertEquals(brokerCandidateCache.size(), 2);
         assertTrue(brokerCandidateCache.contains(broker1Address));
@@ -574,7 +574,7 @@ public boolean isEnableNonPersistentTopics(String brokerUrl) {
         // test2: shared=1, secondary=1 => It should return secondary
         brokerCandidateCache = Sets.newHashSet();
         availableBrokers = Sets.newHashSet(sharedBroker, broker2Address);
-        LoadManagerShared.applyPolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
+        LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
                 availableBrokers, brokerTopicLoadingPredicate);
         assertEquals(brokerCandidateCache.size(), 1);
         assertTrue(brokerCandidateCache.contains(broker2Address));
@@ -582,7 +582,7 @@ public boolean isEnableNonPersistentTopics(String brokerUrl) {
         // test3: shared=1, => It should return 0 broker
         brokerCandidateCache = Sets.newHashSet();
         availableBrokers = Sets.newHashSet(sharedBroker);
-        LoadManagerShared.applyPolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
+        LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
                 availableBrokers, brokerTopicLoadingPredicate);
         assertEquals(brokerCandidateCache.size(), 0);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 720b48eb3..286a700b5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -95,7 +95,7 @@ void setup() throws Exception {
             adminUrl = new URL("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT);
             admin = new PulsarAdmin(adminUrl, (Authentication) null);
 
-            admin.clusters().createCluster("usc", new ClusterData(adminUrl.toString()));
+            admin.clusters().updateCluster("usc", new ClusterData(adminUrl.toString()));
             admin.properties().createProperty("prop",
                     new PropertyAdmin(Lists.newArrayList("appid1"), Sets.newHashSet("usc")));
             admin.namespaces().createNamespace("prop/usc/ns-quota");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 347ffaf56..3e1f9d4cb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -92,7 +92,7 @@ void setup() throws Exception {
             adminUrl = new URL("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT);
             admin = new PulsarAdmin(adminUrl, (Authentication) null);
 
-            admin.clusters().createCluster("usc", new ClusterData(adminUrl.toString()));
+            admin.clusters().updateCluster("usc", new ClusterData(adminUrl.toString()));
             admin.properties().createProperty("prop",
                     new PropertyAdmin(Lists.newArrayList("appid1"), Sets.newHashSet("usc")));
         } catch (Throwable t) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 44372e730..e487f1252 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -203,11 +203,11 @@ void setup() throws Exception {
         admin3 = new PulsarAdmin(url3, (Authentication) null);
 
         // Provision the global namespace
-        admin1.clusters().createCluster("r1", new ClusterData(url1.toString(), urlTls1.toString(),
+        admin1.clusters().updateCluster("r1", new ClusterData(url1.toString(), urlTls1.toString(),
                 pulsar1.getBrokerServiceUrl(), pulsar1.getBrokerServiceUrlTls()));
-        admin1.clusters().createCluster("r2", new ClusterData(url2.toString(), urlTls2.toString(),
+        admin1.clusters().updateCluster("r2", new ClusterData(url2.toString(), urlTls2.toString(),
                 pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls()));
-        admin1.clusters().createCluster("r3", new ClusterData(url3.toString(), urlTls3.toString(),
+        admin1.clusters().updateCluster("r3", new ClusterData(url3.toString(), urlTls3.toString(),
                 pulsar3.getBrokerServiceUrl(), pulsar3.getBrokerServiceUrlTls()));
 
         admin1.clusters().createCluster("global", new ClusterData("http://global:8080", "https://global:8443"));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperSessionExpireRecoveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperSessionExpireRecoveryTest.java
index d9a7c0fca..0ac89e467 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperSessionExpireRecoveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperSessionExpireRecoveryTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.broker.zookeeper;
 
-import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -29,7 +29,7 @@
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 public class ZooKeeperSessionExpireRecoveryTest extends MockedPulsarServiceBaseTest {
 
@@ -52,11 +52,11 @@ protected void cleanup() throws Exception {
     public void testSessionExpired() throws Exception {
         admin.clusters().createCluster("my-cluster", new ClusterData("test-url"));
 
-        assertEquals(admin.clusters().getClusters(), Lists.newArrayList("my-cluster"));
+        assertTrue(Sets.newHashSet(admin.clusters().getClusters()).contains("my-cluster"));
 
         mockZookKeeper.failNow(Code.SESSIONEXPIRED);
 
-        assertEquals(admin.clusters().getClusters(), Lists.newArrayList("my-cluster"));
+        assertTrue(Sets.newHashSet(admin.clusters().getClusters()).contains("my-cluster"));
 
         try {
             admin.clusters().createCluster("my-cluster-2", new ClusterData("test-url"));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
index 7c7cff4e9..83b15f5a6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
@@ -167,7 +167,7 @@ public void testTlsSyncProducerAndConsumer(int batchMessageDelayMs) throws Excep
         authTls.configure(authParams);
         internalSetup(authTls);
 
-        admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
+        admin.clusters().updateCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
                 "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
         admin.properties().createProperty("my-property",
                 new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
@@ -189,7 +189,7 @@ public void testAnonymousSyncProducerAndConsumer(int batchMessageDelayMs) throws
         authTls.configure(authParams);
         internalSetup(authTls);
 
-        admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
+        admin.clusters().updateCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
                 "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
         admin.properties().createProperty("my-property",
                 new PropertyAdmin(Lists.newArrayList("anonymousUser"), Sets.newHashSet("use")));
@@ -243,7 +243,7 @@ public void testAuthenticationFilterNegative() throws Exception {
         // this will cause NPE and it should throw 500
         doReturn(null).when(pulsar).getGlobalZkCache();
         try {
-            admin.clusters().createCluster(cluster, clusterData);
+            admin.clusters().updateCluster(cluster, clusterData);
         } catch (PulsarAdminException e) {
             Assert.assertTrue(e.getCause() instanceof InternalServerErrorException);
         }
@@ -268,7 +268,7 @@ public void testInternalServerExceptionOnLookup() throws Exception {
         authTls.configure(authParams);
         internalSetup(authTls);
 
-        admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
+        admin.clusters().updateCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
                 "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
         admin.properties().createProperty("my-property",
                 new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
@@ -292,4 +292,4 @@ public void testInternalServerExceptionOnLookup() throws Exception {
 
     }
 
-}
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 4a8d8cf4c..0d1b8c3d8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -73,6 +73,7 @@
 
 public class NonPersistentTopicTest extends ProducerConsumerBase {
     private static final Logger log = LoggerFactory.getLogger(NonPersistentTopicTest.class);
+    private final String configClusterName = "r1";
 
     @DataProvider(name = "subscriptionType")
     public Object[][] getSubscriptionType() {
@@ -835,7 +836,7 @@ void setupReplicationCluster() throws Exception {
             // completely
             // independent config objects instead of referring to the same properties object
             ServiceConfiguration config1 = new ServiceConfiguration();
-            config1.setClusterName("r1");
+            config1.setClusterName(configClusterName);
             config1.setWebServicePort(webServicePort1);
             config1.setZookeeperServers("127.0.0.1:" + zkPort1);
             config1.setGlobalZookeeperServers("127.0.0.1:" + globalZKPort + "/foo");
@@ -901,11 +902,11 @@ void setupReplicationCluster() throws Exception {
             admin3 = new PulsarAdmin(url3, (Authentication) null);
 
             // Provision the global namespace
-            admin1.clusters().createCluster("r1", new ClusterData(url1.toString(), null, pulsar1.getBrokerServiceUrl(),
+            admin1.clusters().updateCluster("r1", new ClusterData(url1.toString(), null, pulsar1.getBrokerServiceUrl(),
                     pulsar1.getBrokerServiceUrlTls()));
-            admin1.clusters().createCluster("r2", new ClusterData(url2.toString(), null, pulsar2.getBrokerServiceUrl(),
+            admin1.clusters().updateCluster("r2", new ClusterData(url2.toString(), null, pulsar2.getBrokerServiceUrl(),
                     pulsar1.getBrokerServiceUrlTls()));
-            admin1.clusters().createCluster("r3", new ClusterData(url3.toString(), null, pulsar3.getBrokerServiceUrl(),
+            admin1.clusters().updateCluster("r3", new ClusterData(url3.toString(), null, pulsar3.getBrokerServiceUrl(),
                     pulsar1.getBrokerServiceUrlTls()));
 
             admin1.clusters().createCluster("global", new ClusterData("http://global:8080"));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java
index b8c1cde6a..7b8e083f2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java
@@ -40,6 +40,7 @@
     protected final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
     protected final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
     protected final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";
+    private final String clusterName = "use";
 
     @BeforeMethod
     @Override
@@ -62,7 +63,7 @@ protected void internalSetUpForBroker() throws Exception {
         conf.setTlsEnabled(true);
         conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
         conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
-        conf.setClusterName("use");
+        conf.setClusterName(clusterName);
     }
 
     protected void internalSetUpForClient() throws Exception {
@@ -78,7 +79,7 @@ protected void internalSetUpForNamespace() throws Exception {
         clientConf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
         clientConf.setUseTls(true);
         admin = spy(new PulsarAdmin(brokerUrlTls, clientConf));
-        admin.clusters().createCluster("use",
+        admin.clusters().updateCluster(clusterName,
                 new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(), "pulsar://localhost:" + BROKER_PORT,
                         "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
         admin.properties().createProperty("my-property",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java
index 3bac50422..8398ea2ec 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java
@@ -76,7 +76,7 @@ protected void cleanup() throws Exception {
      * @throws Exception
      */
     @Test
-    public void testRiderectUrlWithServerStarted() throws Exception {
+    public void testRedirectUrlWithServerStarted() throws Exception {
         // 1. start server
         int port = PortManager.nextFreePort();
         ServiceConfig config = new ServiceConfig();
@@ -100,7 +100,7 @@ public void testRiderectUrlWithServerStarted() throws Exception {
          **/
 
         assertEquals(hitBrokerService(HttpMethod.POST, postRequestUrl, Lists.newArrayList("use")),
-                "Property does not exist");
+                "Cannot set replication on a non-global namespace");
         assertEquals(hitBrokerService(HttpMethod.PUT, putRequestUrl, new BundlesData(1)), "Property does not exist");
         assertEquals(hitBrokerService(HttpMethod.GET, getRequestUrl, null), "Property does not exist");
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
index c996c9f71..302f2e462 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
@@ -44,7 +44,8 @@
 
 public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest {
     private WebSocketService service;
-    private static final int TEST_PORT = PortManager.nextFreePort();;
+    private static final int TEST_PORT = PortManager.nextFreePort();
+    private final String configClusterName = "c1";
 
     public ProxyAuthorizationTest() {
         super();
@@ -53,7 +54,7 @@ public ProxyAuthorizationTest() {
     @BeforeClass
     @Override
     protected void setup() throws Exception {
-        conf.setClusterName("c1");
+        conf.setClusterName(configClusterName);
         internalSetup();
 
         WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
@@ -81,7 +82,7 @@ public void test() throws Exception {
 
         assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), false);
 
-        admin.clusters().createCluster("c1", new ClusterData());
+        admin.clusters().updateCluster(configClusterName, new ClusterData());
         admin.properties().createProperty("p1", new PropertyAdmin(Lists.newArrayList("role1"), Sets.newHashSet("c1")));
         waitForChange();
         admin.namespaces().createNamespace("p1/c1/ns1");
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java
index 1984e2a25..779e7b117 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java
@@ -27,6 +27,7 @@
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
 import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
 
 /**
@@ -284,4 +285,135 @@ void updateNamespaceIsolationPolicy(String cluster, String policyName, Namespace
      */
     NamespaceIsolationData getNamespaceIsolationPolicy(String cluster, String policyName) throws PulsarAdminException;
 
+    /**
+     * Create a domain into cluster
+     * <p>
+     *
+     * @param cluster
+     *          Cluster name
+     *
+     * @param domainName
+     *          domain name
+     *
+     * @param FailureDomain
+     *          Domain configurations
+     *
+     * @return
+     * @throws NotAuthorizedException
+     *             You don't have admin permission to create the cluster
+     *
+     * @throws ConflictException
+     *             Broker already exist into other domain
+     *             
+     * @throws NotFoundException
+     *             Cluster doesn't exist
+     *
+     * @throws PreconditionFailedException
+     *             Cluster doesn't exist
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void createFailureDomain(String cluster, String domainName, FailureDomain domain)
+            throws PulsarAdminException;
+    
+    
+    /**
+     * Update a domain into cluster
+     * <p>
+     *
+     * @param cluster
+     *          Cluster name
+     *
+     * @param domainName
+     *          domain name
+     *
+     * @param FailureDomain
+     *          Domain configurations
+     *
+     * @return
+     * @throws NotAuthorizedException
+     *             You don't have admin permission to create the cluster
+     *
+     * @throws ConflictException
+     *             Broker already exist into other domain
+     *             
+     * @throws NotFoundException
+     *             Cluster doesn't exist
+     *
+     * @throws PreconditionFailedException
+     *             Cluster doesn't exist
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void updateFailureDomain(String cluster, String domainName, FailureDomain domain)
+            throws PulsarAdminException;
+    
+    
+    /**
+     * Delete a domain in cluster
+     * <p>
+     *
+     * @param cluster
+     *          Cluster name
+     *
+     * @param domainName
+     *          Domain name
+     *
+     * @return
+     * @throws NotAuthorizedException
+     *             You don't have admin permission to create the cluster
+     *
+     * @throws NotFoundException
+     *             Cluster doesn't exist
+     *
+     * @throws PreconditionFailedException
+     *             Cluster doesn't exist
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+
+    void deleteFailureDomain(String cluster, String domainName) throws PulsarAdminException;
+
+    /**
+     * Get all registered domains in cluster
+     * <p>
+     *
+     * @param cluster
+     *            Cluster name
+     * @return
+     * @throws NotAuthorizedException
+     *             You don't have admin permission to create the cluster
+     *
+     * @throws NotFoundException
+     *             Cluster don't exist
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    Map<String, FailureDomain> getFailureDomains(String cluster) throws PulsarAdminException;
+    
+    /**
+     * Get the domain registered into a cluster
+     * <p>
+     *
+     * @param cluster
+     *            Cluster name
+     * @return
+     * @throws NotAuthorizedException
+     *             You don't have admin permission to create the cluster
+     *
+     * @throws NotFoundException
+     *             Domain doesn't exist
+     *
+     * @throws PreconditionFailedException
+     *             Cluster doesn't exist
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    FailureDomain getFailureDomain(String cluster, String domainName) throws PulsarAdminException;
+    
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index ebe754c8e..3c8d3d51e 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -420,6 +420,78 @@
      *             Unexpected error
      */
     void setNamespaceMessageTTL(String namespace, int ttlInSeconds) throws PulsarAdminException;
+    
+    
+    /**
+     * Set anti-affinity group name for a namespace
+     * <p>
+     * Request example:
+     *
+     * @param namespace
+     *            Namespace name
+     * @param namespaceAntiAffinityGroup
+     *            anti-affinity group name for a namespace
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setNamespaceAntiAffinityGroup(String namespace, String namespaceAntiAffinityGroup) throws PulsarAdminException;
+    
+    /**
+     * Get all namespaces that grouped with given anti-affinity group
+     * 
+     * @param property
+     *            property is only used for authorization. Client has to be admin of any of the property to access this
+     *            api api.
+     * @param cluster
+     *            cluster name
+     * @param namespaceAntiAffinityGroup
+     *            Anti-affinity group name
+     * @return list of namespace grouped under a given anti-affinity group
+     * @throws PulsarAdminException
+     */
+    List<String> getAntiAffinityNamespaces(String property, String cluster, String namespaceAntiAffinityGroup)
+            throws PulsarAdminException;
+
+    /**
+     * Get anti-affinity group name for a namespace
+     * <p>
+     * Response example:
+     *
+     * <pre>
+     * <code>60</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    String getNamespaceAntiAffinityGroup(String namespace) throws PulsarAdminException;
+
+    /**
+     * Delete anti-affinity group name for a namespace.
+     *
+     * @param namespace
+     *            Namespace name
+     *
+     * @throws NotAuthorizedException
+     *             You don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void deleteNamespaceAntiAffinityGroup(String namespace) throws PulsarAdminException;
 
     /**
      * Set the deduplication status for all topics within a namespace.
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
index 2fcf44a29..666a2e8b0 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
@@ -31,6 +31,7 @@
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
 
@@ -150,4 +151,49 @@ public NamespaceIsolationData getNamespaceIsolationPolicy(String cluster, String
             throw getApiException(e);
         }
     }
+
+    @Override
+    public void createFailureDomain(String cluster, String domainName, FailureDomain domain) throws PulsarAdminException {
+        setDomain(cluster, domainName, domain);
+    }
+
+    @Override
+    public void updateFailureDomain(String cluster, String domainName, FailureDomain domain) throws PulsarAdminException {
+        setDomain(cluster, domainName, domain);
+    }
+
+    @Override
+    public void deleteFailureDomain(String cluster, String domainName) throws PulsarAdminException {
+        request(clusters.path(cluster).path("failureDomains").path(domainName)).delete(ErrorData.class);
+    }
+
+    @Override
+    public Map<String, FailureDomain> getFailureDomains(String cluster) throws PulsarAdminException {
+        try {
+            return request(clusters.path(cluster).path("failureDomains"))
+                    .get(new GenericType<Map<String, FailureDomain>>() {
+                    });
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public FailureDomain getFailureDomain(String cluster, String domainName) throws PulsarAdminException {
+        try {
+            return request(clusters.path(cluster).path("failureDomains").path(domainName)).get(FailureDomain.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+    
+    private void setDomain(String cluster, String domainName,
+            FailureDomain domain) throws PulsarAdminException {
+        try {
+            request(clusters.path(cluster).path("failureDomains").path(domainName)).post(
+                    Entity.entity(domain, MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 79aa162c4..3f0c7e71c 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -228,6 +228,55 @@ public void setNamespaceMessageTTL(String namespace, int ttlInSeconds) throws Pu
         }
     }
 
+    @Override
+    public void setNamespaceAntiAffinityGroup(String namespace, String namespaceAntiAffinityGroup)
+            throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName())
+                    .path("antiAffinity")).post(Entity.entity(namespaceAntiAffinityGroup, MediaType.APPLICATION_JSON),
+                            ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public String getNamespaceAntiAffinityGroup(String namespace) throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            return request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName())
+                    .path("antiAffinity")).get(new GenericType<String>() {
+                    });
+
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public List<String> getAntiAffinityNamespaces(String property, String cluster, String namespaceAntiAffinityGroup)
+            throws PulsarAdminException {
+        try {
+            return request(namespaces.path(cluster).path("antiAffinity").path(namespaceAntiAffinityGroup)
+                    .queryParam("property", property)).get(new GenericType<List<String>>() {
+                    });
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void deleteNamespaceAntiAffinityGroup(String namespace) throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            request(namespaces.path(ns.getProperty()).path(ns.getCluster()).path(ns.getLocalName())
+                    .path("antiAffinity")).delete(ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
     @Override
     public void setDeduplicationStatus(String namespace, boolean enableDeduplication) throws PulsarAdminException {
         try {
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
index 84bced071..5ba6c1077 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
@@ -21,9 +21,11 @@
 import java.util.Arrays;
 
 import org.apache.commons.lang3.StringUtils;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FailureDomain;
 
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
@@ -125,6 +127,84 @@ void run() throws PulsarAdminException {
         }
     }
 
+    @Parameters(commandDescription = "Create a new failure-domain for a cluster. updates it if already created.")
+    private class CreateFailureDomain extends CliCommand {
+        @Parameter(description = "cluster-name\n", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = "--domain-name", description = "domain-name", required = true)
+        private String domainName;
+
+        @Parameter(names = "--broker-list", description = "Comma separated broker list", required = false)
+        private String brokerList;
+        
+        void run() throws PulsarAdminException {
+            String cluster = getOneArgument(params);
+            FailureDomain domain = new FailureDomain();
+            domain.setBrokers((isNotBlank(brokerList) ? Sets.newHashSet(brokerList.split(",")): null));
+            admin.clusters().createFailureDomain(cluster, domainName, domain);
+        }
+    }
+
+    @Parameters(commandDescription = "Update failure-domain for a cluster. Creates a new one if not exist.")
+    private class UpdateFailureDomain extends CliCommand {
+        @Parameter(description = "cluster-name\n", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = "--domain-name", description = "domain-name", required = true)
+        private String domainName;
+
+        @Parameter(names = "--broker-list", description = "Comma separated broker list", required = false)
+        private String brokerList;
+
+        void run() throws PulsarAdminException {
+            String cluster = getOneArgument(params);
+            FailureDomain domain = new FailureDomain();
+            domain.setBrokers((isNotBlank(brokerList) ? Sets.newHashSet(brokerList.split(",")) : null));
+            admin.clusters().updateFailureDomain(cluster, domainName, domain);
+        }
+    }
+    
+    @Parameters(commandDescription = "Deletes an existing failure-domain")
+    private class DeleteFailureDomain extends CliCommand {
+        @Parameter(description = "cluster-name\n", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = "--domain-name", description = "domain-name", required = true)
+        private String domainName;
+
+        void run() throws PulsarAdminException {
+            String cluster = getOneArgument(params);
+            admin.clusters().deleteFailureDomain(cluster, domainName);
+        }
+    }
+
+    @Parameters(commandDescription = "List the existing failure-domains for a cluster")
+    private class ListFailureDomains extends CliCommand {
+        
+        @Parameter(description = "cluster-name\n", required = true)
+        private java.util.List<String> params;
+        
+        void run() throws PulsarAdminException {
+            String cluster = getOneArgument(params);
+            print(admin.clusters().getFailureDomains(cluster));
+        }
+    }
+
+    @Parameters(commandDescription = "Get the configuration brokers of a failure-domain")
+    private class GetFailureDomain extends CliCommand {
+        @Parameter(description = "cluster-name\n", required = true)
+        private java.util.List<String> params;
+        
+        @Parameter(names = "--domain-name", description = "domain-name", required = true)
+        private String domainName;
+
+        void run() throws PulsarAdminException {
+            String cluster = getOneArgument(params);
+            print(admin.clusters().getFailureDomain(cluster, domainName));
+        }
+    }
+    
     public CmdClusters(PulsarAdmin admin) {
         super("clusters", admin);
         jcommander.addCommand("get", new Get());
@@ -133,6 +213,11 @@ public CmdClusters(PulsarAdmin admin) {
         jcommander.addCommand("delete", new Delete());
         jcommander.addCommand("list", new List());
         jcommander.addCommand("update-peer-clusters", new UpdatePeerClusters());
+        jcommander.addCommand("get-failure-domain", new GetFailureDomain());
+        jcommander.addCommand("create-failure-domain", new CreateFailureDomain());
+        jcommander.addCommand("update-failure-domain", new UpdateFailureDomain());
+        jcommander.addCommand("delete-failure-domain", new DeleteFailureDomain());
+        jcommander.addCommand("list-failure-domains", new ListFailureDomains());
     }
 
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 37d7306f9..e04eef7be 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -213,6 +213,65 @@ void run() throws PulsarAdminException {
         }
     }
 
+    @Parameters(commandDescription = "Set Anti-affinity group name for a namspace")
+    private class SetAntiAffinityGroup extends CliCommand {
+        @Parameter(description = "property/cluster/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--group", "-g" }, description = "Anti-affinity group name", required = true)
+        private String antiAffinityGroup;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            admin.namespaces().setNamespaceAntiAffinityGroup(namespace, antiAffinityGroup);
+        }
+    }
+
+    @Parameters(commandDescription = "Get Anti-affinity group name for a namspace")
+    private class GetAntiAffinityGroup extends CliCommand {
+        @Parameter(description = "property/cluster/namespace\n", required = true)
+        private java.util.List<String> params;
+        
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            print(admin.namespaces().getNamespaceAntiAffinityGroup(namespace));
+        }
+    }
+
+    @Parameters(commandDescription = "Get Anti-affinity namespaces grouped with the given anti-affinity group name")
+    private class GetAntiAffinityNamespaces extends CliCommand {
+
+        @Parameter(names = { "--property",
+                "-p" }, description = "property is only used for authorization. Client has to be admin of any of the property to access this api", required = false)
+        private String property;
+
+        @Parameter(names = { "--cluster", "-c" }, description = "Cluster name", required = true)
+        private String cluster;
+
+        @Parameter(names = { "--group", "-g" }, description = "Anti-affinity group name", required = true)
+        private String antiAffinityGroup;
+
+        @Override
+        void run() throws PulsarAdminException {
+            print(admin.namespaces().getAntiAffinityNamespaces(property, cluster, antiAffinityGroup));
+        }
+    }
+
+    @Parameters(commandDescription = "Remove Anti-affinity group name for a namspace")
+    private class DeleteAntiAffinityGroup extends CliCommand {
+        @Parameter(description = "property/cluster/namespace\n", required = true)
+        private java.util.List<String> params;
+        
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            admin.namespaces().deleteNamespaceAntiAffinityGroup(namespace);
+        }
+    }
+    
+
     @Parameters(commandDescription = "Enable or disable deduplication for a namespace")
     private class SetDeduplication extends CliCommand {
         @Parameter(description = "property/cluster/namespace", required = true)
@@ -621,6 +680,11 @@ public CmdNamespaces(PulsarAdmin admin) {
 
         jcommander.addCommand("get-message-ttl", new GetMessageTTL());
         jcommander.addCommand("set-message-ttl", new SetMessageTTL());
+        
+        jcommander.addCommand("get-anti-affinity-group", new GetAntiAffinityGroup());
+        jcommander.addCommand("set-anti-affinity-group", new SetAntiAffinityGroup());
+        jcommander.addCommand("get-anti-affinity-namespaces", new GetAntiAffinityNamespaces());
+        jcommander.addCommand("delete-anti-affinity-group", new DeleteAntiAffinityGroup());
 
         jcommander.addCommand("set-deduplication", new SetDeduplication());
 
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 69d57da74..ba12db7e8 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -40,6 +40,7 @@
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
 import org.apache.pulsar.common.policies.data.ResourceQuota;
@@ -131,6 +132,24 @@ void clusters() throws Exception {
 
         clusters.run(split("delete use"));
         verify(mockClusters).deleteCluster("use");
+        
+        clusters.run(split("list-failure-domains use"));
+        verify(mockClusters).getFailureDomains("use");
+
+        clusters.run(split("get-failure-domain use --domain-name domain"));
+        verify(mockClusters).getFailureDomain("use", "domain");
+
+        clusters.run(split("create-failure-domain use --domain-name domain --broker-list b1"));
+        FailureDomain domain = new FailureDomain();
+        domain.setBrokers(Sets.newHashSet("b1"));
+        verify(mockClusters).createFailureDomain("use", "domain", domain);
+
+        clusters.run(split("update-failure-domain use --domain-name domain --broker-list b1"));
+        verify(mockClusters).updateFailureDomain("use", "domain", domain);
+
+        clusters.run(split("delete-failure-domain use --domain-name domain"));
+        verify(mockClusters).deleteFailureDomain("use", "domain");
+
 
         // Re-create CmdClusters to avoid a issue.
         // See https://github.com/cbeust/jcommander/issues/271
@@ -284,6 +303,19 @@ void namespaces() throws Exception {
 
         namespaces.run(split("get-message-ttl myprop/clust/ns1"));
         verify(mockNamespaces).getNamespaceMessageTTL("myprop/clust/ns1");
+        
+        namespaces.run(split("set-anti-affinity-group myprop/clust/ns1 -g group"));
+        verify(mockNamespaces).setNamespaceAntiAffinityGroup("myprop/clust/ns1", "group");
+
+        namespaces.run(split("get-anti-affinity-group myprop/clust/ns1"));
+        verify(mockNamespaces).getNamespaceAntiAffinityGroup("myprop/clust/ns1");
+        
+        namespaces.run(split("get-anti-affinity-namespaces -p dummy -c cluster -g group"));
+        verify(mockNamespaces).getAntiAffinityNamespaces("dummy", "cluster", "group");
+
+        namespaces.run(split("delete-anti-affinity-group myprop/clust/ns1 "));
+        verify(mockNamespaces).deleteNamespaceAntiAffinityGroup("myprop/clust/ns1");
+        
 
         namespaces.run(split("set-retention myprop/clust/ns1 -t 1h -s 1M"));
         verify(mockNamespaces).setRetention("myprop/clust/ns1", new RetentionPolicies(60, 1));
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FailureDomain.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FailureDomain.java
new file mode 100644
index 000000000..dd1d09b8a
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FailureDomain.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.common.base.Objects;
+
+public class FailureDomain {
+
+    public Set<String> brokers = new HashSet<String>();
+
+    public Set<String> getBrokers() {
+        return brokers;
+    }
+
+    public void setBrokers(Set<String> brokers) {
+        this.brokers = brokers;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof FailureDomain) {
+            FailureDomain other = (FailureDomain) obj;
+            return Objects.equal(brokers, other.brokers);
+        }
+
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return Objects.toStringHelper(this).add("brokers", brokers).toString();
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index a46b5fee0..f3486b859 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -42,6 +42,7 @@
     public int message_ttl_in_seconds = 0;
     public RetentionPolicies retention_policies = null;
     public boolean deleted = false;
+    public String antiAffinityGroup;
 
     public static final String FIRST_BOUNDARY = "0x00000000";
     public static final String LAST_BOUNDARY = "0xffffffff";
@@ -63,7 +64,8 @@ public boolean equals(Object obj) {
                     && message_ttl_in_seconds == other.message_ttl_in_seconds
                     && Objects.equals(retention_policies, other.retention_policies)
                     && Objects.equals(encryption_required, other.encryption_required)
-                    && Objects.equals(subscription_auth_mode, other.subscription_auth_mode);
+                    && Objects.equals(subscription_auth_mode, other.subscription_auth_mode)
+                    && Objects.equals(antiAffinityGroup, other.antiAffinityGroup);
         }
 
         return false;
@@ -86,6 +88,7 @@ public String toString() {
                 .add("deduplicationEnabled", deduplicationEnabled)
                 .add("clusterDispatchRate", clusterDispatchRate)
                 .add("latency_stats_sample_rate", latency_stats_sample_rate)
+                .add("antiAffinityGroup", antiAffinityGroup)
                 .add("message_ttl_in_seconds", message_ttl_in_seconds).add("retention_policies", retention_policies)
                 .add("deleted", deleted)
                 .add("encryption_required", encryption_required)
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
index 06bd0557e..556a8b548 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
@@ -64,6 +64,7 @@
 
     private ProxyService proxyService;
     private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+    private final String configClusterName = "use";
 
     @BeforeMethod
     @Override
@@ -92,7 +93,7 @@ protected void setup() throws Exception {
         providers.add(AuthenticationProviderTls.class.getName());
         conf.setAuthenticationProviders(providers);
 
-        conf.setClusterName("use");
+        conf.setClusterName(configClusterName);
 
         super.init();
 
@@ -157,7 +158,7 @@ public void testTlsSyncProducerAndConsumer() throws Exception {
         // create a client which connects to proxy over tls and pass authData
         PulsarClient proxyClient = createPulsarClient(authTls, proxyServiceUrl);
 
-        admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
+        admin.clusters().updateCluster(configClusterName, new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
                 "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
         admin.properties().createProperty("my-property",
                 new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services