You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/10/08 15:51:42 UTC
[pulsar] branch master updated: Ensure the handling of
PartitionMetadataRequest is async end-to-end (#5307)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 71cfe3a Ensure the handling of PartitionMetadataRequest is async end-to-end (#5307)
71cfe3a is described below
commit 71cfe3a6e0b8c21da714902a07722afa96036a17
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Oct 8 08:51:37 2019 -0700
Ensure the handling of PartitionMetadataRequest is async end-to-end (#5307)
* Ensure the handling of PartitionMetadataRequest is async end-to-end
* Fixed z-node path
---
.../apache/pulsar/broker/ServiceConfiguration.java | 18 +++
.../broker/cache/ConfigurationCacheService.java | 12 +-
.../apache/pulsar/broker/admin/AdminResource.java | 147 ++-------------------
.../broker/admin/impl/PersistentTopicsBase.java | 7 +-
.../broker/cache/LocalZooKeeperCacheService.java | 4 +
.../pulsar/broker/namespace/NamespaceService.java | 67 ++++++----
.../pulsar/broker/service/BrokerService.java | 74 +++++++++++
.../apache/pulsar/broker/service/ServerCnx.java | 61 ++++-----
8 files changed, 189 insertions(+), 201 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 5ed90ce..4f6acb0 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1382,4 +1382,22 @@ public class ServiceConfiguration implements PulsarConfiguration {
public Optional<Integer> getWebServicePortTls() {
return webServicePortTls;
}
+
+ public boolean isDefaultTopicTypePartitioned() {
+ return TopicType.PARTITIONED.toString().equals(allowAutoTopicCreationType);
+ }
+
+ enum TopicType {
+ PARTITIONED("partitioned"),
+ NON_PARTITIONED("non-partitioned");
+ private String type;
+
+ TopicType(String type) {
+ this.type = type;
+ }
+
+ public String toString() {
+ return type;
+ }
+ }
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
index 4ac6cab..06781ff 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
@@ -67,6 +67,8 @@ public class ConfigurationCacheService {
public static final String POLICIES_ROOT = "/admin/policies";
private static final String CLUSTERS_ROOT = "/admin/clusters";
+ public static final String PARTITIONED_TOPICS_ROOT = "/admin/partitioned-topics";
+
public ConfigurationCacheService(ZooKeeperCache cache) throws PulsarServerException {
this(cache, null);
}
@@ -98,7 +100,7 @@ public class ConfigurationCacheService {
};
this.clustersListCache = new ZooKeeperChildrenCache(cache, CLUSTERS_ROOT);
-
+
CLUSTER_FAILURE_DOMAIN_ROOT = CLUSTERS_ROOT + "/" + configuredClusterName + "/" + FAILURE_DOMAIN;
if (isNotBlank(configuredClusterName)) {
createFailureDomainRoot(cache.getZooKeeper(), CLUSTER_FAILURE_DOMAIN_ROOT);
@@ -114,7 +116,7 @@ public class ConfigurationCacheService {
}));
}
};
-
+
this.failureDomainCache = new ZooKeeperDataCache<FailureDomain>(cache) {
@Override
public FailureDomain deserialize(String path, byte[] content) throws Exception {
@@ -169,7 +171,7 @@ public class ConfigurationCacheService {
public ZooKeeperCache cache() {
return cache;
}
-
+
public ZooKeeperDataCache<TenantInfo> propertiesCache() {
return this.propertiesCache;
}
@@ -189,7 +191,7 @@ public class ConfigurationCacheService {
public ZooKeeperChildrenCache failureDomainListCache() {
return this.failureDomainListCache;
}
-
+
public ZooKeeper getZooKeeper() {
return this.cache.getZooKeeper();
}
@@ -197,7 +199,7 @@ public class ConfigurationCacheService {
public ZooKeeperDataCache<NamespaceIsolationPolicies> namespaceIsolationPoliciesCache() {
return this.namespaceIsolationPoliciesCache;
}
-
+
public ZooKeeperDataCache<FailureDomain> failureDomainCache() {
return this.failureDomainCache;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 39ff495..02ebd06 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -18,12 +18,13 @@
*/
package org.apache.pulsar.broker.admin;
-import com.fasterxml.jackson.core.JsonProcessingException;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-import org.apache.pulsar.common.api.proto.PulsarApi;
import static org.apache.pulsar.common.util.Codec.decode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+
import java.net.MalformedURLException;
import java.net.URI;
import java.util.List;
@@ -43,13 +44,13 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.common.naming.TopicDomain;
-import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BundlesData;
@@ -77,13 +78,9 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-
public abstract class AdminResource extends PulsarWebResource {
private static final Logger log = LoggerFactory.getLogger(AdminResource.class);
private static final String POLICIES_READONLY_FLAG_PATH = "/admin/flags/policies-readonly";
- private static final int PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS = 1000;
public static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics";
protected ZooKeeper globalZk() {
@@ -521,12 +518,11 @@ public abstract class AdminResource extends PulsarWebResource {
throw new RestException(e);
}
- String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(), topicName.getEncodedLocalName());
PartitionedTopicMetadata partitionMetadata;
if (checkAllowAutoCreation) {
- partitionMetadata = fetchPartitionedTopicMetadataCheckAllowAutoCreation(pulsar(), path, topicName);
+ partitionMetadata = fetchPartitionedTopicMetadataCheckAllowAutoCreation(pulsar(), topicName);
} else {
- partitionMetadata = fetchPartitionedTopicMetadata(pulsar(), path);
+ partitionMetadata = fetchPartitionedTopicMetadata(pulsar(), topicName);
}
if (log.isDebugEnabled()) {
@@ -536,9 +532,9 @@ public abstract class AdminResource extends PulsarWebResource {
return partitionMetadata;
}
- protected static PartitionedTopicMetadata fetchPartitionedTopicMetadata(PulsarService pulsar, String path) {
+ protected static PartitionedTopicMetadata fetchPartitionedTopicMetadata(PulsarService pulsar, TopicName topicName) {
try {
- return fetchPartitionedTopicMetadataAsync(pulsar, path).get();
+ return pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).get();
} catch (Exception e) {
if (e.getCause() instanceof RestException) {
throw (RestException) e;
@@ -547,37 +543,10 @@ public abstract class AdminResource extends PulsarWebResource {
}
}
- protected static CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataAsync(
- PulsarService pulsar, String path) {
- CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
- try {
- // gets the number of partitions from the zk cache
- pulsar.getGlobalZkCache().getDataAsync(path, new Deserializer<PartitionedTopicMetadata>() {
- @Override
- public PartitionedTopicMetadata deserialize(String key, byte[] content) throws Exception {
- return jsonMapper().readValue(content, PartitionedTopicMetadata.class);
- }
- }).thenAccept(metadata -> {
- // if the partitioned topic is not found in zk, then the topic is not partitioned
- if (metadata.isPresent()) {
- metadataFuture.complete(metadata.get());
- } else {
- metadataFuture.complete(new PartitionedTopicMetadata());
- }
- }).exceptionally(ex -> {
- metadataFuture.completeExceptionally(ex);
- return null;
- });
- } catch (Exception e) {
- metadataFuture.completeExceptionally(e);
- }
- return metadataFuture;
- }
-
protected static PartitionedTopicMetadata fetchPartitionedTopicMetadataCheckAllowAutoCreation(
- PulsarService pulsar, String path, TopicName topicName) {
+ PulsarService pulsar, TopicName topicName) {
try {
- return fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(pulsar, path, topicName)
+ return pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName)
.get();
} catch (Exception e) {
if (e.getCause() instanceof RestException) {
@@ -587,85 +556,7 @@ public abstract class AdminResource extends PulsarWebResource {
}
}
- protected static CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(
- PulsarService pulsar, String path, TopicName topicName) {
- CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
- try {
- boolean allowAutoTopicCreation = pulsar.getConfiguration().isAllowAutoTopicCreation();
- String topicType = pulsar.getConfiguration().getAllowAutoTopicCreationType();
- boolean topicExist;
- try {
- topicExist = pulsar.getNamespaceService()
- .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
- .join()
- .contains(topicName.toString());
- } catch (Exception e) {
- log.warn("Unexpected error while getting list of topics. topic={}. Error: {}",
- topicName, e.getMessage(), e);
- throw new RestException(e);
- }
- fetchPartitionedTopicMetadataAsync(pulsar, path).whenCompleteAsync((metadata, ex) -> {
- if (ex != null) {
- metadataFuture.completeExceptionally(ex);
- // If topic is already exist, creating partitioned topic is not allowed.
- } else if (metadata.partitions == 0 && !topicExist && allowAutoTopicCreation &&
- TopicType.PARTITIONED.toString().equals(topicType)) {
- createDefaultPartitionedTopicAsync(pulsar, path).whenComplete((defaultMetadata, e) -> {
- if (e == null) {
- metadataFuture.complete(defaultMetadata);
- } else if (e instanceof KeeperException) {
- try {
- Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
- if (!pulsar.getGlobalZkCache().exists(path)){
- metadataFuture.completeExceptionally(e);
- return;
- }
- } catch (InterruptedException | KeeperException exc) {
- metadataFuture.completeExceptionally(exc);
- return;
- }
- fetchPartitionedTopicMetadataAsync(pulsar, path).whenComplete((metadata2, ex2) -> {
- if (ex2 != null) {
- metadataFuture.completeExceptionally(ex2);
- } else {
- metadataFuture.complete(metadata2);
- }
- });
- } else {
- metadataFuture.completeExceptionally(e);
- }
- });
- } else {
- metadataFuture.complete(metadata);
- }
- });
- } catch (Exception e) {
- metadataFuture.completeExceptionally(e);
- }
- return metadataFuture;
- }
-
- protected static CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopicAsync(
- PulsarService pulsar, String path) {
- int defaultNumPartitions = pulsar.getConfiguration().getDefaultNumPartitions();
- checkArgument(defaultNumPartitions > 0, "Default number of partitions should be more than 0");
- PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(defaultNumPartitions);
- CompletableFuture<PartitionedTopicMetadata> partitionedTopicFuture = new CompletableFuture<>();
- try {
- byte[] content = jsonMapper().writeValueAsBytes(configMetadata);
- ZkUtils.createFullPathOptimistic(pulsar.getGlobalZkCache().getZooKeeper(), path, content,
- ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- // we wait for the data to be synced in all quorums and the observers
- Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
- partitionedTopicFuture.complete(configMetadata);
- } catch (JsonProcessingException | KeeperException | InterruptedException e) {
- log.error("Failed to create default partitioned topic.", e);
- partitionedTopicFuture.completeExceptionally(e);
- }
- return partitionedTopicFuture;
- }
-
- protected void validateClusterExists(String cluster) {
+ protected void validateClusterExists(String cluster) {
try {
if (!clustersCache().get(path("clusters", cluster)).isPresent()) {
throw new RestException(Status.PRECONDITION_FAILED, "Cluster " + cluster + " does not exist.");
@@ -730,18 +621,4 @@ public abstract class AdminResource extends PulsarWebResource {
partitionedTopics.sort(null);
return partitionedTopics;
}
-
- enum TopicType {
- PARTITIONED("partitioned"),
- NON_PARTITIONED("non-partitioned");
- private String type;
-
- TopicType(String type) {
- this.type = type;
- }
-
- public String toString() {
- return type;
- }
- }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index ba5ce25..0545274 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -115,7 +115,7 @@ import org.slf4j.LoggerFactory;
public class PersistentTopicsBase extends AdminResource {
private static final Logger log = LoggerFactory.getLogger(PersistentTopicsBase.class);
- protected static final int PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS = 1000;
+ public static final int PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS = 1000;
private static final int OFFLINE_TOPIC_STAT_TTL_MINS = 10;
private static final String DEPRECATED_CLIENT_VERSION_PREFIX = "Pulsar-CPP-v";
private static final Version LEAST_SUPPORTED_CLIENT_VERSION_PREFIX = Version.forIntegers(1, 21);
@@ -1670,7 +1670,8 @@ public class PersistentTopicsBase extends AdminResource {
// serve/redirect request else fail partitioned-metadata-request so, client fails while creating
// producer/consumer
checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject())
- .thenCompose(res -> fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(pulsar, path, topicName))
+ .thenCompose(res -> pulsar.getBrokerService()
+ .fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
.thenAccept(metadata -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName,
@@ -1786,7 +1787,7 @@ public class PersistentTopicsBase extends AdminResource {
private CompletableFuture<Void> createSubscriptions(TopicName topicName, int numPartitions) {
String path = path(PARTITIONED_TOPIC_PATH_ZNODE, topicName.getPersistenceNamingEncoding());
CompletableFuture<Void> result = new CompletableFuture<>();
- fetchPartitionedTopicMetadataAsync(pulsar(), path).thenAccept(partitionMetadata -> {
+ pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions <= 1) {
result.completeExceptionally(new RestException(Status.CONFLICT, "Topic is not partitioned topic"));
return;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
index 4b28cad..625f805 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
@@ -247,4 +247,8 @@ public class LocalZooKeeperCacheService {
public ZooKeeperChildrenCache managedLedgerListCache() {
return this.managedLedgerListCache;
}
+
+ public CompletableFuture<Boolean> managedLedgerExists(String persistentPath) {
+ return cache.existsAsync(MANAGED_LEDGER_ROOT + "/" + persistentPath, cache);
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 83e3ade..7070e54 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
@@ -873,6 +874,17 @@ public class NamespaceService {
});
}
+ public CompletableFuture<Boolean> checkTopicExists(TopicName topic) {
+ if (topic.isPersistent()) {
+ return pulsar.getLocalZkCacheService()
+ .managedLedgerExists(topic.getPersistenceNamingEncoding());
+ } else {
+ return pulsar.getBrokerService()
+ .getTopicIfExists(topic.toString())
+ .thenApply(optTopic -> optTopic.isPresent());
+ }
+ }
+
public CompletableFuture<List<String>> getListOfTopics(NamespaceName namespaceName, Mode mode) {
switch (mode) {
case ALL:
@@ -905,37 +917,36 @@ public class NamespaceService {
}
public CompletableFuture<List<String>> getListOfNonPersistentTopics(NamespaceName namespaceName) {
- ClusterData peerClusterData;
- try {
- peerClusterData = PulsarWebResource.checkLocalOrGetPeerReplicationCluster(pulsar, namespaceName)
- .get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- throw new RuntimeException("Failed to contact peer replication cluster.", e);
- }
-
- // if peer-cluster-data is present it means namespace is owned by that peer-cluster and request should be
- // redirect to the peer-cluster
- if (peerClusterData != null) {
- return getNonPersistentTopicsFromPeerCluster(peerClusterData, namespaceName);
- }
- // Non-persistent topics don't have managed ledgers so we have to retrieve them from local cache.
- List<String> topics = Lists.newArrayList();
- synchronized (pulsar.getBrokerService().getMultiLayerTopicMap()) {
- if (pulsar.getBrokerService().getMultiLayerTopicMap().containsKey(namespaceName.toString())) {
- pulsar.getBrokerService().getMultiLayerTopicMap().get(namespaceName.toString()).values()
- .forEach(bundle -> {
- bundle.forEach((topicName, topic) -> {
- if (topic instanceof NonPersistentTopic && ((NonPersistentTopic)topic).isActive()) {
- topics.add(topicName);
+ return PulsarWebResource.checkLocalOrGetPeerReplicationCluster(pulsar, namespaceName)
+ .thenCompose(peerClusterData -> {
+ // if peer-cluster-data is present, it means the namespace is owned by that peer-cluster and the
+ // request should be redirected to the peer-cluster
+ if (peerClusterData != null) {
+ return getNonPersistentTopicsFromPeerCluster(peerClusterData, namespaceName);
+ } else {
+ // Non-persistent topics don't have managed ledgers so we have to retrieve them from local
+ // cache.
+ List<String> topics = Lists.newArrayList();
+ synchronized (pulsar.getBrokerService().getMultiLayerTopicMap()) {
+ if (pulsar.getBrokerService().getMultiLayerTopicMap()
+ .containsKey(namespaceName.toString())) {
+ pulsar.getBrokerService().getMultiLayerTopicMap().get(namespaceName.toString()).values()
+ .forEach(bundle -> {
+ bundle.forEach((topicName, topic) -> {
+ if (topic instanceof NonPersistentTopic
+ && ((NonPersistentTopic) topic).isActive()) {
+ topics.add(topicName);
+ }
+ });
+ });
}
- });
- });
- }
- }
+ }
- topics.sort(null);
- return CompletableFuture.completedFuture(topics);
+ topics.sort(null);
+ return CompletableFuture.completedFuture(topics);
+ }
+ });
}
private CompletableFuture<List<String>> getNonPersistentTopicsFromPeerCluster(ClusterData peerClusterData,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index fa3c14e..490a691 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -69,6 +69,7 @@ import java.util.function.Predicate;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
+
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
@@ -77,6 +78,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
@@ -84,8 +86,10 @@ import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerLoader;
import org.apache.pulsar.broker.loadbalance.LoadManager;
@@ -119,6 +123,7 @@ import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
@@ -139,6 +144,7 @@ import org.apache.pulsar.zookeeper.ZooKeeperCacheListener;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
@@ -1611,6 +1617,74 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
}
+ public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(TopicName topicName) {
+ return pulsar.getNamespaceService().checkTopicExists(topicName)
+ .thenCompose(topicExists -> {
+ return fetchPartitionedTopicMetadataAsync(topicName)
+ .thenCompose(metadata -> {
+ // If the topic already exists, creating a partitioned topic is not allowed.
+ if (metadata.partitions == 0
+ && !topicExists
+ && pulsar.getConfiguration().isAllowAutoTopicCreation()
+ && pulsar.getConfiguration().isDefaultTopicTypePartitioned()) {
+ return pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName);
+ } else {
+ return CompletableFuture.completedFuture(metadata);
+ }
+ });
+ });
+ }
+
+ @SuppressWarnings("deprecation")
+ private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopicAsync(TopicName topicName) {
+ int defaultNumPartitions = pulsar.getConfiguration().getDefaultNumPartitions();
+ checkArgument(defaultNumPartitions > 0, "Default number of partitions should be more than 0");
+
+ PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(defaultNumPartitions);
+ CompletableFuture<PartitionedTopicMetadata> partitionedTopicFuture = new CompletableFuture<>();
+
+ try {
+ byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configMetadata);
+
+ ZkUtils.asyncCreateFullPathOptimistic(pulsar.getGlobalZkCache().getZooKeeper(),
+ partitionedTopicPath(topicName), content,
+ ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, path1, ctx, name) -> {
+ if (rc == KeeperException.Code.OK.intValue()) {
+ // we wait for the data to be synced in all quorums and the observers
+ executor().schedule(
+ SafeRunnable.safeRun(() -> partitionedTopicFuture.complete(configMetadata)),
+ PersistentTopicsBase.PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS, TimeUnit.MILLISECONDS);
+ } else {
+ partitionedTopicFuture.completeExceptionally(KeeperException.create(rc));
+ }
+ }, null);
+
+ } catch (Exception e) {
+ log.error("Failed to create default partitioned topic.", e);
+ return FutureUtil.failedFuture(e);
+ }
+
+ return partitionedTopicFuture;
+ }
+
+ public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataAsync(TopicName topicName) {
+ // gets the number of partitions from the zk cache
+ return pulsar.getGlobalZkCache().getDataAsync(partitionedTopicPath(topicName), (key, content) -> {
+ return ObjectMapperFactory.getThreadLocal().readValue(content, PartitionedTopicMetadata.class);
+ }).thenApply(metadata -> {
+ // if the partitioned topic is not found in zk, then the topic is not partitioned
+ return metadata.orElseGet(() -> new PartitionedTopicMetadata());
+ });
+ }
+
+ private static String partitionedTopicPath(TopicName topicName) {
+ return String.format("%s/%s/%s/%s",
+ ConfigurationCacheService.PARTITIONED_TOPICS_ROOT,
+ topicName.getNamespace(),
+ topicName.getDomain(),
+ topicName.getEncodedLocalName());
+ }
+
public OrderedExecutor getTopicOrderedExecutor() {
return topicOrderedExecutor;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 7a3a3d5..005f615 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -340,40 +340,41 @@ public class ServerCnx extends PulsarHandler {
}
String finalOriginalPrincipal = originalPrincipal;
isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
- if (isProxyAuthorized) {
+ if (isProxyAuthorized) {
getPartitionedTopicMetadata(getBrokerService().pulsar(),
- authRole, finalOriginalPrincipal, authenticationData,
+ authRole, finalOriginalPrincipal, authenticationData,
topicName).handle((metadata, ex) -> {
- if (ex == null) {
- int partitions = metadata.partitions;
- ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId));
+ if (ex == null) {
+ int partitions = metadata.partitions;
+ ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId));
+ } else {
+ if (ex instanceof PulsarClientException) {
+ log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(),
+ remoteAddress, topicName, ex.getMessage());
+ ctx.writeAndFlush(Commands.newPartitionMetadataResponse(
+ ServerError.AuthorizationError, ex.getMessage(), requestId));
} else {
- if (ex instanceof PulsarClientException) {
- log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(),
- remoteAddress, topicName, ex.getMessage());
- ctx.writeAndFlush(Commands.newPartitionMetadataResponse(
- ServerError.AuthorizationError, ex.getMessage(), requestId));
- } else {
- log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress,
- topicName, ex.getMessage(), ex);
- ServerError error = (ex instanceof RestException)
- && ((RestException) ex).getResponse().getStatus() < 500
- ? ServerError.MetadataError : ServerError.ServiceNotReady;
- ctx.writeAndFlush(Commands.newPartitionMetadataResponse(error,
- ex.getMessage(), requestId));
- }
+ log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress,
+ topicName, ex.getMessage(), ex);
+ ServerError error = (ex instanceof RestException)
+ && ((RestException) ex).getResponse().getStatus() < 500
+ ? ServerError.MetadataError
+ : ServerError.ServiceNotReady;
+ ctx.writeAndFlush(Commands.newPartitionMetadataResponse(error,
+ ex.getMessage(), requestId));
}
- lookupSemaphore.release();
- return null;
- });
- } else {
- final String msg = "Proxy Client is not authorized to Get Partition Metadata";
- log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
- ctx.writeAndFlush(
- Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg, requestId));
- lookupSemaphore.release();
- }
- return null;
+ }
+ lookupSemaphore.release();
+ return null;
+ });
+ } else {
+ final String msg = "Proxy Client is not authorized to Get Partition Metadata";
+ log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
+ ctx.writeAndFlush(
+ Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg, requestId));
+ lookupSemaphore.release();
+ }
+ return null;
}).exceptionally(ex -> {
final String msg = "Exception occured while trying to authorize get Partition Metadata";
log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);