You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2018/01/25 02:00:00 UTC
[incubator-pulsar] branch master updated: PIP-7 Introduce
Failure-domain and Anti-affinity-namespace group (#896)
This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6bb9834 PIP-7 Introduce Failure-domain and Anti-affinity-namespace group (#896)
6bb9834 is described below
commit 6bb98344ca48f81aaf403372400cb38013616e9d
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Wed Jan 24 17:59:58 2018 -0800
PIP-7 Introduce Failure-domain and Anti-affinity-namespace group (#896)
* Introduce Failure-domain and Anti-affinity-namespace group
---
conf/broker.conf | 3 +
conf/standalone.conf | 3 +
.../apache/pulsar/broker/ServiceConfiguration.java | 11 +
.../broker/cache/ConfigurationCacheService.java | 50 ++
.../org/apache/pulsar/broker/PulsarService.java | 2 +-
.../apache/pulsar/broker/admin/AdminResource.java | 24 +-
.../org/apache/pulsar/broker/admin/Clusters.java | 219 ++++++++-
.../org/apache/pulsar/broker/admin/Namespaces.java | 140 +++++-
.../broker/loadbalance/impl/LoadManagerShared.java | 242 ++++++++-
.../loadbalance/impl/ModularLoadManagerImpl.java | 104 +++-
.../loadbalance/impl/SimpleLoadManagerImpl.java | 2 +-
.../pulsar/broker/web/PulsarWebResource.java | 21 +-
.../apache/pulsar/broker/SLAMonitoringTest.java | 2 +-
.../apache/pulsar/broker/admin/AdminApiTest.java | 16 +-
.../apache/pulsar/broker/admin/AdminApiTest2.java | 71 ++-
.../org/apache/pulsar/broker/admin/AdminTest.java | 11 +-
.../apache/pulsar/broker/admin/NamespacesTest.java | 2 +-
.../pulsar/broker/auth/AuthorizationTest.java | 2 +-
.../broker/auth/MockedPulsarServiceBaseTest.java | 4 +-
.../AntiAffinityNamespaceGroupTest.java | 540 +++++++++++++++++++++
.../loadbalance/ModularLoadManagerImplTest.java | 14 +-
.../broker/service/BacklogQuotaManagerTest.java | 2 +-
.../broker/service/BrokerBkEnsemblesTests.java | 2 +-
.../pulsar/broker/service/ReplicatorTestBase.java | 6 +-
.../ZooKeeperSessionExpireRecoveryTest.java | 8 +-
.../api/AuthenticatedProducerConsumerTest.java | 10 +-
.../pulsar/client/api/NonPersistentTopicTest.java | 9 +-
.../pulsar/client/api/TlsProducerConsumerBase.java | 5 +-
.../service/web/DiscoveryServiceWebTest.java | 4 +-
.../websocket/proxy/ProxyAuthorizationTest.java | 7 +-
.../org/apache/pulsar/client/admin/Clusters.java | 132 +++++
.../org/apache/pulsar/client/admin/Namespaces.java | 72 +++
.../pulsar/client/admin/internal/ClustersImpl.java | 46 ++
.../client/admin/internal/NamespacesImpl.java | 49 ++
.../org/apache/pulsar/admin/cli/CmdClusters.java | 85 ++++
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 64 +++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 32 ++
.../pulsar/common/policies/data/FailureDomain.java | 52 ++
.../pulsar/common/policies/data/Policies.java | 5 +-
.../ProxyAuthenticatedProducerConsumerTest.java | 5 +-
40 files changed, 1975 insertions(+), 103 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index d983b3f..de8ab1a 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -46,6 +46,9 @@ advertisedAddress=
# Name of the cluster to which this broker belongs to
clusterName=
+# Enable cluster failure domains, which distribute brokers into logical regions
+failureDomainsEnabled=false
+
# Zookeeper session timeout in milliseconds
zooKeeperSessionTimeoutMillis=30000
diff --git a/conf/standalone.conf b/conf/standalone.conf
index ec400fb..52b3174 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -39,6 +39,9 @@ advertisedAddress=
# Name of the cluster to which this broker belongs to
clusterName=standalone
+# Enable cluster failure domains, which distribute brokers into logical regions
+failureDomainsEnabled=false
+
# Zookeeper session timeout in milliseconds
zooKeeperSessionTimeoutMillis=30000
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 2585f33..7f01a87 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -61,6 +61,9 @@ public class ServiceConfiguration implements PulsarConfiguration {
// Name of the cluster to which this broker belongs to
@FieldContext(required = true)
private String clusterName;
+ // Enable cluster failure domains, which distribute brokers into logical regions
+ @FieldContext(dynamic = true)
+ private boolean failureDomainsEnabled = false;
// Zookeeper session timeout in milliseconds
private long zooKeeperSessionTimeoutMillis = 30000;
// Time to wait for broker graceful shutdown. After this time elapses, the
@@ -472,6 +475,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
this.clusterName = clusterName;
}
+ public boolean isFailureDomainsEnabled() {
+ return failureDomainsEnabled;
+ }
+
+ public void setFailureDomainsEnabled(boolean failureDomainsEnabled) {
+ this.failureDomainsEnabled = failureDomainsEnabled;
+ }
+
public long getBrokerShutdownTimeoutMs() {
return brokerShutdownTimeoutMs;
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
index 1c8ad80..41a8f93 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
@@ -21,8 +21,10 @@ package org.apache.pulsar.broker.cache;
import java.util.Map;
import org.apache.bookkeeper.util.ZkUtils;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
@@ -39,6 +41,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Maps;
/**
* ConfigurationCacheService maintains a local in-memory cache of all the configurations and policies stored in
@@ -53,13 +56,21 @@ public class ConfigurationCacheService {
private ZooKeeperDataCache<Policies> policiesCache;
private ZooKeeperDataCache<ClusterData> clustersCache;
private ZooKeeperChildrenCache clustersListCache;
+ private ZooKeeperChildrenCache failureDomainListCache;
private ZooKeeperDataCache<NamespaceIsolationPolicies> namespaceIsolationPoliciesCache;
+ private ZooKeeperDataCache<FailureDomain> failureDomainCache;
public static final String POLICIES = "policies";
+ public static final String FAILURE_DOMAIN = "failureDomain";
+ public final String CLUSTER_FAILURE_DOMAIN_ROOT;
public static final String POLICIES_ROOT = "/admin/policies";
private static final String CLUSTERS_ROOT = "/admin/clusters";
public ConfigurationCacheService(ZooKeeperCache cache) throws PulsarServerException {
+ this(cache, null);
+ }
+
+ public ConfigurationCacheService(ZooKeeperCache cache, String configuredClusterName) throws PulsarServerException {
this.cache = cache;
initZK();
@@ -86,6 +97,12 @@ public class ConfigurationCacheService {
};
this.clustersListCache = new ZooKeeperChildrenCache(cache, CLUSTERS_ROOT);
+
+ CLUSTER_FAILURE_DOMAIN_ROOT = CLUSTERS_ROOT + "/" + configuredClusterName + "/" + FAILURE_DOMAIN;
+ if (isNotBlank(configuredClusterName)) {
+ createFailureDomainRoot(cache.getZooKeeper(), CLUSTER_FAILURE_DOMAIN_ROOT);
+ this.failureDomainListCache = new ZooKeeperChildrenCache(cache, CLUSTER_FAILURE_DOMAIN_ROOT);
+ }
this.namespaceIsolationPoliciesCache = new ZooKeeperDataCache<NamespaceIsolationPolicies>(cache) {
@Override
@@ -96,6 +113,31 @@ public class ConfigurationCacheService {
}));
}
};
+
+ this.failureDomainCache = new ZooKeeperDataCache<FailureDomain>(cache) {
+ @Override
+ public FailureDomain deserialize(String path, byte[] content) throws Exception {
+ return ObjectMapperFactory.getThreadLocal().readValue(content, FailureDomain.class);
+ }
+ };
+ }
+
+ private void createFailureDomainRoot(ZooKeeper zk, String path) {
+ try {
+ if (zk.exists(path, false) == null) {
+ try {
+ byte[] data = "".getBytes();
+ ZkUtils.createFullPathOptimistic(zk, path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ LOG.info("Successfully created failure-domain znode at {}", path);
+ } catch (KeeperException.NodeExistsException e) {
+ // Ok
+ }
+ }
+ } catch (KeeperException.NodeExistsException e) {
+ // Ok
+ } catch (Exception e) {
+ LOG.warn("Failed to create failure-domain znode {} ", path, e);
+ }
}
private void initZK() throws PulsarServerException {
@@ -138,6 +180,10 @@ public class ConfigurationCacheService {
return this.clustersListCache;
}
+ public ZooKeeperChildrenCache failureDomainListCache() {
+ return this.failureDomainListCache;
+ }
+
public ZooKeeper getZooKeeper() {
return this.cache.getZooKeeper();
}
@@ -145,4 +191,8 @@ public class ConfigurationCacheService {
public ZooKeeperDataCache<NamespaceIsolationPolicies> namespaceIsolationPoliciesCache() {
return this.namespaceIsolationPoliciesCache;
}
+
+ public ZooKeeperDataCache<FailureDomain> failureDomainCache() {
+ return this.failureDomainCache;
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 2d7f99b..6cd56ab 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -417,7 +417,7 @@ public class PulsarService implements AutoCloseable {
throw new PulsarServerException(e);
}
- this.configurationCacheService = new ConfigurationCacheService(getGlobalZkCache());
+ this.configurationCacheService = new ConfigurationCacheService(getGlobalZkCache(), this.config.getClusterName());
this.localZkCacheService = new LocalZooKeeperCacheService(getLocalZkCache(), this.configurationCacheService);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index f542a10..f04d15b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.broker.admin;
import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT;
import java.net.MalformedURLException;
import java.net.URI;
@@ -45,6 +47,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
@@ -64,7 +67,6 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
-import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
public abstract class AdminResource extends PulsarWebResource {
private static final Logger log = LoggerFactory.getLogger(AdminResource.class);
@@ -202,6 +204,7 @@ public abstract class AdminResource extends PulsarWebResource {
return namespaces;
}
+
/**
* Redirect the call to the specified broker
*
@@ -286,6 +289,14 @@ public abstract class AdminResource extends PulsarWebResource {
return pulsar().getConfigurationCache().namespaceIsolationPoliciesCache();
}
+ protected ZooKeeperDataCache<FailureDomain> failureDomainCache() {
+ return pulsar().getConfigurationCache().failureDomainCache();
+ }
+
+ protected ZooKeeperChildrenCache failureDomainListCache() {
+ return pulsar().getConfigurationCache().failureDomainListCache();
+ }
+
protected PartitionedTopicMetadata getPartitionedTopicMetadata(String property, String cluster, String namespace,
String destination, boolean authoritative) {
DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination);
@@ -354,5 +365,14 @@ public abstract class AdminResource extends PulsarWebResource {
}
return metadataFuture;
}
-
+
+ protected void validateClusterExists(String cluster) {
+ try {
+ if (!clustersCache().get(path("clusters", cluster)).isPresent()) {
+ throw new RestException(Status.PRECONDITION_FAILED, "Cluster " + cluster + " does not exist.");
+ }
+ } catch (Exception e) {
+ throw new RestException(e);
+ }
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Clusters.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Clusters.java
index b489f5f..612e2e2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Clusters.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Clusters.java
@@ -23,9 +23,11 @@ import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import java.io.IOException;
import java.util.Collections;
import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.stream.Collectors;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
@@ -38,14 +40,17 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response.Status;
import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
@@ -53,12 +58,15 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonGenerationException;
import com.fasterxml.jackson.databind.JsonMappingException;
+import com.google.common.collect.Maps;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.FAILURE_DOMAIN;
+
@Path("/clusters")
@Api(value = "/clusters", description = "Cluster admin apis", tags = "clusters")
@Produces(MediaType.APPLICATION_JSON)
@@ -138,9 +146,14 @@ public class Clusters extends AdminResource {
String clusterPath = path("clusters", cluster);
Stat nodeStat = new Stat();
byte[] content = globalZk().getData(clusterPath, null, nodeStat);
- ClusterData currentClusterData = jsonMapper().readValue(content, ClusterData.class);
- // only update cluster-url-data and not overwrite other metadata such as peerClusterNames
- currentClusterData.update(clusterData);
+ ClusterData currentClusterData = null;
+ if (content.length > 0) {
+ currentClusterData = jsonMapper().readValue(content, ClusterData.class);
+ // only update cluster-url-data and not overwrite other metadata such as peerClusterNames
+ currentClusterData.update(clusterData);
+ } else {
+ currentClusterData = clusterData;
+ }
// Write back the new updated ClusterData into zookeeper
globalZk().setData(clusterPath, jsonMapper().writeValueAsBytes(currentClusterData),
nodeStat.getVersion());
@@ -259,6 +272,7 @@ public class Clusters extends AdminResource {
try {
String clusterPath = path("clusters", cluster);
+ deleteFailureDomain(clusterPath);
globalZk().delete(clusterPath, -1);
globalZkCache().invalidate(clusterPath);
log.info("[{}] Deleted cluster {}", clientAppId(), cluster);
@@ -271,6 +285,25 @@ public class Clusters extends AdminResource {
}
}
+ private void deleteFailureDomain(String clusterPath) {
+ try {
+ String failureDomain = joinPath(clusterPath, ConfigurationCacheService.FAILURE_DOMAIN);
+ if (globalZk().exists(failureDomain, false) == null) {
+ return;
+ }
+ for (String domain : globalZk().getChildren(failureDomain, false)) {
+ String domainPath = joinPath(failureDomain, domain);
+ globalZk().delete(domainPath, -1);
+ }
+ globalZk().delete(failureDomain, -1);
+ failureDomainCache().clear();
+ failureDomainListCache().clear();
+ } catch (Exception e) {
+ log.warn("Failed to delete failure-domain under cluster {}", clusterPath);
+ throw new RestException(e);
+ }
+ }
+
@GET
@Path("/{cluster}/namespaceIsolationPolicies")
@ApiOperation(value = "Get the namespace isolation policies assigned in the cluster", response = NamespaceIsolationData.class, responseContainer = "Map")
@@ -296,16 +329,6 @@ public class Clusters extends AdminResource {
}
}
- private void validateClusterExists(String cluster) {
- try {
- if (!clustersCache().get(path("clusters", cluster)).isPresent()) {
- throw new RestException(Status.PRECONDITION_FAILED, "Cluster " + cluster + " does not exist.");
- }
- } catch (Exception e) {
- throw new RestException(e);
- }
- }
-
@GET
@Path("/{cluster}/namespaceIsolationPolicies/{policyName}")
@ApiOperation(value = "Get a single namespace isolation policy assigned in the cluster", response = NamespaceIsolationData.class)
@@ -356,7 +379,7 @@ public class Clusters extends AdminResource {
NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPoliciesCache()
.get(nsIsolationPolicyPath).orElseGet(() -> {
try {
- this.createNamespaceIsolationPolicyNode(nsIsolationPolicyPath);
+ this.createZnodeIfNotExist(nsIsolationPolicyPath, Optional.of(Collections.emptyMap()));
return new NamespaceIsolationPolicies();
} catch (KeeperException | InterruptedException e) {
throw new RestException(e);
@@ -386,23 +409,26 @@ public class Clusters extends AdminResource {
}
}
- private void createNamespaceIsolationPolicyNode(String nsIsolationPolicyPath)
- throws KeeperException, InterruptedException {
+ private boolean createZnodeIfNotExist(String path, Optional<Object> value) throws KeeperException, InterruptedException {
// create persistent node on ZooKeeper
- if (globalZk().exists(nsIsolationPolicyPath, false) == null) {
+ if (globalZk().exists(path, false) == null) {
// create all the intermediate nodes
try {
- ZkUtils.createFullPathOptimistic(globalZk(), nsIsolationPolicyPath,
- jsonMapper().writeValueAsBytes(Collections.emptyMap()), Ids.OPEN_ACL_UNSAFE,
+ ZkUtils.createFullPathOptimistic(globalZk(), path,
+ value.isPresent() ? jsonMapper().writeValueAsBytes(value.get()) : null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
+ return true;
} catch (KeeperException.NodeExistsException nee) {
- log.debug("Other broker preempted the full path [{}] already. Continue...", nsIsolationPolicyPath);
+ if(log.isDebugEnabled()) {
+ log.debug("Other broker preempted the full path [{}] already. Continue...", path);
+ }
} catch (JsonGenerationException e) {
// ignore json error as it is empty hash
} catch (JsonMappingException e) {
} catch (IOException e) {
}
}
+ return false;
}
@DELETE
@@ -422,7 +448,7 @@ public class Clusters extends AdminResource {
NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPoliciesCache()
.get(nsIsolationPolicyPath).orElseGet(() -> {
try {
- this.createNamespaceIsolationPolicyNode(nsIsolationPolicyPath);
+ this.createZnodeIfNotExist(nsIsolationPolicyPath, Optional.of(Collections.emptyMap()));
return new NamespaceIsolationPolicies();
} catch (KeeperException | InterruptedException e) {
throw new RestException(e);
@@ -446,6 +472,157 @@ public class Clusters extends AdminResource {
}
}
+ @POST
+ @Path("/{cluster}/failureDomains/{domainName}")
+ @ApiOperation(value = "Set cluster's failure Domain")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 409, message = "Broker already exist into other domain"),
+ @ApiResponse(code = 404, message = "Cluster doesn't exist") })
+ public void setFailureDomain(@PathParam("cluster") String cluster, @PathParam("domainName") String domainName,
+ FailureDomain domain) throws Exception {
+ validateSuperUserAccess();
+ validateClusterExists(cluster);
+ validateBrokerExistsInOtherDomain(cluster, domainName, domain);
+
+ try {
+ String domainPath = joinPath(pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT, domainName);
+ if (this.createZnodeIfNotExist(domainPath, Optional.ofNullable(domain))) {
+ // clear domains-children cache
+ this.failureDomainListCache().clear();
+ } else {
+ globalZk().setData(domainPath, jsonMapper().writeValueAsBytes(domain), -1);
+ // make sure that the domain-cache will be refreshed for the next read access
+ failureDomainCache().invalidate(domainPath);
+ }
+ } catch (KeeperException.NoNodeException nne) {
+ log.warn("[{}] Failed to update domain {}. clusters {} Does not exist", clientAppId(), cluster,
+ domainName);
+ throw new RestException(Status.NOT_FOUND,
+ "Domain " + domainName + " for cluster " + cluster + " does not exist");
+ } catch (Exception e) {
+ log.error("[{}] Failed to update clusters/{}/domainName/{}", clientAppId(), cluster, domainName, e);
+ throw new RestException(e);
+ }
+ }
+
+ @GET
+ @Path("/{cluster}/failureDomains")
+ @ApiOperation(value = "Get the cluster failure domains", response = FailureDomain.class, responseContainer = "Map")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
+ public Map<String, FailureDomain> getFailureDomains(@PathParam("cluster") String cluster) throws Exception {
+ validateSuperUserAccess();
+
+ Map<String, FailureDomain> domains = Maps.newHashMap();
+ try {
+ final String failureDomainRootPath = pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT;
+ for (String domainName : failureDomainListCache().get()) {
+ try {
+ Optional<FailureDomain> domain = failureDomainCache()
+ .get(joinPath(failureDomainRootPath, domainName));
+ if (domain.isPresent()) {
+ domains.put(domainName, domain.get());
+ }
+ } catch (Exception e) {
+ log.warn("Failed to get domain {}", domainName, e);
+ }
+ }
+ } catch (KeeperException.NoNodeException e) {
+ log.warn("[{}] Failure-domain is not configured for cluster {}", clientAppId(), cluster, e);
+ return Collections.emptyMap();
+ } catch (Exception e) {
+ log.error("[{}] Failed to get failure-domains for cluster {}", clientAppId(), cluster, e);
+ throw new RestException(e);
+ }
+ return domains;
+ }
+
+ @GET
+ @Path("/{cluster}/failureDomains/{domainName}")
+ @ApiOperation(value = "Get a domain in a cluster", response = FailureDomain.class)
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Domain doesn't exist"),
+ @ApiResponse(code = 412, message = "Cluster doesn't exist") })
+ public FailureDomain getDomain(@PathParam("cluster") String cluster, @PathParam("domainName") String domainName)
+ throws Exception {
+ validateSuperUserAccess();
+ validateClusterExists(cluster);
+
+ try {
+ final String failureDomainRootPath = pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT;
+ return failureDomainCache().get(joinPath(failureDomainRootPath, domainName))
+ .orElseThrow(() -> new RestException(Status.NOT_FOUND,
+ "Domain " + domainName + " for cluster " + cluster + " does not exist"));
+ } catch (RestException re) {
+ throw re;
+ } catch (Exception e) {
+ log.error("[{}] Failed to get domain {} for cluster {}", clientAppId(), domainName, cluster, e);
+ throw new RestException(e);
+ }
+ }
+
+ @DELETE
+ @Path("/{cluster}/failureDomains/{domainName}")
+ @ApiOperation(value = "Delete cluster's failure omain")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission or plicy is read only"),
+ @ApiResponse(code = 412, message = "Cluster doesn't exist") })
+ public void deleteFailureDomain(@PathParam("cluster") String cluster, @PathParam("domainName") String domainName)
+ throws Exception {
+ validateSuperUserAccess();
+ validateClusterExists(cluster);
+
+ try {
+ final String domainPath = joinPath(pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT, domainName);
+ globalZk().delete(domainPath, -1);
+ // clear domain cache
+ failureDomainCache().invalidate(domainPath);
+ failureDomainListCache().clear();
+ } catch (KeeperException.NoNodeException nne) {
+ log.warn("[{}] Domain {} does not exist in {}", clientAppId(), domainName, cluster);
+ throw new RestException(Status.NOT_FOUND,
+ "Domain-name " + domainName + " or cluster " + cluster + " does not exist");
+ } catch (Exception e) {
+ log.error("[{}] Failed to delete domain {} in cluster {}", clientAppId(), domainName, cluster, e);
+ throw new RestException(e);
+ }
+ }
+
+ private void validateBrokerExistsInOtherDomain(final String cluster, final String inputDomainName,
+ final FailureDomain inputDomain) {
+ if (inputDomain != null && inputDomain.brokers != null) {
+ try {
+ final String failureDomainRootPath = pulsar().getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT;
+ for (String domainName : failureDomainListCache().get()) {
+ if (inputDomainName.equals(domainName)) {
+ continue;
+ }
+ try {
+ Optional<FailureDomain> domain = failureDomainCache().get(joinPath(failureDomainRootPath, domainName));
+ if (domain.isPresent() && domain.get().brokers != null) {
+ List<String> duplicateBrokers = domain.get().brokers.stream().parallel()
+ .filter(inputDomain.brokers::contains).collect(Collectors.toList());
+ if (!duplicateBrokers.isEmpty()) {
+ throw new RestException(Status.CONFLICT,
+ duplicateBrokers + " already exist into " + domainName);
+ }
+ }
+ } catch (Exception e) {
+ if (e instanceof RestException) {
+ throw e;
+ }
+ log.warn("Failed to get domain {}", domainName, e);
+ }
+ }
+ } catch (KeeperException.NoNodeException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Domain is not configured for cluster", clientAppId(), e);
+ }
+ } catch (Exception e) {
+ log.error("[{}] Failed to get domains for cluster {}", clientAppId(), e);
+ throw new RestException(e);
+ }
+ }
+ }
+
private static final Logger log = LoggerFactory.getLogger(Clusters.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java
index cb8cf72..cb8f982 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/Namespaces.java
@@ -18,8 +18,11 @@
*/
package org.apache.pulsar.broker.admin;
-import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT;
import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import java.net.URI;
@@ -91,8 +94,6 @@ import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
-import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-
@Path("/namespaces")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@@ -669,6 +670,139 @@ public class Namespaces extends AdminResource {
}
@POST
+ @Path("/{property}/{cluster}/{namespace}/antiAffinity")
+ @ApiOperation(value = "Set anti-affinity group for a namespace")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+         @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
+         @ApiResponse(code = 412, message = "Invalid antiAffinityGroup") })
+ public void setNamespaceAntiAffinityGroup(@PathParam("property") String property,
+         @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
+         String antiAffinityGroup) {
+     // Access checks run first, before anything is logged or validated.
+     validateAdminAccessOnProperty(property);
+     validatePoliciesReadOnlyAccess();
+
+     log.info("[{}] Setting anti-affinity group {} for {}/{}/{}", clientAppId(), antiAffinityGroup, property,
+             cluster, namespace);
+
+     // A blank group is rejected with 412 rather than being stored.
+     if (isBlank(antiAffinityGroup)) {
+         throw new RestException(Status.PRECONDITION_FAILED, "antiAffinityGroup can't be empty");
+     }
+
+     final NamespaceName nsName = NamespaceName.get(property, cluster, namespace);
+     // Declared outside the try so the BadVersionException handler can report the expected version.
+     Entry<Policies, Stat> policiesNode = null;
+
+     try {
+         final String policiesPath = path(POLICIES, property, cluster, namespace);
+
+         // Force to read the data s.t. the watch to the cache content is setup.
+         policiesNode = policiesCache().getWithStat(policiesPath).orElseThrow(
+                 () -> new RestException(Status.NOT_FOUND, "Namespace " + nsName + " does not exist"));
+         final Policies updatedPolicies = policiesNode.getKey();
+         updatedPolicies.antiAffinityGroup = antiAffinityGroup;
+
+         // Versioned write of the new policies into zookeeper, then drop the stale cache entry.
+         globalZk().setData(policiesPath, jsonMapper().writeValueAsBytes(updatedPolicies),
+                 policiesNode.getValue().getVersion());
+         policiesCache().invalidate(policiesPath);
+
+         log.info("[{}] Successfully updated the antiAffinityGroup {} on namespace {}/{}/{}", clientAppId(),
+                 antiAffinityGroup, property, cluster, namespace);
+     } catch (KeeperException.NoNodeException e) {
+         log.warn("[{}] Failed to update the antiAffinityGroup for namespace {}/{}/{}: does not exist", clientAppId(),
+                 property, cluster, namespace);
+         throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+     } catch (KeeperException.BadVersionException e) {
+         log.warn(
+                 "[{}] Failed to update the antiAffinityGroup on namespace {}/{}/{} expected policy node version={} : concurrent modification",
+                 clientAppId(), property, cluster, namespace, policiesNode.getValue().getVersion());
+
+         throw new RestException(Status.CONFLICT, "Concurrent modification");
+     } catch (Exception e) {
+         log.error("[{}] Failed to update the antiAffinityGroup on namespace {}/{}/{}", clientAppId(), property, cluster,
+                 namespace, e);
+         throw new RestException(e);
+     }
+ }
+
+ @GET
+ @Path("/{property}/{cluster}/{namespace}/antiAffinity")
+ @ApiOperation(value = "Get anti-affinity group of a namespace.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+         @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
+ public String getNamespaceAntiAffinityGroup(@PathParam("property") String property,
+         @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
+     validateAdminAccessOnProperty(property);
+     // The group is stored as a plain field on the namespace policies.
+     final Policies namespacePolicies = getNamespacePolicies(property, cluster, namespace);
+     return namespacePolicies.antiAffinityGroup;
+ }
+
+ /**
+  * Lists the namespaces of the given cluster whose policies carry the given anti-affinity group
+  * (compared case-insensitively).
+  *
+  * The scan is best effort: a failure while reading from global ZooKeeper is logged and the namespaces
+  * collected so far are returned. A property that has no znode for the requested cluster is skipped
+  * instead of aborting the scan of the remaining properties.
+  *
+  * @param cluster
+  *            cluster whose namespaces are scanned
+  * @param antiAffinityGroup
+  *            anti-affinity group to look for; must not be blank
+  * @param property
+  *            property used for the admin-access check
+  * @return namespaces formatted as property/cluster/namespace
+  */
+ @GET
+ @Path("{cluster}/antiAffinity/{group}")
+ @ApiOperation(value = "Get all namespaces that are grouped by given anti-affinity group in a given cluster. api can be only accessed by admin of any of the existing property")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+         @ApiResponse(code = 412, message = "Cluster not exist/Anti-affinity group can't be empty.") })
+ public List<String> getAntiAffinityNamespaces(@PathParam("cluster") String cluster,
+         @PathParam("group") String antiAffinityGroup, @QueryParam("property") String property) {
+     validateAdminAccessOnProperty(property);
+
+     log.info("[{}]-{} Finding namespaces for {} in {}", clientAppId(), property, antiAffinityGroup, cluster);
+
+     if (isBlank(antiAffinityGroup)) {
+         throw new RestException(Status.PRECONDITION_FAILED, "anti-affinity group can't be empty.");
+     }
+     validateClusterExists(cluster);
+     List<String> namespaces = Lists.newArrayList();
+     try {
+         for (String prop : globalZk().getChildren(POLICIES_ROOT, false)) {
+             final List<String> clusterNamespaces;
+             try {
+                 clusterNamespaces = globalZk().getChildren(path(POLICIES, prop, cluster), false);
+             } catch (KeeperException.NoNodeException noNode) {
+                 // This property has no znode for the cluster, so it cannot own matching namespaces.
+                 // Keep scanning the other properties instead of returning a truncated list.
+                 continue;
+             }
+             for (String namespace : clusterNamespaces) {
+                 Optional<Policies> policies = policiesCache()
+                         .get(AdminResource.path(POLICIES, prop, cluster, namespace));
+                 if (policies.isPresent() && antiAffinityGroup.equalsIgnoreCase(policies.get().antiAffinityGroup)) {
+                     namespaces.add(String.format("%s/%s/%s", prop, cluster, namespace));
+                 }
+             }
+         }
+     } catch (Exception e) {
+         log.warn("Failed to list of properties/namespace from global-zk", e);
+     }
+     return namespaces;
+ }
+
+ @DELETE
+ @Path("/{property}/{cluster}/{namespace}/antiAffinity")
+ @ApiOperation(value = "Remove anti-affinity group of a namespace.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+         @ApiResponse(code = 404, message = "Namespace does not exist"),
+         @ApiResponse(code = 409, message = "Concurrent modification") })
+ public void removeNamespaceAntiAffinityGroup(@PathParam("property") String property,
+         @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
+     validateAdminAccessOnProperty(property);
+     validatePoliciesReadOnlyAccess();
+
+     log.info("[{}] Deleting anti-affinity group for {}/{}/{}", clientAppId(), property, cluster, namespace);
+
+     try {
+         final Stat currentStat = new Stat();
+         final String policiesPath = path(POLICIES, property, cluster, namespace);
+
+         // Read straight from ZooKeeper so the Stat carries the version used for the conditional write.
+         final byte[] rawPolicies = globalZk().getData(policiesPath, null, currentStat);
+         final Policies currentPolicies = jsonMapper().readValue(rawPolicies, Policies.class);
+
+         // Clearing the field removes the namespace from its anti-affinity group.
+         currentPolicies.antiAffinityGroup = null;
+         globalZk().setData(policiesPath, jsonMapper().writeValueAsBytes(currentPolicies),
+                 currentStat.getVersion());
+         policiesCache().invalidate(policiesPath);
+         log.info("[{}] Successfully removed anti-affinity group for a namespace={}/{}/{}", clientAppId(), property,
+                 cluster, namespace);
+
+     } catch (KeeperException.NoNodeException e) {
+         log.warn("[{}] Failed to remove anti-affinity group for namespace {}/{}/{}: does not exist", clientAppId(),
+                 property, cluster, namespace);
+         throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+     } catch (KeeperException.BadVersionException e) {
+         log.warn("[{}] Failed to remove anti-affinity group for namespace {}/{}/{}: concurrent modification",
+                 clientAppId(), property, cluster, namespace);
+         throw new RestException(Status.CONFLICT, "Concurrent modification");
+     } catch (Exception e) {
+         log.error("[{}] Failed to remove anti-affinity group for namespace {}/{}/{}", clientAppId(), property,
+                 cluster, namespace, e);
+         throw new RestException(e);
+     }
+ }
+
+ @POST
@Path("/{property}/{cluster}/{namespace}/deduplication")
@ApiOperation(value = "Enable or disable broker side deduplication for all topics in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index 1a2d636..ffdaac6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
@@ -19,29 +19,44 @@
package org.apache.pulsar.broker.loadbalance.impl;
import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import static org.apache.pulsar.broker.web.PulsarWebResource.path;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-
import org.apache.pulsar.broker.BrokerData;
+import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.stats.metrics.JvmMetrics;
+import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
+import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.beust.jcommander.internal.Lists;
+import com.google.common.collect.Maps;
+
/**
* This class contains code which in shared between the two load manager implementations.
*/
@@ -60,13 +75,15 @@ public class LoadManagerShared {
// update LoadReport at most every 5 seconds
public static final long LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL = TimeUnit.SECONDS.toMillis(5);
+ private static final String DEFAULT_DOMAIN = "default";
+
// Don't allow construction: static method namespace only.
private LoadManagerShared() {
}
// Determines the brokers available for the given service unit according to the given policies.
// The brokers are put into brokerCandidateCache.
- public static synchronized void applyPolicies(final ServiceUnitId serviceUnit,
+ public static synchronized void applyNamespacePolicies(final ServiceUnitId serviceUnit,
final SimpleResourceAllocationPolicies policies, final Set<String> brokerCandidateCache,
final Set<String> availableBrokers,
final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate) {
@@ -264,6 +281,227 @@ public class LoadManagerShared {
}
}
+ /**
+ * It tries to filter out brokers which own namespace with same anti-affinity-group as given namespace. If all the
+ * domains own namespace with same anti-affinity group then it will try to keep brokers with domain that has least
+ * number of namespaces. It also tries to keep brokers which has least number of namespace with in domain.
+ * eg.
+ * <pre>
+ * Before:
+ * Domain-count Brokers-count
+ * ____________ ____________
+ * d1-3 b1-2,b2-1
+ * d2-3 b3-2,b4-1
+ * d3-4 b5-2,b6-2
+ *
+ * After filtering: "candidates" brokers
+ * Domain-count Brokers-count
+ * ____________ ____________
+ * d1-3 b2-1
+ * d2-3 b4-1
+ *
+ * "candidate" broker to own anti-affinity-namespace = b2 or b4
+ *
+ * </pre>
+ *
+ * @param pulsar
+ * @param assignedBundleName
+ * @param candidates
+ * @param brokerToNamespaceToBundleRange
+ */
+ public static void filterAntiAffinityGroupOwnedBrokers(final PulsarService pulsar, final String assignedBundleName,
+ final Set<String> candidates, final Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange,
+ Map<String, String> brokerToDomainMap) {
+ if (candidates.isEmpty()) {
+ return;
+ }
+ final String namespaceName = getNamespaceNameFromBundleName(assignedBundleName);
+ try {
+ final Map<String, Integer> brokerToAntiAffinityNamespaceCount = getAntiAffinityNamespaceOwnedBrokers(pulsar,
+ namespaceName, brokerToNamespaceToBundleRange).get(30, TimeUnit.SECONDS);
+ if (brokerToAntiAffinityNamespaceCount == null) {
+ // none of the broker owns anti-affinity-namespace so, none of the broker will be filtered
+ return;
+ }
+ if (pulsar.getConfiguration().isFailureDomainsEnabled()) {
+ // this will remove all the brokers which are part of domains that don't have least number of
+ // anti-affinity-namespaces
+ filterDomainsNotHavingLeastNumberAntiAffinityNamespaces(brokerToAntiAffinityNamespaceCount, candidates,
+ brokerToDomainMap);
+ }
+ // now, "candidates" has list of brokers which are part of domain that can accept this namespace. now,
+ // with in these domains, remove brokers which don't have least number of namespaces. so, brokers with least
+ // number of namespace can be selected
+ int leastNamaespaceCount = Integer.MAX_VALUE;
+ for (final String broker : candidates) {
+ if (brokerToAntiAffinityNamespaceCount.containsKey(broker)) {
+ Integer namespaceCount = brokerToAntiAffinityNamespaceCount.get(broker);
+ if (namespaceCount == null) {
+ // Assume that when the namespace is absent, there are no namespace assigned to
+ // that broker.
+ leastNamaespaceCount = 0;
+ break;
+ }
+ leastNamaespaceCount = Math.min(leastNamaespaceCount, namespaceCount);
+ } else {
+ // Assume non-present brokers have 0 bundles.
+ leastNamaespaceCount = 0;
+ break;
+ }
+ }
+ // filter out broker based on namespace distribution
+ if (leastNamaespaceCount == 0) {
+ candidates.removeIf(broker -> brokerToAntiAffinityNamespaceCount.containsKey(broker)
+ && brokerToAntiAffinityNamespaceCount.get(broker) > 0);
+ } else {
+ final int finalLeastNamespaceCount = leastNamaespaceCount;
+ candidates
+ .removeIf(broker -> brokerToAntiAffinityNamespaceCount.get(broker) != finalLeastNamespaceCount);
+ }
+ } catch (Exception e) {
+ log.error("Failed to filter anti-affinity group namespace {}", e.getMessage());
+ }
+ }
+
+ /**
+ * It computes least number of namespace owned by any of the domain and then it filters out all the domains that own
+ * namespaces more than this count.
+ *
+ * @param brokerToAntiAffinityNamespaceCount
+ * @param candidates
+ * @param brokerToDomainMap
+ */
+ private static void filterDomainsNotHavingLeastNumberAntiAffinityNamespaces(
+ Map<String, Integer> brokerToAntiAffinityNamespaceCount, Set<String> candidates,
+ Map<String, String> brokerToDomainMap) {
+
+ if (brokerToDomainMap == null || brokerToDomainMap.isEmpty()) {
+ return;
+ }
+
+ final Map<String, Integer> domainNamespaceCount = Maps.newHashMap();
+ int leastNamespaceCount = Integer.MAX_VALUE;
+ candidates.forEach(broker -> {
+ final String domain = brokerToDomainMap.getOrDefault(broker, DEFAULT_DOMAIN);
+ final int count = brokerToAntiAffinityNamespaceCount.getOrDefault(broker, 0);
+ domainNamespaceCount.compute(domain, (domainName, nsCount) -> nsCount == null ? count : nsCount + count);
+ });
+ // find leastNameSpaceCount
+ for (Entry<String, Integer> domainNsCountEntry : domainNamespaceCount.entrySet()) {
+ if (domainNsCountEntry.getValue() < leastNamespaceCount) {
+ leastNamespaceCount = domainNsCountEntry.getValue();
+ }
+ }
+ final int finalLeastNamespaceCount = leastNamespaceCount;
+ // only keep domain brokers which has leastNamespaceCount
+ candidates.removeIf(broker -> {
+ Integer nsCount = domainNamespaceCount.get(brokerToDomainMap.getOrDefault(broker, DEFAULT_DOMAIN));
+ return nsCount != null && nsCount != finalLeastNamespaceCount;
+ });
+ }
+
+ /**
+ * Asynchronously counts, per broker, the owned namespaces that belong to the same anti-affinity group as
+ * {@code namespaceName} (group names are compared case-insensitively).
+ *
+ * The returned future completes with {@code null} when the policies of {@code namespaceName} are absent, when
+ * its anti-affinity group is blank, or when its policies lookup fails. Otherwise it completes with a map that
+ * only has entries for brokers owning at least one matching namespace. A failed policies lookup for an
+ * individual owned namespace is ignored: that namespace is simply not counted.
+ *
+ * @param pulsar
+ *            service providing the namespace policies cache
+ * @param namespaceName
+ *            namespace whose anti-affinity group is matched
+ * @param brokerToNamespaceToBundleRange
+ *            current ownership: broker -> namespace -> bundle ranges
+ * @return future of broker -> matching-namespace count, or of {@code null} as described above
+ */
+ public static CompletableFuture<Map<String, Integer>> getAntiAffinityNamespaceOwnedBrokers(
+ final PulsarService pulsar, String namespaceName,
+ Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange) {
+
+ CompletableFuture<Map<String, Integer>> antiAffinityNsBrokersResult = new CompletableFuture<>();
+ ZooKeeperDataCache<Policies> policiesCache = pulsar.getConfigurationCache().policiesCache();
+
+ policiesCache.getAsync(path(POLICIES, namespaceName)).thenAccept(policies -> {
+ if (!policies.isPresent() || StringUtils.isBlank(policies.get().antiAffinityGroup)) {
+ antiAffinityNsBrokersResult.complete(null); // no group configured: callers treat null as "nothing to filter"
+ return;
+ }
+ final String antiAffinityGroup = policies.get().antiAffinityGroup;
+ final Map<String, Integer> brokerToAntiAffinityNamespaceCount = new ConcurrentHashMap<>();
+ final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+ brokerToNamespaceToBundleRange.forEach((broker, nsToBundleRange) -> {
+ nsToBundleRange.forEach((ns, bundleRange) -> {
+ CompletableFuture<Void> future = new CompletableFuture<>(); // one tracking future per (broker, namespace) lookup
+ futures.add(future);
+ policiesCache.getAsync(path(POLICIES, ns)).thenAccept(nsPolicies -> {
+ if (nsPolicies.isPresent() && antiAffinityGroup.equalsIgnoreCase(nsPolicies.get().antiAffinityGroup)) {
+ brokerToAntiAffinityNamespaceCount.compute(broker,
+ (brokerName, count) -> count == null ? 1 : count + 1);
+ }
+ future.complete(null);
+ }).exceptionally(ex -> {
+ future.complete(null); // a failed lookup still completes normally, so the namespace is just not counted
+ return null;
+ });
+ });
+ });
+ FutureUtil.waitForAll(futures) // publish the counts once every per-namespace lookup has finished
+ .thenAccept(r -> antiAffinityNsBrokersResult.complete(brokerToAntiAffinityNamespaceCount));
+ }).exceptionally(ex -> {
+ // namespace-policies has not been created yet
+ antiAffinityNsBrokersResult.complete(null);
+ return null;
+ });
+ return antiAffinityNsBrokersResult;
+ }
+
+ /**
+ *
+ * It checks if given anti-affinity namespace should be unloaded by broker due to load-shedding. If all the brokers
+ * are owning same number of anti-affinity namespaces then unloading this namespace again ends up at the same broker
+ * from which it was unloaded. So, this util checks that given namespace should be unloaded only if it can be loaded
+ * by different broker.
+ *
+ * @param namespace
+ * @param bundle
+ * @param currentBroker
+ * @param pulsar
+ * @param brokerToNamespaceToBundleRange
+ * @param candidateBroekrs
+ * @return
+ * @throws Exception
+ */
+ public static boolean shouldAntiAffinityNamespaceUnload(String namespace, String bundle, String currentBroker,
+ final PulsarService pulsar, Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange,
+ Set<String> candidateBroekrs) throws Exception {
+
+ Map<String, Integer> brokerNamespaceCount = getAntiAffinityNamespaceOwnedBrokers(pulsar, namespace,
+ brokerToNamespaceToBundleRange).get(10, TimeUnit.SECONDS);
+ if (brokerNamespaceCount != null && !brokerNamespaceCount.isEmpty()) {
+ int leastNsCount = Integer.MAX_VALUE;
+ int currentBrokerNsCount = 0;
+
+ for (String broker : candidateBroekrs) {
+ int nsCount = brokerNamespaceCount.getOrDefault(broker, 0);
+ if (currentBroker.equals(broker)) {
+ currentBrokerNsCount = nsCount;
+ }
+ if (leastNsCount > nsCount) {
+ leastNsCount = nsCount;
+ }
+ }
+ // check if there is any other broker has less number of ns
+ if (leastNsCount == 0 || currentBrokerNsCount > leastNsCount) {
+ return true;
+ }
+ // check if all the brokers having same number of ns-count then broker can't unload
+ int leastNsOwnerBrokers = 0;
+ for (String broker : candidateBroekrs) {
+ if (leastNsCount == brokerNamespaceCount.getOrDefault(broker, 0)) {
+ leastNsOwnerBrokers++;
+ }
+ }
+ // if all candidate brokers own same-number of ns then broker can't unload
+ return candidateBroekrs.size() != leastNsOwnerBrokers;
+ }
+ return true;
+ }
+
public interface BrokerTopicLoadingPredicate {
boolean isEnablePersistentTopics(String brokerUrl);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 9d5cf30..b8519d8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.broker.loadbalance.impl;
import static org.apache.pulsar.broker.admin.AdminResource.jsonMapper;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import static org.apache.pulsar.broker.web.PulsarWebResource.path;
import java.io.IOException;
import java.util.ArrayList;
@@ -27,6 +29,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
@@ -57,6 +60,8 @@ import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLo
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.policies.data.FailureDomain;
+import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
@@ -75,6 +80,8 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Maps;
+
import io.netty.util.concurrent.DefaultThreadFactory;
public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCacheListener<LocalBrokerData> {
@@ -170,7 +177,8 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
// check if given broker can load persistent/non-persistent topic
private final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate;
-
+
+ private Map<String, String> brokerToFailureDomainMap;
private static final Deserializer<LocalBrokerData> loadReportDeserializer = (key, content) -> jsonMapper()
.readValue(content, LocalBrokerData.class);
@@ -188,6 +196,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
loadSheddingPipeline.add(new OverloadShedder(conf));
preallocatedBundleToBroker = new ConcurrentHashMap<>();
scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-modular-load-manager"));
+ this.brokerToFailureDomainMap = Maps.newHashMap();
this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
@Override
@@ -214,6 +223,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
* The service to initialize with.
*/
public void initialize(final PulsarService pulsar) {
+ this.pulsar = pulsar;
availableActiveBrokers = new ZooKeeperChildrenCache(pulsar.getLocalZkCache(),
LoadManager.LOADBALANCE_BROKERS_ROOT);
availableActiveBrokers.registerListener(new ZooKeeperCacheListener<Set<String>>() {
@@ -266,9 +276,15 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
placementStrategy = ModularLoadManagerStrategy.create(conf);
policies = new SimpleResourceAllocationPolicies(pulsar);
- this.pulsar = pulsar;
zkClient = pulsar.getZkClient();
filterPipeline.add(new BrokerVersionFilter());
+
+ refreshBrokerToFailureDomainMap();
+ // register listeners for domain changes
+ pulsar.getConfigurationCache().failureDomainListCache()
+ .registerListener((path, data, stat) -> scheduler.execute(() -> refreshBrokerToFailureDomainMap()));
+ pulsar.getConfigurationCache().failureDomainCache()
+ .registerListener((path, data, stat) -> scheduler.execute(() -> refreshBrokerToFailureDomainMap()));
}
/**
@@ -421,7 +437,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
}
// Update both the broker data and the bundle data.
- private void updateAll() {
+ public void updateAll() {
if (log.isDebugEnabled()) {
log.debug("Updating broker and bundle data for loadreport");
}
@@ -575,10 +591,14 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
for (Map.Entry<String, String> entry : bundlesToUnload.entrySet()) {
final String broker = entry.getKey();
final String bundle = entry.getValue();
- log.info("Unloading bundle: {}", bundle);
- pulsar.getAdminClient().namespaces().unloadNamespaceBundle(
- LoadManagerShared.getNamespaceNameFromBundleName(bundle),
- LoadManagerShared.getBundleRangeFromBundleName(bundle));
+
+ final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
+ final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
+ if(!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
+ continue;
+ }
+ log.info("Unloading bundle: {} from broker {}", bundle, broker);
+ pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName, bundleRange);
loadData.getRecentlyUnloadedBundles().put(bundle, System.currentTimeMillis());
}
} catch (Exception e) {
@@ -589,6 +609,32 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
}
}
+ /**
+  * Checks whether the given bundle may be unloaded from {@code currentBroker} without violating the namespace's
+  * anti-affinity grouping. Namespaces without an anti-affinity group can always be unloaded, and any failure
+  * during the check is logged and treated as "unload allowed".
+  */
+ public boolean shouldAntiAffinityNamespaceUnload(String namespace, String bundle, String currentBroker) {
+     try {
+         final Optional<Policies> nsPolicies = pulsar.getConfigurationCache().policiesCache()
+                 .get(path(POLICIES, namespace));
+         final boolean inAntiAffinityGroup = nsPolicies.isPresent()
+                 && !StringUtils.isBlank(nsPolicies.get().antiAffinityGroup);
+
+         if (inAntiAffinityGroup) {
+             // the candidate cache is shared state, so it is rebuilt and consumed under its own lock
+             synchronized (brokerCandidateCache) {
+                 brokerCandidateCache.clear();
+                 final ServiceUnitId serviceUnit = pulsar.getNamespaceService().getNamespaceBundleFactory()
+                         .getBundle(namespace, bundle);
+                 LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache,
+                         getAvailableBrokers(), brokerTopicLoadingPredicate);
+                 return LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace, bundle, currentBroker, pulsar,
+                         brokerToNamespaceToBundleRange, brokerCandidateCache);
+             }
+         }
+     } catch (Exception e) {
+         log.warn("Failed to check anti-affinity namespace ownership for {}/{}/{}, {}", namespace, bundle,
+                 currentBroker, e.getMessage());
+     }
+     // not part of an anti-affinity group, or the check failed
+     return true;
+ }
+
/**
* As the leader broker, attempt to automatically detect and split hot namespace bundles.
*/
@@ -657,11 +703,18 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
final BundleData data = loadData.getBundleData().computeIfAbsent(bundle,
key -> getBundleDataOrDefault(bundle));
brokerCandidateCache.clear();
- LoadManagerShared.applyPolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
+ LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
brokerTopicLoadingPredicate);
+
// filter brokers which owns topic higher than threshold
LoadManagerShared.filterBrokersWithLargeTopicCount(brokerCandidateCache, loadData,
conf.getLoadBalancerBrokerMaxTopics());
+
+ // distribute namespaces to domain and brokers according to anti-affinity-group
+ LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar, serviceUnit.toString(), brokerCandidateCache,
+ brokerToNamespaceToBundleRange, brokerToFailureDomainMap);
+ // distribute bundles evenly to candidate-brokers
+
LoadManagerShared.removeMostServicingBrokersForNamespace(serviceUnit.toString(), brokerCandidateCache,
brokerToNamespaceToBundleRange);
log.info("{} brokers being considered for assignment of {}", brokerCandidateCache.size(), bundle);
@@ -673,13 +726,13 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
}
} catch ( BrokerFilterException x ) {
// restore the list of brokers to the full set
- LoadManagerShared.applyPolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
+ LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
brokerTopicLoadingPredicate);
}
if ( brokerCandidateCache.isEmpty() ) {
// restore the list of brokers to the full set
- LoadManagerShared.applyPolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
+ LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
brokerTopicLoadingPredicate);
}
@@ -693,7 +746,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
final double maxUsage = loadData.getBrokerData().get(broker).getLocalData().getMaxResourceUsage();
if (maxUsage > overloadThreshold) {
// All brokers that were in the filtered list were overloaded, so check if there is a better broker
- LoadManagerShared.applyPolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
+ LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
brokerTopicLoadingPredicate);
broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
}
@@ -871,4 +924,33 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
}
return 0;
}
+
+ private void refreshBrokerToFailureDomainMap() { // rebuilds the broker -> failure-domain-name map from the configuration cache
+ if (!pulsar.getConfiguration().isFailureDomainsEnabled()) {
+ return; // failure domains disabled: the current map is left untouched
+ }
+ final String clusterDomainRootPath = pulsar.getConfigurationCache().CLUSTER_FAILURE_DOMAIN_ROOT;
+ try {
+ synchronized (brokerToFailureDomainMap) { // NOTE(review): locks on a field that is reassigned below, so later calls lock a different object - confirm intent
+ Map<String, String> tempBrokerToFailureDomainMap = Maps.newHashMap(); // built aside, then swapped in as a whole
+ for (String domainName : pulsar.getConfigurationCache().failureDomainListCache().get()) {
+ try {
+ Optional<FailureDomain> domain = pulsar.getConfigurationCache().failureDomainCache()
+ .get(clusterDomainRootPath + "/" + domainName);
+ if (domain.isPresent()) {
+ for (String broker : domain.get().brokers) {
+ tempBrokerToFailureDomainMap.put(broker, domainName);
+ }
+ }
+ } catch (Exception e) {
+ log.warn("Failed to get domain {}", domainName, e); // an unreadable domain is skipped, the others are still loaded
+ }
+ }
+ this.brokerToFailureDomainMap = tempBrokerToFailureDomainMap;
+ }
+ log.info("Cluster domain refreshed {}", brokerToFailureDomainMap);
+ } catch (Exception e) {
+ log.warn("Failed to get domain-list for cluster {}", e.getMessage()); // the previous map stays in place when the domain list cannot be read
+ }
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index 338f302..bdf0345 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -913,7 +913,7 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
}
brokerCandidateCache.clear();
try {
- LoadManagerShared.applyPolicies(serviceUnit, policies, brokerCandidateCache, availableBrokersCache,
+ LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, availableBrokersCache,
brokerTopicLoadingPredicate);
} catch (Exception e) {
log.warn("Error when trying to apply policies: {}", e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index c22758d..40b7d03 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -168,25 +168,26 @@ public abstract class PulsarWebResource {
} catch (RestException e) {
throw e;
} catch (Exception e) {
- log.error("Failed to get property admin data for property");
+ log.error("Failed to get property admin data for property {}", property);
throw new RestException(e);
}
}
protected static void validateAdminAccessOnProperty(PulsarService pulsar, String clientAppId, String property) throws RestException, Exception{
- PropertyAdmin propertyAdmin;
-
- try {
- propertyAdmin = pulsar.getConfigurationCache().propertiesCache().get(path(POLICIES, property))
- .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Property does not exist"));
- } catch (KeeperException.NoNodeException e) {
- log.warn("Failed to get property admin data for non existing property {}", property);
- throw new RestException(Status.NOT_FOUND, "Property does not exist");
- }
if (pulsar.getConfiguration().isAuthenticationEnabled() && pulsar.getConfiguration().isAuthorizationEnabled()) {
log.debug("check admin access on property: {} - Authenticated: {} -- role: {}", property,
(isClientAuthenticated(clientAppId)), clientAppId);
+
+ PropertyAdmin propertyAdmin;
+
+ try {
+ propertyAdmin = pulsar.getConfigurationCache().propertiesCache().get(path(POLICIES, property))
+ .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Property does not exist"));
+ } catch (KeeperException.NoNodeException e) {
+ log.warn("Failed to get property admin data for non existing property {}", property);
+ throw new RestException(Status.NOT_FOUND, "Property does not exist");
+ }
if (!isClientAuthenticated(clientAppId)) {
throw new RestException(Status.FORBIDDEN, "Need to authenticate to perform the request");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
index e0d55a6..73071ff 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
@@ -115,7 +115,7 @@ public class SLAMonitoringTest {
throws PulsarClientException, MalformedURLException, PulsarAdminException {
ClusterData clusterData = new ClusterData();
clusterData.setServiceUrl(pulsarAdmin.getServiceUrl().toString());
- pulsarAdmins[0].clusters().createCluster("my-cluster", clusterData);
+ pulsarAdmins[0].clusters().updateCluster("my-cluster", clusterData);
Set<String> allowedClusters = new HashSet<>();
allowedClusters.add("my-cluster");
PropertyAdmin adminConfig = new PropertyAdmin();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 795d807..444d224 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -172,21 +172,23 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
public void clusters() throws Exception {
admin.clusters().createCluster("usw",
new ClusterData("http://broker.messaging.use.example.com" + ":" + BROKER_WEBSERVICE_PORT));
- assertEquals(admin.clusters().getClusters(), Lists.newArrayList("use", "usw"));
+ // "test" cluster is the config-default cluster and its znode gets created when PulsarService creates
+ // the failure-domain znode of this default cluster
+ assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test", "use", "usw"));
assertEquals(admin.clusters().getCluster("use"),
new ClusterData("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT));
admin.clusters().updateCluster("usw",
new ClusterData("http://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT));
- assertEquals(admin.clusters().getClusters(), Lists.newArrayList("use", "usw"));
+ assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test", "use", "usw"));
assertEquals(admin.clusters().getCluster("usw"),
new ClusterData("http://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT));
admin.clusters().updateCluster("usw",
new ClusterData("http://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT,
"https://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT_TLS));
- assertEquals(admin.clusters().getClusters(), Lists.newArrayList("use", "usw"));
+ assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test", "use", "usw"));
assertEquals(admin.clusters().getCluster("usw"),
new ClusterData("http://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT,
"https://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT_TLS));
@@ -194,11 +196,11 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
admin.clusters().deleteCluster("usw");
Thread.sleep(300);
- assertEquals(admin.clusters().getClusters(), Lists.newArrayList("use"));
+ assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test", "use"));
admin.namespaces().deleteNamespace("prop-xyz/use/ns1");
admin.clusters().deleteCluster("use");
- assertEquals(admin.clusters().getClusters(), Lists.newArrayList());
+ assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test"));
// Check name validation
try {
@@ -382,7 +384,9 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
admin.namespaces().deleteNamespace("prop-xyz/use/ns1");
admin.clusters().deleteCluster("use");
- assertEquals(admin.clusters().getClusters(), Lists.newArrayList());
+ // "test" cluster is the config-default cluster and its znode gets created when PulsarService creates
+ // the failure-domain znode of this default cluster
+ assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test"));
}
/**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 2362f44..5825783 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin;
+import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
@@ -25,7 +26,9 @@ import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.net.URL;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -54,6 +57,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.DestinationDomain;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
@@ -582,7 +586,7 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
assertTrue(e instanceof PreconditionFailedException);
}
}
-
+
/**
* It validates that peer-cluster can't coexist in replication-cluster list
*
@@ -640,4 +644,69 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
admin.namespaces().setNamespaceReplicationClusters(namespace, clusterIds);
}
+ /**
+ * Exercises the failure-domain admin API on the broker's configured cluster: create, update, get, list and
+ * delete a domain, and verifies that creating a second domain with the same brokers is rejected with a
+ * conflict while the first domain still exists.
+ *
+ * @throws PulsarAdminException
+ */
+ @Test
+ public void clusterFailureDomain() throws PulsarAdminException {
+
+ final String cluster = pulsar.getConfiguration().getClusterName();
+ admin.clusters().updateCluster(cluster,
+ new ClusterData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls()));
+ // create "domain-1" with three brokers and then update it with the same data
+ FailureDomain domain = new FailureDomain();
+ domain.setBrokers(Sets.newHashSet("b1", "b2", "b3"));
+ admin.clusters().createFailureDomain(cluster, "domain-1", domain);
+ admin.clusters().updateFailureDomain(cluster, "domain-1", domain);
+
+ assertEquals(admin.clusters().getFailureDomain(cluster, "domain-1"), domain);
+
+ Map<String, FailureDomain> domains = admin.clusters().getFailureDomains(cluster);
+ assertEquals(domains.size(), 1);
+ assertTrue(domains.containsKey("domain-1"));
+
+ try {
+ // try to create domain with already registered brokers
+ admin.clusters().createFailureDomain(cluster, "domain-2", domain);
+ fail("should have failed because of brokers are already registered");
+ } catch (PulsarAdminException.ConflictException e) {
+ // Ok
+ }
+
+ // once "domain-1" is deleted, the same brokers can be registered under "domain-2"
+ admin.clusters().deleteFailureDomain(cluster, "domain-1");
+ assertTrue(admin.clusters().getFailureDomains(cluster).isEmpty());
+
+ admin.clusters().createFailureDomain(cluster, "domain-2", domain);
+ domains = admin.clusters().getFailureDomains(cluster);
+ assertEquals(domains.size(), 1);
+ assertTrue(domains.containsKey("domain-2"));
+ }
+
+ /**
+ * Exercises the namespace anti-affinity-group admin API: set, get and delete the group on one namespace, then
+ * assigns the same group to three new namespaces and verifies that listing by group returns exactly those
+ * three, while listing an unknown group returns nothing.
+ *
+ * @throws PulsarAdminException
+ */
+ @Test
+ public void namespaceAntiAffinity() throws PulsarAdminException {
+ final String namespace = "prop-xyz/use/ns1";
+ final String antiAffinityGroup = "group";
+ // no group is configured initially
+ assertTrue(isBlank(admin.namespaces().getNamespaceAntiAffinityGroup(namespace)));
+ admin.namespaces().setNamespaceAntiAffinityGroup(namespace, antiAffinityGroup);
+ assertEquals(admin.namespaces().getNamespaceAntiAffinityGroup(namespace), antiAffinityGroup);
+ // deleting the group brings the namespace back to the blank state
+ admin.namespaces().deleteNamespaceAntiAffinityGroup(namespace);
+ assertTrue(isBlank(admin.namespaces().getNamespaceAntiAffinityGroup(namespace)));
+
+ final String ns1 = "prop-xyz/use/antiAG1";
+ final String ns2 = "prop-xyz/use/antiAG2";
+ final String ns3 = "prop-xyz/use/antiAG3";
+ admin.namespaces().createNamespace(ns1);
+ admin.namespaces().createNamespace(ns2);
+ admin.namespaces().createNamespace(ns3);
+ admin.namespaces().setNamespaceAntiAffinityGroup(ns1, antiAffinityGroup);
+ admin.namespaces().setNamespaceAntiAffinityGroup(ns2, antiAffinityGroup);
+ admin.namespaces().setNamespaceAntiAffinityGroup(ns3, antiAffinityGroup);
+
+ // NOTE(review): the first argument is passed as "dummy" -- presumably it does not influence the lookup;
+ // confirm against the admin API.
+ Set<String> namespaces = new HashSet<>(
+ admin.namespaces().getAntiAffinityNamespaces("dummy", "use", antiAffinityGroup));
+ assertEquals(namespaces.size(), 3);
+ assertTrue(namespaces.contains(ns1));
+ assertTrue(namespaces.contains(ns2));
+ assertTrue(namespaces.contains(ns3));
+
+ // a group that no namespace belongs to yields an empty list
+ List<String> namespaces2 = admin.namespaces().getAntiAffinityNamespaces("dummy", "use", "invalid-group");
+ assertEquals(namespaces2.size(), 0);
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 0245b67..c587033 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -91,10 +91,11 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
private Field uriField;
private UriInfo uriInfo;
+ private final String configClusterName = "use";
public AdminTest() {
super();
- conf.setClusterName("use");
+ conf.setClusterName(configClusterName);
}
@Override
@@ -186,10 +187,10 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
@Test
void clusters() throws Exception {
- assertEquals(clusters.getClusters(), new ArrayList<String>());
+ assertEquals(clusters.getClusters(), Lists.newArrayList(configClusterName));
verify(clusters, never()).validateSuperUserAccess();
- clusters.createCluster("use", new ClusterData("http://broker.messaging.use.example.com"));
+ clusters.updateCluster("use", new ClusterData("http://broker.messaging.use.example.com"));
verify(clusters, times(1)).validateSuperUserAccess();
// ensure to read from ZooKeeper directly
clusters.clustersListCache().clear();
@@ -447,7 +448,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
assertEquals(properties.getProperties(), Lists.newArrayList());
// Create a namespace to test deleting a non-empty property
- clusters.createCluster("use", new ClusterData());
+ clusters.updateCluster("use", new ClusterData());
newPropertyAdmin = new PropertyAdmin(Lists.newArrayList("role1", "other-role"), Sets.newHashSet("use"));
properties.createProperty("my-property", newPropertyAdmin);
@@ -474,7 +475,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
@Test
void brokers() throws Exception {
- clusters.createCluster("use", new ClusterData("http://broker.messaging.use.example.com",
+ clusters.updateCluster("use", new ClusterData("http://broker.messaging.use.example.com",
"https://broker.messaging.use.example.com:4443"));
URI requestUri = new URI(
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 37ec7eb..36c9ec2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -135,7 +135,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
doNothing().when(namespaces).validateAdminAccessOnProperty("other-property");
doNothing().when(namespaces).validateAdminAccessOnProperty("new-property");
- admin.clusters().createCluster("use", new ClusterData("http://broker-use.com:" + BROKER_WEBSERVICE_PORT));
+ admin.clusters().updateCluster("use", new ClusterData("http://broker-use.com:" + BROKER_WEBSERVICE_PORT));
admin.clusters().createCluster("usw", new ClusterData("http://broker-usw.com:" + BROKER_WEBSERVICE_PORT));
admin.clusters().createCluster("usc", new ClusterData("http://broker-usc.com:" + BROKER_WEBSERVICE_PORT));
admin.properties().createProperty(this.testProperty,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
index 0f79e75..cee4bbe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java
@@ -64,7 +64,7 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest {
assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), false);
- admin.clusters().createCluster("c1", new ClusterData());
+ admin.clusters().updateCluster("c1", new ClusterData());
admin.properties().createProperty("p1", new PropertyAdmin(Lists.newArrayList("role1"), Sets.newHashSet("c1")));
waitForChange();
admin.namespaces().createNamespace("p1/c1/ns1");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 67873bc..57ed5e5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -77,6 +77,7 @@ public abstract class MockedPulsarServiceBaseTest {
protected MockZooKeeper mockZookKeeper;
protected NonClosableMockBookKeeper mockBookKeeper;
protected boolean isTcpLookup = false;
+ protected final String configClusterName = "test";
private SameThreadOrderedSafeExecutor sameThreadOrderedSafeExecutor;
@@ -90,13 +91,12 @@ public abstract class MockedPulsarServiceBaseTest {
this.conf.setBrokerServicePortTls(BROKER_PORT_TLS);
this.conf.setWebServicePort(BROKER_WEBSERVICE_PORT);
this.conf.setWebServicePortTls(BROKER_WEBSERVICE_PORT_TLS);
- this.conf.setClusterName("test");
+ this.conf.setClusterName(configClusterName);
this.conf.setAdvertisedAddress("localhost"); // there are TLS tests in here, they need to use localhost because of the certificate
this.conf.setManagedLedgerCacheSizeMB(8);
this.conf.setActiveConsumerFailoverDelayTimeMillis(0);
this.conf.setDefaultNumberOfNamespaceBundles(1);
this.conf.setZookeeperServers("localhost:2181");
- this.conf.setClusterName("mock");
}
protected final void internalSetup() throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
new file mode 100644
index 0000000..ec0bf1f
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -0,0 +1,540 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.net.URL;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
+import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
+import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.NamespaceBundleFactory;
+import org.apache.pulsar.common.naming.NamespaceBundles;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FailureDomain;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.beust.jcommander.internal.Maps;
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
+import com.google.common.hash.Hashing;
+
+import junit.framework.Assert;
+
+public class AntiAffinityNamespaceGroupTest {
+ // local ZooKeeper/BookKeeper ensemble shared by both brokers; (re)created in setup()
+ private LocalBookkeeperEnsemble bkEnsemble;
+
+ // broker 1 ("primary"): web-service url, service instance and admin client
+ private URL url1;
+ private PulsarService pulsar1;
+ private PulsarAdmin admin1;
+
+ // broker 2 ("secondary"): web-service url, service instance and admin client
+ private URL url2;
+ private PulsarService pulsar2;
+ private PulsarAdmin admin2;
+
+ // "<local-hostname>:<web-service-port>" of each broker, as built in setup()
+ private String primaryHost;
+ private String secondaryHost;
+
+ private NamespaceBundleFactory nsFactory;
+
+ // load-managers extracted reflectively from each broker in setup()
+ private ModularLoadManagerImpl primaryLoadManager;
+ private ModularLoadManagerImpl secondaryLoadManager;
+
+ private ExecutorService executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>());
+
+ // ports are picked once per test-class instance and reused by every setup() call
+ private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
+ private final int PRIMARY_BROKER_WEBSERVICE_PORT = PortManager.nextFreePort();
+ private final int SECONDARY_BROKER_WEBSERVICE_PORT = PortManager.nextFreePort();
+ private final int PRIMARY_BROKER_PORT = PortManager.nextFreePort();
+ private final int SECONDARY_BROKER_PORT = PortManager.nextFreePort();
+ private static final Logger log = LoggerFactory.getLogger(AntiAffinityNamespaceGroupTest.class);
+
+ // NOTE(review): presumably read by PortManager to choose where port allocation starts -- TODO confirm
+ static {
+ System.setProperty("test.basePort", "16100");
+ }
+
+    /**
+     * Reads, via reflection, the value of a field declared directly on the runtime class of the given object,
+     * regardless of the field's access modifier.
+     *
+     * @param instance
+     *            object whose field is read
+     * @param fieldName
+     *            name of a field declared on {@code instance.getClass()}
+     * @return current value of the field
+     * @throws Exception
+     *             if the field does not exist on that class or cannot be read
+     */
+    private static Object getField(final Object instance, final String fieldName) throws Exception {
+        final Class<?> owner = instance.getClass();
+        final Field target = owner.getDeclaredField(fieldName);
+        target.setAccessible(true);
+        final Object value = target.get(instance);
+        return value;
+    }
+
+ /**
+ * Starts a local bookkeeper ensemble and two brokers of cluster "use" (both using ModularLoadManagerImpl with
+ * failure-domains enabled), creates an admin client per broker and extracts both load-managers.
+ *
+ * @throws Exception
+ */
+ @BeforeMethod
+ void setup() throws Exception {
+
+ // Start local bookkeeper ensemble
+ bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort());
+ bkEnsemble.start();
+
+ // Start broker 1
+ ServiceConfiguration config1 = new ServiceConfiguration();
+ config1.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+ config1.setClusterName("use");
+ config1.setWebServicePort(PRIMARY_BROKER_WEBSERVICE_PORT);
+ config1.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
+ config1.setBrokerServicePort(PRIMARY_BROKER_PORT);
+ config1.setFailureDomainsEnabled(true);
+ config1.setLoadBalancerEnabled(true);
+ // write the cluster znode before the first broker starts
+ createCluster(bkEnsemble.getZkClient(), config1);
+ pulsar1 = new PulsarService(config1);
+
+ pulsar1.start();
+
+ primaryHost = String.format("%s:%d", InetAddress.getLocalHost().getHostName(), PRIMARY_BROKER_WEBSERVICE_PORT);
+ url1 = new URL("http://127.0.0.1" + ":" + PRIMARY_BROKER_WEBSERVICE_PORT);
+ admin1 = new PulsarAdmin(url1, (Authentication) null);
+
+ // Start broker 2
+ // NOTE(review): unlike config1, setLoadBalancerEnabled(true) is not called here -- confirm this is intended
+ ServiceConfiguration config2 = new ServiceConfiguration();
+ config2.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+ config2.setClusterName("use");
+ config2.setWebServicePort(SECONDARY_BROKER_WEBSERVICE_PORT);
+ config2.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
+ config2.setBrokerServicePort(SECONDARY_BROKER_PORT);
+ config2.setFailureDomainsEnabled(true);
+ pulsar2 = new PulsarService(config2);
+ secondaryHost = String.format("%s:%d", InetAddress.getLocalHost().getHostName(),
+ SECONDARY_BROKER_WEBSERVICE_PORT);
+
+ pulsar2.start();
+
+ url2 = new URL("http://127.0.0.1" + ":" + SECONDARY_BROKER_WEBSERVICE_PORT);
+ admin2 = new PulsarAdmin(url2, (Authentication) null);
+
+ // the object returned by getLoadManager() keeps the ModularLoadManagerImpl in its "loadManager" field
+ primaryLoadManager = (ModularLoadManagerImpl) getField(pulsar1.getLoadManager().get(), "loadManager");
+ secondaryLoadManager = (ModularLoadManagerImpl) getField(pulsar2.getLoadManager().get(), "loadManager");
+ nsFactory = new NamespaceBundleFactory(pulsar1, Hashing.crc32());
+ // NOTE(review): fixed pause before the test body runs; presumably lets the brokers settle -- TODO confirm
+ Thread.sleep(100);
+ }
+
+ /**
+ * Releases everything created by setup(): shuts down the executor, closes both admin clients, closes broker 2
+ * and then broker 1, and finally stops the bookkeeper ensemble.
+ *
+ * @throws Exception
+ */
+ @AfterMethod
+ void shutdown() throws Exception {
+ log.info("--- Shutting down ---");
+ // NOTE(review): executor is created once in a field initializer but shut down after every test method; if the
+ // same test instance runs several methods, later ones would see a terminated executor -- confirm usage
+ executor.shutdown();
+
+ admin1.close();
+ admin2.close();
+
+ pulsar2.close();
+ pulsar1.close();
+
+ bkEnsemble.stop();
+ }
+
+ private void createCluster(ZooKeeper zk, ServiceConfiguration config) throws Exception {
+ ZkUtils.createFullPathOptimistic(zk, "/admin/clusters/" + config.getClusterName(),
+ ObjectMapperFactory.getThreadLocal().writeValueAsBytes(
+ new ClusterData("http://" + config.getAdvertisedAddress() + ":" + config.getWebServicePort())),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+
+ // NOTE(review): empty placeholder -- it only runs setup()/shutdown() and asserts nothing; implement or remove
+ @Test
+ public void testClusterDomain() {
+
+ }
+
+    /**
+     * It verifies anti-affinity-namespace candidate filtering with failure-domains.
+     *
+     * <pre>
+     * Domain     Brokers
+     * ________   _________________
+     * domain-0   broker-0,broker-1
+     * domain-1   broker-2,broker-3
+     *
+     * Anti-affinity-namespace assignment
+     *
+     * (1) ns0 -> candidate-brokers: b0, b1, b2, b3 => selected b0
+     * (2) ns1 -> candidate-brokers: b2, b3         => selected b2
+     * (3) ns2 -> candidate-brokers: b1, b3         => selected b1
+     * (4) ns3 -> candidate-brokers: b3             => selected b3
+     * (5) ns4 -> candidate-brokers: b0, b1, b2, b3 (every broker owns one namespace of the group again)
+     * </pre>
+     *
+     * The "selected" broker of each step is recorded by the test itself (see selectBrokerForNamespace); only
+     * the candidate set left by the filter is asserted.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testAntiAffinityNamespaceFilteringWithDomain() throws Exception {
+
+        final String namespace = "my-property/use/my-ns";
+        final int totalNamespaces = 5;
+        final String namespaceAntiAffinityGroup = "my-antiaffinity";
+        final String bundle = "/0x00000000_0xffffffff";
+        final int totalBrokers = 4;
+
+        pulsar1.getConfiguration().setFailureDomainsEnabled(true);
+        admin1.properties().createProperty("my-property",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
+
+        for (int i = 0; i < totalNamespaces; i++) {
+            final String ns = namespace + i;
+            admin1.namespaces().createNamespace(ns);
+            admin1.namespaces().setNamespaceAntiAffinityGroup(ns, namespaceAntiAffinityGroup);
+        }
+
+        // reference set of all brokers; it is never passed to the filter so that it stays complete
+        Set<String> brokers = Sets.newHashSet();
+        Map<String, String> brokerToDomainMap = Maps.newHashMap();
+        brokers.add("brokerName-0");
+        brokerToDomainMap.put("brokerName-0", "domain-0");
+        brokers.add("brokerName-1");
+        brokerToDomainMap.put("brokerName-1", "domain-0");
+        brokers.add("brokerName-2");
+        brokerToDomainMap.put("brokerName-2", "domain-1");
+        brokers.add("brokerName-3");
+        brokerToDomainMap.put("brokerName-3", "domain-1");
+
+        // scratch set handed to the filter; refilled from "brokers" before every step
+        Set<String> candidate = Sets.newHashSet();
+        Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange = Maps.newHashMap();
+
+        Assert.assertEquals(brokers.size(), totalBrokers);
+
+        String assignedNamespace = namespace + "0" + bundle;
+        candidate.addAll(brokers);
+
+        // for namespace-0 all brokers available
+        // (filter the scratch set, not the reference set, so a filtering bug cannot corrupt later steps)
+        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
+                brokerToNamespaceToBundleRange, brokerToDomainMap);
+        Assert.assertEquals(candidate.size(), totalBrokers);
+
+        // add namespace-0 to broker-0 of domain-0 => state: n0->b0
+        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "brokerName-0", namespace + "0", assignedNamespace);
+        candidate.addAll(brokers);
+        // for namespace-1 only domain-1 brokers are available as broker-0 already owns namespace-0
+        assignedNamespace = namespace + "1" + bundle;
+        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
+                brokerToNamespaceToBundleRange, brokerToDomainMap);
+        Assert.assertEquals(candidate.size(), 2);
+        candidate.forEach(broker -> Assert.assertEquals(brokerToDomainMap.get(broker), "domain-1"));
+
+        // add namespace-1 to broker-2 of domain-1 => state: n0->b0, n1->b2
+        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "brokerName-2", namespace + "1", assignedNamespace);
+        candidate.addAll(brokers);
+        // for namespace-2 only brokers available are : broker-1 and broker-3
+        assignedNamespace = namespace + "2" + bundle;
+        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
+                brokerToNamespaceToBundleRange, brokerToDomainMap);
+        Assert.assertEquals(candidate.size(), 2);
+        Assert.assertTrue(candidate.contains("brokerName-1"));
+        Assert.assertTrue(candidate.contains("brokerName-3"));
+
+        // add namespace-2 to broker-1 of domain-0 => state: n0->b0, n1->b2, n2->b1
+        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "brokerName-1", namespace + "2", assignedNamespace);
+        candidate.addAll(brokers);
+        // for namespace-3 only brokers available are : broker-3
+        assignedNamespace = namespace + "3" + bundle;
+        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
+                brokerToNamespaceToBundleRange, brokerToDomainMap);
+        Assert.assertEquals(candidate.size(), 1);
+        Assert.assertTrue(candidate.contains("brokerName-3"));
+        // add namespace-3 to broker-3 of domain-1 => state: n0->b0, n1->b2, n2->b1, n3->b3
+        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "brokerName-3", namespace + "3", assignedNamespace);
+        candidate.addAll(brokers);
+        // for namespace-4 only brokers available are : all 4 brokers
+        assignedNamespace = namespace + "4" + bundle;
+        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
+                brokerToNamespaceToBundleRange, brokerToDomainMap);
+        Assert.assertEquals(candidate.size(), 4);
+    }
+
+    /**
+     * It verifies anti-affinity-namespace candidate filtering when no broker-to-domain mapping is supplied
+     * (the filter is called with a null domain map).
+     *
+     * <pre>
+     * Anti-affinity-namespace assignment
+     *
+     * (1) ns0 -> candidate-brokers: b0, b1, b2 => selected b0
+     * (2) ns1 -> candidate-brokers: b1, b2     => selected b1
+     * (3) ns2 -> candidate-brokers: b2         => selected b2
+     * (4) ns3 -> candidate-brokers: b0, b1, b2 (every broker owns one namespace of the group again)
+     * </pre>
+     *
+     * The "selected" broker of each step is recorded by the test itself (see selectBrokerForNamespace); only
+     * the candidate set left by the filter is asserted.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testAntiAffinityNamespaceFilteringWithoutDomain() throws Exception {
+
+        final String namespace = "my-property/use/my-ns";
+        final int totalNamespaces = 5;
+        final String namespaceAntiAffinityGroup = "my-antiaffinity";
+        final String bundle = "/0x00000000_0xffffffff";
+
+        admin1.properties().createProperty("my-property",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
+
+        for (int i = 0; i < totalNamespaces; i++) {
+            final String ns = namespace + i;
+            admin1.namespaces().createNamespace(ns);
+            admin1.namespaces().setNamespaceAntiAffinityGroup(ns, namespaceAntiAffinityGroup);
+        }
+
+        // "brokers" is the reference set and is never passed to the filter; "candidate" is the scratch set
+        // that is refilled from it before every step
+        Set<String> brokers = Sets.newHashSet();
+        Set<String> candidate = Sets.newHashSet();
+        Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange = Maps.newHashMap();
+        brokers.add("broker-0");
+        brokers.add("broker-1");
+        brokers.add("broker-2");
+
+        String assignedNamespace = namespace + "0" + bundle;
+
+        // all brokers available so, candidate will be all 3 brokers
+        candidate.addAll(brokers);
+        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
+                brokerToNamespaceToBundleRange, null);
+        Assert.assertEquals(candidate.size(), 3);
+
+        // add ns-0 to broker-0
+        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-0", namespace + "0", assignedNamespace);
+        candidate.addAll(brokers);
+        assignedNamespace = namespace + "1" + bundle;
+        // available brokers for ns-1 => broker-1, broker-2
+        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
+                brokerToNamespaceToBundleRange, null);
+        Assert.assertEquals(candidate.size(), 2);
+        Assert.assertTrue(candidate.contains("broker-1"));
+        Assert.assertTrue(candidate.contains("broker-2"));
+
+        // add ns-1 to broker-1
+        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-1", namespace + "1", assignedNamespace);
+        candidate.addAll(brokers);
+        // available brokers for ns-2 => broker-2
+        assignedNamespace = namespace + "2" + bundle;
+        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
+                brokerToNamespaceToBundleRange, null);
+        Assert.assertEquals(candidate.size(), 1);
+        Assert.assertTrue(candidate.contains("broker-2"));
+
+        // add ns-2 to broker-2
+        selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-2", namespace + "2", assignedNamespace);
+        candidate.addAll(brokers);
+        // available brokers for ns-3 => broker-0, broker-1, broker-2
+        assignedNamespace = namespace + "3" + bundle;
+        LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar1, assignedNamespace, candidate,
+                brokerToNamespaceToBundleRange, null);
+        Assert.assertEquals(candidate.size(), 3);
+    }
+
+ private void selectBrokerForNamespace(Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange,
+ String broker, String namespace, String assignedBundleName) {
+ Map<String, Set<String>> nsToBundleMap = Maps.newHashMap();
+ nsToBundleMap.put(namespace, Sets.newHashSet(assignedBundleName));
+ brokerToNamespaceToBundleRange.put(broker, nsToBundleMap);
+ }
+
+    /**
+     * It verifies anti-affinity with failure domain enabled with 2 brokers.
+     *
+     * <pre>
+     * 1. Register brokers to domains: domain1: broker1 & domain2: broker2
+     * 2. Wait until both load-managers report that their domain cache has been updated
+     * 3. Create two namespaces with the same anti-affinity group
+     * 4. Load-manager selects a different broker for each namespace
+     * </pre>
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testBrokerSelectionForAntiAffinityGroup() throws Exception {
+
+        final String broker1 = primaryHost;
+        final String broker2 = secondaryHost;
+        final String cluster = pulsar1.getConfiguration().getClusterName();
+        final String property = "prop";
+        final String namespace1 = property + "/" + cluster + "/ns1";
+        final String namespace2 = property + "/" + cluster + "/ns2";
+        final String namespaceAntiAffinityGroup = "group";
+
+        // one domain per broker; each domain needs its own name, otherwise the second call targets the first
+        // domain again instead of creating a second one
+        FailureDomain domain1 = new FailureDomain();
+        domain1.brokers = Sets.newHashSet(broker1);
+        admin1.clusters().createFailureDomain(cluster, "domain1", domain1);
+        FailureDomain domain2 = new FailureDomain();
+        domain2.brokers = Sets.newHashSet(broker2);
+        admin1.clusters().createFailureDomain(cluster, "domain2", domain2);
+
+        admin1.properties().createProperty(property, new PropertyAdmin(null, Sets.newHashSet(cluster)));
+        admin1.namespaces().createNamespace(namespace1);
+        admin1.namespaces().createNamespace(namespace2);
+        admin1.namespaces().setNamespaceAntiAffinityGroup(namespace1, namespaceAntiAffinityGroup);
+        admin1.namespaces().setNamespaceAntiAffinityGroup(namespace2, namespaceAntiAffinityGroup);
+
+        // poll (at most 5 x 200ms) until both load-managers have updated their brokerToDomain cache; stop
+        // waiting as soon as both report the update
+        for (int i = 0; i < 5; i++) {
+            if (isLoadManagerUpdatedDomainCache(primaryLoadManager)
+                    && isLoadManagerUpdatedDomainCache(secondaryLoadManager)) {
+                break;
+            }
+            Thread.sleep(200);
+        }
+        assertTrue(isLoadManagerUpdatedDomainCache(primaryLoadManager));
+        assertTrue(isLoadManagerUpdatedDomainCache(secondaryLoadManager));
+
+        ServiceUnitId serviceUnit = makeBundle(property, cluster, "ns1");
+        String selectedBroker1 = primaryLoadManager.selectBrokerForAssignment(serviceUnit);
+
+        serviceUnit = makeBundle(property, cluster, "ns2");
+        String selectedBroker2 = primaryLoadManager.selectBrokerForAssignment(serviceUnit);
+
+        // namespaces of the same anti-affinity group must not land on the same broker
+        assertNotEquals(selectedBroker1, selectedBroker2);
+
+    }
+
+ /**
+ * It verifies that the load-shedding check allows an anti-affinity namespace to be unloaded only while there is a
+ * candidate broker available that doesn't cause uneven anti-affinity namespace distribution.
+ *
+ * <pre>
+ * 1. broker-0 owns ns-0 => broker-0 can unload ns-0
+ * 2. broker-1 owns ns-1 => broker-0 can still unload ns-0
+ * 3. broker-2 owns ns-2 => broker-0 can't unload ns-0 as all brokers own the same number of namespaces
+ * </pre>
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testLoadSheddingUtilWithAntiAffinityNamespace() throws Exception {
+
+ final String namespace = "my-property/use/my-ns";
+ final int totalNamespaces = 5;
+ final String namespaceAntiAffinityGroup = "my-antiaffinity";
+ final String bundle = "/0x00000000_0xffffffff";
+
+ admin1.properties().createProperty("my-property",
+ new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
+
+ // create my-ns0 .. my-ns4 and put all of them into the same anti-affinity group
+ for (int i = 0; i < totalNamespaces; i++) {
+ final String ns = namespace + i;
+ admin1.namespaces().createNamespace(ns);
+ admin1.namespaces().setNamespaceAntiAffinityGroup(ns, namespaceAntiAffinityGroup);
+ }
+
+ Set<String> brokers = Sets.newHashSet();
+ Set<String> candidate = Sets.newHashSet();
+ Map<String, Map<String, Set<String>>> brokerToNamespaceToBundleRange = Maps.newHashMap();
+ brokers.add("broker-0");
+ brokers.add("broker-1");
+ brokers.add("broker-2");
+
+ // NOTE(review): this is built from ns-0 but is also passed below when assigning ns-1 and ns-2;
+ // confirm against selectBrokerForNamespace that this is intended.
+ String assignedNamespace = namespace + "0" + bundle;
+
+ // all brokers are available, so the candidate set is all 3 brokers
+ candidate.addAll(brokers);
+ // add ns-0 to broker-0
+ selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-0", namespace + "0", assignedNamespace);
+ String currentBroker = "broker-0";
+ boolean shouldUnload = LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace + "0", bundle,
+ currentBroker, pulsar1, brokerToNamespaceToBundleRange, candidate);
+ assertTrue(shouldUnload);
+ // add ns-1 to broker-1
+ selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-1", namespace + "1", assignedNamespace);
+ shouldUnload = LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace + "0", bundle, currentBroker,
+ pulsar1, brokerToNamespaceToBundleRange, candidate);
+ assertTrue(shouldUnload);
+ // add ns-2 to broker-2: every broker now owns one namespace, so unloading ns-0 is expected to be refused
+ selectBrokerForNamespace(brokerToNamespaceToBundleRange, "broker-2", namespace + "2", assignedNamespace);
+ shouldUnload = LoadManagerShared.shouldAntiAffinityNamespaceUnload(namespace + "0", bundle, currentBroker,
+ pulsar1, brokerToNamespaceToBundleRange, candidate);
+ assertFalse(shouldUnload);
+
+ }
+
+    /**
+     * It verifies that {@code ModularLoadManagerImpl#shouldAntiAffinityNamespaceUnload} allows the bundle of an
+     * anti-affinity namespace to be unloaded from {@code primaryHost} once a topic has been created in that
+     * namespace and the load data has been refreshed.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testLoadSheddingWithAntiAffinityNamespace() throws Exception {
+
+        final String namespace = "my-property/use/my-ns";
+        final int totalNamespaces = 5;
+        final String namespaceAntiAffinityGroup = "my-antiaffinity";
+        final String bundle = "0x00000000_0xffffffff";
+
+        admin1.properties().createProperty("my-property",
+                new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
+
+        // create my-ns0 .. my-ns4 and put all of them into the same anti-affinity group
+        for (int i = 0; i < totalNamespaces; i++) {
+            final String ns = namespace + i;
+            admin1.namespaces().createNamespace(ns);
+            admin1.namespaces().setNamespaceAntiAffinityGroup(ns, namespaceAntiAffinityGroup);
+        }
+
+        PulsarClient pulsarClient = PulsarClient.create(pulsar1.getWebServiceAddress());
+        try {
+            // creating a producer brings up a topic in my-ns0 on the broker
+            Producer producer = pulsarClient.createProducer("persistent://" + namespace + "0/my-topic1");
+            try {
+                ModularLoadManagerImpl loadManager = (ModularLoadManagerImpl) ((ModularLoadManagerWrapper) pulsar1
+                        .getLoadManager().get()).getLoadManager();
+
+                // refresh broker stats and load data before asking the load manager
+                pulsar1.getBrokerService().updateRates();
+                loadManager.updateAll();
+
+                assertTrue(loadManager.shouldAntiAffinityNamespaceUnload(namespace + "0", bundle, primaryHost));
+            } finally {
+                // release the producer even when an assertion or a refresh call fails
+                producer.close();
+            }
+        } finally {
+            pulsarClient.close();
+        }
+    }
+
+    /**
+     * Tells whether the given load manager's {@code brokerToFailureDomainMap} holds at least one entry.
+     *
+     * @param loadManager
+     *            load manager whose map is inspected
+     * @return {@code true} if the map is not empty
+     * @throws Exception
+     *             if the field cannot be looked up or read reflectively
+     */
+    private boolean isLoadManagerUpdatedDomainCache(ModularLoadManagerImpl loadManager) throws Exception {
+        // the map is read through reflection; setAccessible lifts the access check on the declared field
+        final Field domainMapField = ModularLoadManagerImpl.class.getDeclaredField("brokerToFailureDomainMap");
+        domainMapField.setAccessible(true);
+        final Map<String, String> brokerToDomain = (Map<String, String>) domainMapField.get(loadManager);
+        final boolean populated = !brokerToDomain.isEmpty();
+        return populated;
+    }
+
+    /**
+     * Builds the bundle of the given namespace that spans the closed range from
+     * {@code NamespaceBundles.FULL_LOWER_BOUND} to {@code NamespaceBundles.FULL_UPPER_BOUND}.
+     */
+    private NamespaceBundle makeBundle(final String property, final String cluster, final String namespace) {
+        final NamespaceName nsName = NamespaceName.get(property, cluster, namespace);
+        return nsFactory.getBundle(nsName, Range.range(NamespaceBundles.FULL_LOWER_BOUND, BoundType.CLOSED,
+                NamespaceBundles.FULL_UPPER_BOUND, BoundType.CLOSED));
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
index cc4c082..fce825c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
@@ -499,7 +499,7 @@ public class ModularLoadManagerImplTest {
final String broker1Address = pulsar1.getAdvertisedAddress() + "0";
final String broker2Address = pulsar2.getAdvertisedAddress() + "1";
final String sharedBroker = "broker3";
- admin1.clusters().createCluster(cluster, new ClusterData("http://" + pulsar1.getAdvertisedAddress()));
+ admin1.clusters().updateCluster(cluster, new ClusterData("http://" + pulsar1.getAdvertisedAddress()));
admin1.properties().createProperty(property,
new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet(cluster)));
admin1.namespaces().createNamespace(property + "/" + cluster + "/" + namespace);
@@ -535,7 +535,7 @@ public class ModularLoadManagerImplTest {
// test1: shared=1, primary=1, secondary=1 => It should return 1 primary broker only
Set<String> brokerCandidateCache = Sets.newHashSet();
Set<String> availableBrokers = Sets.newHashSet(sharedBroker, broker1Address, broker2Address);
- LoadManagerShared.applyPolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
+ LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
availableBrokers, brokerTopicLoadingPredicate);
assertEquals(brokerCandidateCache.size(), 1);
assertTrue(brokerCandidateCache.contains(broker1Address));
@@ -543,7 +543,7 @@ public class ModularLoadManagerImplTest {
// test2: shared=1, primary=0, secondary=1 => It should return 1 secondary broker only
brokerCandidateCache = Sets.newHashSet();
availableBrokers = Sets.newHashSet(sharedBroker, broker2Address);
- LoadManagerShared.applyPolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
+ LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
availableBrokers, brokerTopicLoadingPredicate);
assertEquals(brokerCandidateCache.size(), 1);
assertTrue(brokerCandidateCache.contains(broker2Address));
@@ -551,7 +551,7 @@ public class ModularLoadManagerImplTest {
// test3: shared=1, primary=0, secondary=0 => It should return 0 broker
brokerCandidateCache = Sets.newHashSet();
availableBrokers = Sets.newHashSet(sharedBroker);
- LoadManagerShared.applyPolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
+ LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
availableBrokers, brokerTopicLoadingPredicate);
assertEquals(brokerCandidateCache.size(), 0);
@@ -565,7 +565,7 @@ public class ModularLoadManagerImplTest {
// test1: shared=1, primary=1, secondary=1 => It should return primary + secondary
brokerCandidateCache = Sets.newHashSet();
availableBrokers = Sets.newHashSet(sharedBroker, broker1Address, broker2Address);
- LoadManagerShared.applyPolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
+ LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
availableBrokers, brokerTopicLoadingPredicate);
assertEquals(brokerCandidateCache.size(), 2);
assertTrue(brokerCandidateCache.contains(broker1Address));
@@ -574,7 +574,7 @@ public class ModularLoadManagerImplTest {
// test2: shared=1, secondary=1 => It should return secondary
brokerCandidateCache = Sets.newHashSet();
availableBrokers = Sets.newHashSet(sharedBroker, broker2Address);
- LoadManagerShared.applyPolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
+ LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
availableBrokers, brokerTopicLoadingPredicate);
assertEquals(brokerCandidateCache.size(), 1);
assertTrue(brokerCandidateCache.contains(broker2Address));
@@ -582,7 +582,7 @@ public class ModularLoadManagerImplTest {
// test3: shared=1, => It should return 0 broker
brokerCandidateCache = Sets.newHashSet();
availableBrokers = Sets.newHashSet(sharedBroker);
- LoadManagerShared.applyPolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
+ LoadManagerShared.applyNamespacePolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
availableBrokers, brokerTopicLoadingPredicate);
assertEquals(brokerCandidateCache.size(), 0);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 720b48e..286a700 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -95,7 +95,7 @@ public class BacklogQuotaManagerTest {
adminUrl = new URL("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT);
admin = new PulsarAdmin(adminUrl, (Authentication) null);
- admin.clusters().createCluster("usc", new ClusterData(adminUrl.toString()));
+ admin.clusters().updateCluster("usc", new ClusterData(adminUrl.toString()));
admin.properties().createProperty("prop",
new PropertyAdmin(Lists.newArrayList("appid1"), Sets.newHashSet("usc")));
admin.namespaces().createNamespace("prop/usc/ns-quota");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 8e8f31d..38afc78 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -102,7 +102,7 @@ public class BrokerBkEnsemblesTests {
adminUrl = new URL("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT);
admin = new PulsarAdmin(adminUrl, (Authentication) null);
- admin.clusters().createCluster("usc", new ClusterData(adminUrl.toString()));
+ admin.clusters().updateCluster("usc", new ClusterData(adminUrl.toString()));
admin.properties().createProperty("prop",
new PropertyAdmin(Lists.newArrayList("appid1"), Sets.newHashSet("usc")));
} catch (Throwable t) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 44372e7..e487f12 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -203,11 +203,11 @@ public class ReplicatorTestBase {
admin3 = new PulsarAdmin(url3, (Authentication) null);
// Provision the global namespace
- admin1.clusters().createCluster("r1", new ClusterData(url1.toString(), urlTls1.toString(),
+ admin1.clusters().updateCluster("r1", new ClusterData(url1.toString(), urlTls1.toString(),
pulsar1.getBrokerServiceUrl(), pulsar1.getBrokerServiceUrlTls()));
- admin1.clusters().createCluster("r2", new ClusterData(url2.toString(), urlTls2.toString(),
+ admin1.clusters().updateCluster("r2", new ClusterData(url2.toString(), urlTls2.toString(),
pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls()));
- admin1.clusters().createCluster("r3", new ClusterData(url3.toString(), urlTls3.toString(),
+ admin1.clusters().updateCluster("r3", new ClusterData(url3.toString(), urlTls3.toString(),
pulsar3.getBrokerServiceUrl(), pulsar3.getBrokerServiceUrlTls()));
admin1.clusters().createCluster("global", new ClusterData("http://global:8080", "https://global:8443"));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperSessionExpireRecoveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperSessionExpireRecoveryTest.java
index d9a7c0f..0ac89e4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperSessionExpireRecoveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperSessionExpireRecoveryTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.broker.zookeeper;
-import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -29,7 +29,7 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
public class ZooKeeperSessionExpireRecoveryTest extends MockedPulsarServiceBaseTest {
@@ -52,11 +52,11 @@ public class ZooKeeperSessionExpireRecoveryTest extends MockedPulsarServiceBaseT
public void testSessionExpired() throws Exception {
admin.clusters().createCluster("my-cluster", new ClusterData("test-url"));
- assertEquals(admin.clusters().getClusters(), Lists.newArrayList("my-cluster"));
+ assertTrue(Sets.newHashSet(admin.clusters().getClusters()).contains("my-cluster"));
mockZookKeeper.failNow(Code.SESSIONEXPIRED);
- assertEquals(admin.clusters().getClusters(), Lists.newArrayList("my-cluster"));
+ assertTrue(Sets.newHashSet(admin.clusters().getClusters()).contains("my-cluster"));
try {
admin.clusters().createCluster("my-cluster-2", new ClusterData("test-url"));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
index 7c7cff4..83b15f5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
@@ -167,7 +167,7 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
authTls.configure(authParams);
internalSetup(authTls);
- admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
+ admin.clusters().updateCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
"pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
admin.properties().createProperty("my-property",
new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
@@ -189,7 +189,7 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
authTls.configure(authParams);
internalSetup(authTls);
- admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
+ admin.clusters().updateCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
"pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
admin.properties().createProperty("my-property",
new PropertyAdmin(Lists.newArrayList("anonymousUser"), Sets.newHashSet("use")));
@@ -243,7 +243,7 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
// this will cause NPE and it should throw 500
doReturn(null).when(pulsar).getGlobalZkCache();
try {
- admin.clusters().createCluster(cluster, clusterData);
+ admin.clusters().updateCluster(cluster, clusterData);
} catch (PulsarAdminException e) {
Assert.assertTrue(e.getCause() instanceof InternalServerErrorException);
}
@@ -268,7 +268,7 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
authTls.configure(authParams);
internalSetup(authTls);
- admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
+ admin.clusters().updateCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
"pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
admin.properties().createProperty("my-property",
new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
@@ -292,4 +292,4 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
}
-}
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 4a8d8cf..0d1b8c3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -73,6 +73,7 @@ import com.google.common.collect.Sets;
public class NonPersistentTopicTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(NonPersistentTopicTest.class);
+ private final String configClusterName = "r1";
@DataProvider(name = "subscriptionType")
public Object[][] getSubscriptionType() {
@@ -835,7 +836,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
// completely
// independent config objects instead of referring to the same properties object
ServiceConfiguration config1 = new ServiceConfiguration();
- config1.setClusterName("r1");
+ config1.setClusterName(configClusterName);
config1.setWebServicePort(webServicePort1);
config1.setZookeeperServers("127.0.0.1:" + zkPort1);
config1.setGlobalZookeeperServers("127.0.0.1:" + globalZKPort + "/foo");
@@ -901,11 +902,11 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
admin3 = new PulsarAdmin(url3, (Authentication) null);
// Provision the global namespace
- admin1.clusters().createCluster("r1", new ClusterData(url1.toString(), null, pulsar1.getBrokerServiceUrl(),
+ admin1.clusters().updateCluster("r1", new ClusterData(url1.toString(), null, pulsar1.getBrokerServiceUrl(),
pulsar1.getBrokerServiceUrlTls()));
- admin1.clusters().createCluster("r2", new ClusterData(url2.toString(), null, pulsar2.getBrokerServiceUrl(),
+ admin1.clusters().updateCluster("r2", new ClusterData(url2.toString(), null, pulsar2.getBrokerServiceUrl(),
pulsar1.getBrokerServiceUrlTls()));
- admin1.clusters().createCluster("r3", new ClusterData(url3.toString(), null, pulsar3.getBrokerServiceUrl(),
+ admin1.clusters().updateCluster("r3", new ClusterData(url3.toString(), null, pulsar3.getBrokerServiceUrl(),
pulsar1.getBrokerServiceUrlTls()));
admin1.clusters().createCluster("global", new ClusterData("http://global:8080"));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java
index b8c1cde..7b8e083 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerBase.java
@@ -40,6 +40,7 @@ public class TlsProducerConsumerBase extends ProducerConsumerBase {
protected final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
protected final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
protected final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";
+ private final String clusterName = "use";
@BeforeMethod
@Override
@@ -62,7 +63,7 @@ public class TlsProducerConsumerBase extends ProducerConsumerBase {
conf.setTlsEnabled(true);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
- conf.setClusterName("use");
+ conf.setClusterName(clusterName);
}
protected void internalSetUpForClient() throws Exception {
@@ -78,7 +79,7 @@ public class TlsProducerConsumerBase extends ProducerConsumerBase {
clientConf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
clientConf.setUseTls(true);
admin = spy(new PulsarAdmin(brokerUrlTls, clientConf));
- admin.clusters().createCluster("use",
+ admin.clusters().updateCluster(clusterName,
new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(), "pulsar://localhost:" + BROKER_PORT,
"pulsar+ssl://localhost:" + BROKER_PORT_TLS));
admin.properties().createProperty("my-property",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java
index 3bac504..8398ea2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/discovery/service/web/DiscoveryServiceWebTest.java
@@ -76,7 +76,7 @@ public class DiscoveryServiceWebTest extends ProducerConsumerBase {
* @throws Exception
*/
@Test
- public void testRiderectUrlWithServerStarted() throws Exception {
+ public void testRedirectUrlWithServerStarted() throws Exception {
// 1. start server
int port = PortManager.nextFreePort();
ServiceConfig config = new ServiceConfig();
@@ -100,7 +100,7 @@ public class DiscoveryServiceWebTest extends ProducerConsumerBase {
**/
assertEquals(hitBrokerService(HttpMethod.POST, postRequestUrl, Lists.newArrayList("use")),
- "Property does not exist");
+ "Cannot set replication on a non-global namespace");
assertEquals(hitBrokerService(HttpMethod.PUT, putRequestUrl, new BundlesData(1)), "Property does not exist");
assertEquals(hitBrokerService(HttpMethod.GET, getRequestUrl, null), "Property does not exist");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
index c996c9f..302f2e4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
@@ -44,7 +44,8 @@ import com.google.common.collect.Sets;
public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest {
private WebSocketService service;
- private static final int TEST_PORT = PortManager.nextFreePort();;
+ private static final int TEST_PORT = PortManager.nextFreePort();
+ private final String configClusterName = "c1";
public ProxyAuthorizationTest() {
super();
@@ -53,7 +54,7 @@ public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest {
@BeforeClass
@Override
protected void setup() throws Exception {
- conf.setClusterName("c1");
+ conf.setClusterName(configClusterName);
internalSetup();
WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
@@ -81,7 +82,7 @@ public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest {
assertEquals(auth.canLookup(DestinationName.get("persistent://p1/c1/ns1/ds1"), "my-role"), false);
- admin.clusters().createCluster("c1", new ClusterData());
+ admin.clusters().updateCluster(configClusterName, new ClusterData());
admin.properties().createProperty("p1", new PropertyAdmin(Lists.newArrayList("role1"), Sets.newHashSet("c1")));
waitForChange();
admin.namespaces().createNamespace("p1/c1/ns1");
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java
index 1984e2a..779e7b1 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedExceptio
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
/**
@@ -284,4 +285,135 @@ public interface Clusters {
*/
NamespaceIsolationData getNamespaceIsolationPolicy(String cluster, String policyName) throws PulsarAdminException;
+ /**
+ * Create a failure domain in a cluster
+ * <p>
+ *
+ * @param cluster
+ * Cluster name
+ *
+ * @param domainName
+ * Domain name
+ *
+ * @param domain
+ * Domain configuration
+ *
+ * @throws NotAuthorizedException
+ * You don't have admin permission
+ *
+ * @throws ConflictException
+ * Broker already exists in another domain
+ *
+ * @throws NotFoundException
+ * Cluster doesn't exist
+ *
+ * @throws PreconditionFailedException
+ * Cluster doesn't exist
+ *
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void createFailureDomain(String cluster, String domainName, FailureDomain domain)
+ throws PulsarAdminException;
+
+
+ /**
+ * Update a failure domain in a cluster
+ * <p>
+ *
+ * @param cluster
+ * Cluster name
+ *
+ * @param domainName
+ * Domain name
+ *
+ * @param domain
+ * Domain configuration
+ *
+ * @throws NotAuthorizedException
+ * You don't have admin permission
+ *
+ * @throws ConflictException
+ * Broker already exists in another domain
+ *
+ * @throws NotFoundException
+ * Cluster doesn't exist
+ *
+ * @throws PreconditionFailedException
+ * Cluster doesn't exist
+ *
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void updateFailureDomain(String cluster, String domainName, FailureDomain domain)
+ throws PulsarAdminException;
+
+
+ /**
+ * Delete a failure domain from a cluster
+ * <p>
+ *
+ * @param cluster
+ * Cluster name
+ *
+ * @param domainName
+ * Domain name
+ *
+ * @throws NotAuthorizedException
+ * You don't have admin permission
+ *
+ * @throws NotFoundException
+ * Cluster doesn't exist
+ *
+ * @throws PreconditionFailedException
+ * Cluster doesn't exist
+ *
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+
+ void deleteFailureDomain(String cluster, String domainName) throws PulsarAdminException;
+
+ /**
+ * Get all failure domains registered in a cluster
+ * <p>
+ *
+ * @param cluster
+ * Cluster name
+ * @return map of the registered failure domains (presumably keyed by domain name; confirm against the broker API)
+ * @throws NotAuthorizedException
+ * You don't have admin permission
+ *
+ * @throws NotFoundException
+ * Cluster doesn't exist
+ *
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ Map<String, FailureDomain> getFailureDomains(String cluster) throws PulsarAdminException;
+
+ /**
+ * Get a failure domain registered in a cluster
+ * <p>
+ *
+ * @param cluster
+ * Cluster name
+ * @param domainName
+ * Domain name
+ * @return the failure domain registered under the given name
+ * @throws NotAuthorizedException
+ * You don't have admin permission
+ *
+ * @throws NotFoundException
+ * Domain doesn't exist
+ *
+ * @throws PreconditionFailedException
+ * Cluster doesn't exist
+ *
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ FailureDomain getFailureDomain(String cluster, String domainName) throws PulsarAdminException;
+
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index ebe754c..3c8d3d5 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -420,6 +420,78 @@ public interface Namespaces {
* Unexpected error
*/
void setNamespaceMessageTTL(String namespace, int ttlInSeconds) throws PulsarAdminException;
+
+
+ /**
+ * Set anti-affinity group name for a namespace
+ *
+ * @param namespace
+ * Namespace name
+ * @param namespaceAntiAffinityGroup
+ * anti-affinity group name for a namespace
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void setNamespaceAntiAffinityGroup(String namespace, String namespaceAntiAffinityGroup) throws PulsarAdminException;
+
+ /**
+ * Get all namespaces that are grouped under the given anti-affinity group
+ *
+ * @param property
+ * property is only used for authorization. Client has to be admin of any of the properties to access
+ * this api.
+ * @param cluster
+ * cluster name
+ * @param namespaceAntiAffinityGroup
+ * Anti-affinity group name
+ * @return list of namespaces grouped under the given anti-affinity group
+ * @throws PulsarAdminException
+ */
+ List<String> getAntiAffinityNamespaces(String property, String cluster, String namespaceAntiAffinityGroup)
+ throws PulsarAdminException;
+
+ /**
+ * Get anti-affinity group name for a namespace
+ *
+ * @param namespace
+ * Namespace name
+ * @return anti-affinity group name of the namespace
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ String getNamespaceAntiAffinityGroup(String namespace) throws PulsarAdminException;
+
+ /**
+ * Delete the anti-affinity group name set for a namespace.
+ *
+ * @param namespace
+ * Namespace name
+ *
+ * @throws NotAuthorizedException
+ * You don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void deleteNamespaceAntiAffinityGroup(String namespace) throws PulsarAdminException;
/**
* Set the deduplication status for all topics within a namespace.
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
index 2fcf44a..666a2e8 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.client.admin.Clusters;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
@@ -150,4 +151,49 @@ public class ClustersImpl extends BaseResource implements Clusters {
throw getApiException(e);
}
}
+
+ @Override
+ public void createFailureDomain(String cluster, String domainName, FailureDomain domain) throws PulsarAdminException {
+ // Create and update issue the same request: both delegate to setDomain(), which POSTs the domain
+ // to <cluster>/failureDomains/<domainName>.
+ setDomain(cluster, domainName, domain);
+ }
+
+ @Override
+ public void updateFailureDomain(String cluster, String domainName, FailureDomain domain) throws PulsarAdminException {
+ // Same request as createFailureDomain(): setDomain() POSTs the domain to
+ // <cluster>/failureDomains/<domainName>.
+ setDomain(cluster, domainName, domain);
+ }
+
+ /**
+  * Deletes the failure domain {@code domainName} of {@code cluster} by sending a DELETE request to
+  * {@code <cluster>/failureDomains/<domainName>}.
+  *
+  * @param cluster
+  *            cluster name
+  * @param domainName
+  *            name of the failure domain to delete
+  * @throws PulsarAdminException
+  *             if the request fails; any exception raised by the call is converted with
+  *             {@code getApiException}
+  */
+ @Override
+ public void deleteFailureDomain(String cluster, String domainName) throws PulsarAdminException {
+     try {
+         request(clusters.path(cluster).path("failureDomains").path(domainName)).delete(ErrorData.class);
+     } catch (Exception e) {
+         // Convert client/transport failures the same way the other failure-domain calls in this
+         // class do, so callers only ever see PulsarAdminException.
+         throw getApiException(e);
+     }
+ }
+
+ /**
+  * Fetches all failure domains of {@code cluster}, keyed by domain name, with a GET on
+  * {@code <cluster>/failureDomains}.
+  */
+ @Override
+ public Map<String, FailureDomain> getFailureDomains(String cluster) throws PulsarAdminException {
+     try {
+         // The anonymous GenericType subclass carries the full Map<String, FailureDomain> type.
+         final GenericType<Map<String, FailureDomain>> domainsType =
+                 new GenericType<Map<String, FailureDomain>>() {
+                 };
+         return request(clusters.path(cluster).path("failureDomains")).get(domainsType);
+     } catch (Exception ex) {
+         throw getApiException(ex);
+     }
+ }
+
+ /**
+  * Fetches a single failure domain with a GET on {@code <cluster>/failureDomains/<domainName>}.
+  */
+ @Override
+ public FailureDomain getFailureDomain(String cluster, String domainName) throws PulsarAdminException {
+     try {
+         final FailureDomain found = request(clusters.path(cluster).path("failureDomains").path(domainName))
+                 .get(FailureDomain.class);
+         return found;
+     } catch (Exception ex) {
+         throw getApiException(ex);
+     }
+ }
+
+ /**
+  * POSTs {@code domain} as JSON to {@code <cluster>/failureDomains/<domainName>}; shared by the
+  * create and update entry points.
+  */
+ private void setDomain(String cluster, String domainName,
+         FailureDomain domain) throws PulsarAdminException {
+     try {
+         final Entity<FailureDomain> payload = Entity.entity(domain, MediaType.APPLICATION_JSON);
+         request(clusters.path(cluster).path("failureDomains").path(domainName)).post(payload, ErrorData.class);
+     } catch (Exception ex) {
+         throw getApiException(ex);
+     }
+ }
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 79aa162..3f0c7e7 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -229,6 +229,55 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
}
@Override
+ public void setNamespaceAntiAffinityGroup(String namespace, String namespaceAntiAffinityGroup)
+         throws PulsarAdminException {
+     try {
+         final NamespaceName nsName = NamespaceName.get(namespace);
+         // The group name is sent as the JSON request body.
+         final Entity<String> groupEntity = Entity.entity(namespaceAntiAffinityGroup, MediaType.APPLICATION_JSON);
+         request(namespaces.path(nsName.getProperty()).path(nsName.getCluster()).path(nsName.getLocalName())
+                 .path("antiAffinity")).post(groupEntity, ErrorData.class);
+     } catch (Exception ex) {
+         throw getApiException(ex);
+     }
+ }
+
+ /**
+  * Reads the anti-affinity group of a namespace with a GET on
+  * {@code <property>/<cluster>/<namespace>/antiAffinity}.
+  */
+ @Override
+ public String getNamespaceAntiAffinityGroup(String namespace) throws PulsarAdminException {
+     try {
+         final NamespaceName nsName = NamespaceName.get(namespace);
+         final GenericType<String> groupType = new GenericType<String>() {
+         };
+         return request(namespaces.path(nsName.getProperty()).path(nsName.getCluster())
+                 .path(nsName.getLocalName()).path("antiAffinity")).get(groupType);
+     } catch (Exception ex) {
+         throw getApiException(ex);
+     }
+ }
+
+ /**
+  * Lists the namespaces in {@code namespaceAntiAffinityGroup} with a GET on
+  * {@code <cluster>/antiAffinity/<group>}; {@code property} is passed as a query parameter.
+  */
+ @Override
+ public List<String> getAntiAffinityNamespaces(String property, String cluster, String namespaceAntiAffinityGroup)
+         throws PulsarAdminException {
+     try {
+         final GenericType<List<String>> namespacesType = new GenericType<List<String>>() {
+         };
+         return request(namespaces.path(cluster).path("antiAffinity").path(namespaceAntiAffinityGroup)
+                 .queryParam("property", property)).get(namespacesType);
+     } catch (Exception ex) {
+         throw getApiException(ex);
+     }
+ }
+
+ /**
+  * Removes the anti-affinity group of a namespace with a DELETE on
+  * {@code <property>/<cluster>/<namespace>/antiAffinity}.
+  */
+ @Override
+ public void deleteNamespaceAntiAffinityGroup(String namespace) throws PulsarAdminException {
+     try {
+         final NamespaceName nsName = NamespaceName.get(namespace);
+         request(namespaces
+                 .path(nsName.getProperty())
+                 .path(nsName.getCluster())
+                 .path(nsName.getLocalName())
+                 .path("antiAffinity"))
+                 .delete(ErrorData.class);
+     } catch (Exception ex) {
+         throw getApiException(ex);
+     }
+ }
+
+ @Override
public void setDeduplicationStatus(String namespace, boolean enableDeduplication) throws PulsarAdminException {
try {
NamespaceName ns = NamespaceName.get(namespace);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
index 84bced0..5ba6c10 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
@@ -21,9 +21,11 @@ package org.apache.pulsar.admin.cli;
import java.util.Arrays;
import org.apache.commons.lang3.StringUtils;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FailureDomain;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
@@ -125,6 +127,84 @@ public class CmdClusters extends CmdBase {
}
}
+ /**
+  * CLI command that builds a {@link FailureDomain} from {@code --broker-list} and passes it to
+  * {@code admin.clusters().createFailureDomain} for the cluster given as the main argument.
+  */
+ @Parameters(commandDescription = "Create a new failure-domain for a cluster. updates it if already created.")
+ private class CreateFailureDomain extends CliCommand {
+     @Parameter(description = "cluster-name\n", required = true)
+     private java.util.List<String> params;
+
+     @Parameter(names = "--domain-name", description = "domain-name", required = true)
+     private String domainName;
+
+     @Parameter(names = "--broker-list", description = "Comma separated broker list", required = false)
+     private String brokerList;
+
+     @Override
+     void run() throws PulsarAdminException {
+         String cluster = getOneArgument(params);
+         FailureDomain domain = new FailureDomain();
+         // A missing or blank --broker-list sets the broker set to null rather than an empty set.
+         domain.setBrokers((isNotBlank(brokerList) ? Sets.newHashSet(brokerList.split(",")) : null));
+         admin.clusters().createFailureDomain(cluster, domainName, domain);
+     }
+ }
+
+ /**
+  * CLI command that builds a {@link FailureDomain} from {@code --broker-list} and passes it to
+  * {@code admin.clusters().updateFailureDomain} for the cluster given as the main argument.
+  */
+ @Parameters(commandDescription = "Update failure-domain for a cluster. Creates a new one if not exist.")
+ private class UpdateFailureDomain extends CliCommand {
+     @Parameter(description = "cluster-name\n", required = true)
+     private java.util.List<String> params;
+
+     @Parameter(names = "--domain-name", description = "domain-name", required = true)
+     private String domainName;
+
+     @Parameter(names = "--broker-list", description = "Comma separated broker list", required = false)
+     private String brokerList;
+
+     @Override
+     void run() throws PulsarAdminException {
+         String cluster = getOneArgument(params);
+         FailureDomain domain = new FailureDomain();
+         // A missing or blank --broker-list sets the broker set to null rather than an empty set.
+         domain.setBrokers((isNotBlank(brokerList) ? Sets.newHashSet(brokerList.split(",")) : null));
+         admin.clusters().updateFailureDomain(cluster, domainName, domain);
+     }
+ }
+
+ /**
+  * CLI command that deletes the failure domain named by {@code --domain-name} from the cluster
+  * given as the main argument.
+  */
+ @Parameters(commandDescription = "Deletes an existing failure-domain")
+ private class DeleteFailureDomain extends CliCommand {
+     @Parameter(description = "cluster-name\n", required = true)
+     private java.util.List<String> params;
+
+     @Parameter(names = "--domain-name", description = "domain-name", required = true)
+     private String domainName;
+
+     @Override
+     void run() throws PulsarAdminException {
+         String cluster = getOneArgument(params);
+         admin.clusters().deleteFailureDomain(cluster, domainName);
+     }
+ }
+
+ /**
+  * CLI command that prints the result of {@code admin.clusters().getFailureDomains} for the cluster
+  * given as the main argument.
+  */
+ @Parameters(commandDescription = "List the existing failure-domains for a cluster")
+ private class ListFailureDomains extends CliCommand {
+
+     @Parameter(description = "cluster-name\n", required = true)
+     private java.util.List<String> params;
+
+     @Override
+     void run() throws PulsarAdminException {
+         String cluster = getOneArgument(params);
+         print(admin.clusters().getFailureDomains(cluster));
+     }
+ }
+
+ /**
+  * CLI command that prints the failure domain named by {@code --domain-name} for the cluster given
+  * as the main argument.
+  */
+ @Parameters(commandDescription = "Get the configuration brokers of a failure-domain")
+ private class GetFailureDomain extends CliCommand {
+     @Parameter(description = "cluster-name\n", required = true)
+     private java.util.List<String> params;
+
+     @Parameter(names = "--domain-name", description = "domain-name", required = true)
+     private String domainName;
+
+     @Override
+     void run() throws PulsarAdminException {
+         String cluster = getOneArgument(params);
+         print(admin.clusters().getFailureDomain(cluster, domainName));
+     }
+ }
+
public CmdClusters(PulsarAdmin admin) {
super("clusters", admin);
jcommander.addCommand("get", new Get());
@@ -133,6 +213,11 @@ public class CmdClusters extends CmdBase {
jcommander.addCommand("delete", new Delete());
jcommander.addCommand("list", new List());
jcommander.addCommand("update-peer-clusters", new UpdatePeerClusters());
+ jcommander.addCommand("get-failure-domain", new GetFailureDomain());
+ jcommander.addCommand("create-failure-domain", new CreateFailureDomain());
+ jcommander.addCommand("update-failure-domain", new UpdateFailureDomain());
+ jcommander.addCommand("delete-failure-domain", new DeleteFailureDomain());
+ jcommander.addCommand("list-failure-domains", new ListFailureDomains());
}
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 37d7306..e04eef7 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -213,6 +213,65 @@ public class CmdNamespaces extends CmdBase {
}
}
+ /**
+  * CLI command that validates the namespace argument and sets its anti-affinity group to the value
+  * of {@code --group}.
+  */
+ @Parameters(commandDescription = "Set Anti-affinity group name for a namespace")
+ private class SetAntiAffinityGroup extends CliCommand {
+     @Parameter(description = "property/cluster/namespace", required = true)
+     private java.util.List<String> params;
+
+     @Parameter(names = { "--group", "-g" }, description = "Anti-affinity group name", required = true)
+     private String antiAffinityGroup;
+
+     @Override
+     void run() throws PulsarAdminException {
+         String namespace = validateNamespace(params);
+         admin.namespaces().setNamespaceAntiAffinityGroup(namespace, antiAffinityGroup);
+     }
+ }
+
+ /**
+  * CLI command that validates the namespace argument and prints its anti-affinity group name.
+  */
+ @Parameters(commandDescription = "Get Anti-affinity group name for a namespace")
+ private class GetAntiAffinityGroup extends CliCommand {
+     @Parameter(description = "property/cluster/namespace\n", required = true)
+     private java.util.List<String> params;
+
+     @Override
+     void run() throws PulsarAdminException {
+         String namespace = validateNamespace(params);
+         print(admin.namespaces().getNamespaceAntiAffinityGroup(namespace));
+     }
+ }
+
+ // CLI command that prints the namespaces returned by getAntiAffinityNamespaces() for the given
+ // cluster and anti-affinity group. It takes no positional argument; all inputs are named options.
+ @Parameters(commandDescription = "Get Anti-affinity namespaces grouped with the given anti-affinity group name")
+ private class GetAntiAffinityNamespaces extends CliCommand {
+
+ // Optional; forwarded unchanged (null when the option is omitted).
+ @Parameter(names = { "--property",
+ "-p" }, description = "property is only used for authorization. Client has to be admin of any of the property to access this api", required = false)
+ private String property;
+
+ @Parameter(names = { "--cluster", "-c" }, description = "Cluster name", required = true)
+ private String cluster;
+
+ @Parameter(names = { "--group", "-g" }, description = "Anti-affinity group name", required = true)
+ private String antiAffinityGroup;
+
+ @Override
+ void run() throws PulsarAdminException {
+ print(admin.namespaces().getAntiAffinityNamespaces(property, cluster, antiAffinityGroup));
+ }
+ }
+
+ /**
+  * CLI command that validates the namespace argument and removes its anti-affinity group name.
+  */
+ @Parameters(commandDescription = "Remove Anti-affinity group name for a namespace")
+ private class DeleteAntiAffinityGroup extends CliCommand {
+     @Parameter(description = "property/cluster/namespace\n", required = true)
+     private java.util.List<String> params;
+
+     @Override
+     void run() throws PulsarAdminException {
+         String namespace = validateNamespace(params);
+         admin.namespaces().deleteNamespaceAntiAffinityGroup(namespace);
+     }
+ }
+
+
@Parameters(commandDescription = "Enable or disable deduplication for a namespace")
private class SetDeduplication extends CliCommand {
@Parameter(description = "property/cluster/namespace", required = true)
@@ -621,6 +680,11 @@ public class CmdNamespaces extends CmdBase {
jcommander.addCommand("get-message-ttl", new GetMessageTTL());
jcommander.addCommand("set-message-ttl", new SetMessageTTL());
+
+ jcommander.addCommand("get-anti-affinity-group", new GetAntiAffinityGroup());
+ jcommander.addCommand("set-anti-affinity-group", new SetAntiAffinityGroup());
+ jcommander.addCommand("get-anti-affinity-namespaces", new GetAntiAffinityNamespaces());
+ jcommander.addCommand("delete-anti-affinity-group", new DeleteAntiAffinityGroup());
jcommander.addCommand("set-deduplication", new SetDeduplication());
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 69d57da..ba12db7 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
import org.apache.pulsar.common.policies.data.ResourceQuota;
@@ -131,6 +132,24 @@ public class PulsarAdminToolTest {
clusters.run(split("delete use"));
verify(mockClusters).deleteCluster("use");
+
+ clusters.run(split("list-failure-domains use"));
+ verify(mockClusters).getFailureDomains("use");
+
+ clusters.run(split("get-failure-domain use --domain-name domain"));
+ verify(mockClusters).getFailureDomain("use", "domain");
+
+ clusters.run(split("create-failure-domain use --domain-name domain --broker-list b1"));
+ FailureDomain domain = new FailureDomain();
+ domain.setBrokers(Sets.newHashSet("b1"));
+ verify(mockClusters).createFailureDomain("use", "domain", domain);
+
+ clusters.run(split("update-failure-domain use --domain-name domain --broker-list b1"));
+ verify(mockClusters).updateFailureDomain("use", "domain", domain);
+
+ clusters.run(split("delete-failure-domain use --domain-name domain"));
+ verify(mockClusters).deleteFailureDomain("use", "domain");
+
// Re-create CmdClusters to avoid a issue.
// See https://github.com/cbeust/jcommander/issues/271
@@ -284,6 +303,19 @@ public class PulsarAdminToolTest {
namespaces.run(split("get-message-ttl myprop/clust/ns1"));
verify(mockNamespaces).getNamespaceMessageTTL("myprop/clust/ns1");
+
+ namespaces.run(split("set-anti-affinity-group myprop/clust/ns1 -g group"));
+ verify(mockNamespaces).setNamespaceAntiAffinityGroup("myprop/clust/ns1", "group");
+
+ namespaces.run(split("get-anti-affinity-group myprop/clust/ns1"));
+ verify(mockNamespaces).getNamespaceAntiAffinityGroup("myprop/clust/ns1");
+
+ namespaces.run(split("get-anti-affinity-namespaces -p dummy -c cluster -g group"));
+ verify(mockNamespaces).getAntiAffinityNamespaces("dummy", "cluster", "group");
+
+ namespaces.run(split("delete-anti-affinity-group myprop/clust/ns1 "));
+ verify(mockNamespaces).deleteNamespaceAntiAffinityGroup("myprop/clust/ns1");
+
namespaces.run(split("set-retention myprop/clust/ns1 -t 1h -s 1M"));
verify(mockNamespaces).setRetention("myprop/clust/ns1", new RetentionPolicies(60, 1));
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FailureDomain.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FailureDomain.java
new file mode 100644
index 0000000..dd1d09b
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/FailureDomain.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.common.base.Objects;
+
+/**
+ * Policy data for a failure domain: the set of brokers that belong to it.
+ * <p>
+ * Two instances are equal when their broker sets are equal; {@link #hashCode()} is derived from
+ * the same field so instances behave correctly in hash-based collections.
+ */
+public class FailureDomain {
+
+    /** Brokers in this failure domain; starts as an empty set and may be replaced via the setter. */
+    public Set<String> brokers = new HashSet<String>();
+
+    /**
+     * @return the broker set currently held by this domain (the same instance, not a copy)
+     */
+    public Set<String> getBrokers() {
+        return brokers;
+    }
+
+    /**
+     * @param brokers
+     *            broker set to store; stored as-is, so {@code null} is kept as {@code null}
+     */
+    public void setBrokers(Set<String> brokers) {
+        this.brokers = brokers;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof FailureDomain) {
+            FailureDomain other = (FailureDomain) obj;
+            return Objects.equal(brokers, other.brokers);
+        }
+
+        return false;
+    }
+
+    /**
+     * Hash code consistent with {@link #equals(Object)}: computed from {@code brokers} only and
+     * tolerant of a {@code null} broker set.
+     */
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(brokers);
+    }
+
+    @Override
+    public String toString() {
+        return Objects.toStringHelper(this).add("brokers", brokers).toString();
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index a46b5fe..f3486b8 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -42,6 +42,7 @@ public class Policies {
public int message_ttl_in_seconds = 0;
public RetentionPolicies retention_policies = null;
public boolean deleted = false;
+ public String antiAffinityGroup;
public static final String FIRST_BOUNDARY = "0x00000000";
public static final String LAST_BOUNDARY = "0xffffffff";
@@ -63,7 +64,8 @@ public class Policies {
&& message_ttl_in_seconds == other.message_ttl_in_seconds
&& Objects.equals(retention_policies, other.retention_policies)
&& Objects.equals(encryption_required, other.encryption_required)
- && Objects.equals(subscription_auth_mode, other.subscription_auth_mode);
+ && Objects.equals(subscription_auth_mode, other.subscription_auth_mode)
+ && Objects.equals(antiAffinityGroup, other.antiAffinityGroup);
}
return false;
@@ -86,6 +88,7 @@ public class Policies {
.add("deduplicationEnabled", deduplicationEnabled)
.add("clusterDispatchRate", clusterDispatchRate)
.add("latency_stats_sample_rate", latency_stats_sample_rate)
+ .add("antiAffinityGroup", antiAffinityGroup)
.add("message_ttl_in_seconds", message_ttl_in_seconds).add("retention_policies", retention_policies)
.add("deleted", deleted)
.add("encryption_required", encryption_required)
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
index 06bd055..556a8b5 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
@@ -64,6 +64,7 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
private ProxyService proxyService;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ private final String configClusterName = "use";
@BeforeMethod
@Override
@@ -92,7 +93,7 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
providers.add(AuthenticationProviderTls.class.getName());
conf.setAuthenticationProviders(providers);
- conf.setClusterName("use");
+ conf.setClusterName(configClusterName);
super.init();
@@ -157,7 +158,7 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
// create a client which connects to proxy over tls and pass authData
PulsarClient proxyClient = createPulsarClient(authTls, proxyServiceUrl);
- admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
+ admin.clusters().updateCluster(configClusterName, new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
"pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
admin.properties().createProperty("my-property",
new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
--
To stop receiving notification emails like this one, please contact
rdhabalia@apache.org.