You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/09/18 00:30:58 UTC
[pulsar] branch master updated: Ensure getting list of topics for
namespace is handled asynchronously (#5188)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 871a1e0 Ensure getting list of topics for namespace is handled asynchronously (#5188)
871a1e0 is described below
commit 871a1e028febc45742842754a801073a47d71698
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Sep 17 17:30:50 2019 -0700
Ensure getting list of topics for namespace is handled asynchronously (#5188)
* Ensure getting list of topics for namespace is handled asynchronously
* Fixed mocked zk deadlock
* Test fixes
* Fixed mutex unlocking in mock-zookeeper create method
* Fixed caching of empty values
* Do async call in background
* Fixed merge conflicts
* Fixed broken import from shaded class
---
.../java/org/apache/zookeeper/MockZooKeeper.java | 81 ++++++-----
.../org/apache/pulsar/broker/PulsarService.java | 3 +-
.../apache/pulsar/broker/admin/AdminResource.java | 1 +
.../pulsar/broker/admin/impl/NamespacesBase.java | 6 +-
.../broker/admin/impl/PersistentTopicsBase.java | 3 +-
.../apache/pulsar/broker/admin/v1/Namespaces.java | 20 ++-
.../broker/admin/v1/NonPersistentTopics.java | 1 +
.../apache/pulsar/broker/admin/v2/Namespaces.java | 32 ++--
.../broker/admin/v2/NonPersistentTopics.java | 1 +
.../pulsar/broker/namespace/NamespaceService.java | 60 ++++----
.../apache/pulsar/broker/service/ServerCnx.java | 36 ++---
.../client/impl/PatternTopicsConsumerImplTest.java | 4 +-
.../pulsar/zookeeper/GlobalZooKeeperCache.java | 2 +-
.../pulsar/zookeeper/LocalZooKeeperCache.java | 2 +-
.../apache/pulsar/zookeeper/ZooKeeperCache.java | 161 +++++++++++++--------
.../pulsar/zookeeper/ZooKeeperChildrenCache.java | 43 ++++--
.../pulsar/zookeeper/ZookeeperCacheTest.java | 23 +--
17 files changed, 276 insertions(+), 203 deletions(-)
diff --git a/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java b/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java
index 0afbcd6..c417604 100644
--- a/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java
+++ b/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java
@@ -126,6 +126,11 @@ public class MockZooKeeper extends ZooKeeper {
public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
throws KeeperException, InterruptedException {
mutex.lock();
+
+ final Set<Watcher> toNotifyCreate = Sets.newHashSet();
+ final Set<Watcher> toNotifyParent = Sets.newHashSet();
+ final String parent = path.substring(0, path.lastIndexOf("/"));
+
try {
checkProgrammedFail();
@@ -136,7 +141,6 @@ public class MockZooKeeper extends ZooKeeper {
throw new KeeperException.NodeExistsException(path);
}
- final String parent = path.substring(0, path.lastIndexOf("/"));
if (!parent.isEmpty() && !tree.containsKey(parent)) {
throw new KeeperException.NoNodeException();
}
@@ -152,55 +156,57 @@ public class MockZooKeeper extends ZooKeeper {
tree.put(path, Pair.of(data, 0));
- final Set<Watcher> toNotifyCreate = Sets.newHashSet();
toNotifyCreate.addAll(watchers.get(path));
- final Set<Watcher> toNotifyParent = Sets.newHashSet();
if (!parent.isEmpty()) {
toNotifyParent.addAll(watchers.get(parent));
}
watchers.removeAll(path);
- final String finalPath = path;
- executor.execute(() -> {
- toNotifyCreate.forEach(
- watcher -> watcher.process(
- new WatchedEvent(EventType.NodeCreated,
- KeeperState.SyncConnected,
- finalPath)));
- toNotifyParent.forEach(
- watcher -> watcher.process(
- new WatchedEvent(EventType.NodeChildrenChanged,
- KeeperState.SyncConnected,
- parent)));
- });
-
- return path;
} finally {
+
mutex.unlock();
}
+ final String finalPath = path;
+ executor.execute(() -> {
+
+ toNotifyCreate.forEach(
+ watcher -> watcher.process(
+ new WatchedEvent(EventType.NodeCreated,
+ KeeperState.SyncConnected,
+ finalPath)));
+ toNotifyParent.forEach(
+ watcher -> watcher.process(
+ new WatchedEvent(EventType.NodeChildrenChanged,
+ KeeperState.SyncConnected,
+ parent)));
+ });
+
+ return path;
}
@Override
public void create(final String path, final byte[] data, final List<ACL> acl, CreateMode createMode,
final StringCallback cb, final Object ctx) {
- if (stopped) {
- cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
- return;
- }
- final Set<Watcher> toNotifyCreate = Sets.newHashSet();
- toNotifyCreate.addAll(watchers.get(path));
-
- final Set<Watcher> toNotifyParent = Sets.newHashSet();
- final String parent = path.substring(0, path.lastIndexOf("/"));
- if (!parent.isEmpty()) {
- toNotifyParent.addAll(watchers.get(parent));
- }
- watchers.removeAll(path);
executor.execute(() -> {
mutex.lock();
+
+ if (stopped) {
+ cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
+ return;
+ }
+
+ final Set<Watcher> toNotifyCreate = Sets.newHashSet();
+ toNotifyCreate.addAll(watchers.get(path));
+
+ final Set<Watcher> toNotifyParent = Sets.newHashSet();
+ final String parent = path.substring(0, path.lastIndexOf("/"));
+ if (!parent.isEmpty()) {
+ toNotifyParent.addAll(watchers.get(parent));
+ }
+
if (getProgrammedFailStatus()) {
mutex.unlock();
cb.processResult(failReturnCode.intValue(), path, ctx, null);
@@ -215,6 +221,7 @@ public class MockZooKeeper extends ZooKeeper {
cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
} else {
tree.put(path, Pair.of(data, 0));
+ watchers.removeAll(path);
mutex.unlock();
cb.processResult(0, path, ctx, null);
@@ -331,6 +338,12 @@ public class MockZooKeeper extends ZooKeeper {
return;
}
+ if (!tree.containsKey(path)) {
+ mutex.unlock();
+ cb.processResult(KeeperException.Code.NoNode, path, ctx, null);
+ return;
+ }
+
List<String> children = Lists.newArrayList();
for (String item : tree.tailMap(path).keySet()) {
if (!item.startsWith(path)) {
@@ -347,12 +360,12 @@ public class MockZooKeeper extends ZooKeeper {
}
}
- mutex.unlock();
-
- cb.processResult(0, path, ctx, children);
if (watcher != null) {
watchers.put(path, watcher);
}
+ mutex.unlock();
+
+ cb.processResult(0, path, ctx, children);
});
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 113c081..2818744 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -649,7 +649,7 @@ public class PulsarService implements AutoCloseable {
List<CompletableFuture<Topic>> persistentTopics = Lists.newArrayList();
long topicLoadStart = System.nanoTime();
- for (String topic : getNamespaceService().getListOfPersistentTopics(nsName)) {
+ for (String topic : getNamespaceService().getListOfPersistentTopics(nsName).join()) {
try {
TopicName topicName = TopicName.get(topic);
if (bundle.includes(topicName)) {
@@ -974,7 +974,6 @@ public class PulsarService implements AutoCloseable {
return brokerServiceUrl != null ? brokerServiceUrl : brokerServiceUrlTls;
}
-
private void startWorkerService(AuthenticationService authenticationService,
AuthorizationService authorizationService)
throws InterruptedException, IOException, KeeperException {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index ed497e8..92aaecd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -597,6 +597,7 @@ public abstract class AdminResource extends PulsarWebResource {
try {
topicExist = pulsar.getNamespaceService()
.getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
+ .join()
.contains(topicName.toString());
} catch (Exception e) {
log.warn("Unexpected error while getting list of topics. topic={}. Error: {}",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 5d6d8fc..02c3e46 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -24,8 +24,6 @@ import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
-import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
-import static org.apache.pulsar.common.naming.NamespaceBundleFactory.getBundlesData;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -189,7 +187,7 @@ public abstract class NamespacesBase extends AdminResource {
boolean isEmpty;
try {
- isEmpty = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).isEmpty()
+ isEmpty = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join().isEmpty()
&& getPartitionedTopicList(TopicDomain.persistent).isEmpty()
&& getPartitionedTopicList(TopicDomain.non_persistent).isEmpty();
} catch (Exception e) {
@@ -319,7 +317,7 @@ public abstract class NamespacesBase extends AdminResource {
NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
authoritative, true);
try {
- List<String> topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName);
+ List<String> topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join();
for (String topic : topics) {
NamespaceBundle topicBundle = (NamespaceBundle) pulsar().getNamespaceService()
.getBundle(TopicName.get(topic));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 509b41d..71c758b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -378,6 +378,7 @@ public class PersistentTopicsBase extends AdminResource {
try {
boolean topicExist = pulsar().getNamespaceService()
.getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
+ .join()
.contains(topicName.toString());
if (topicExist) {
log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
@@ -1201,7 +1202,7 @@ public class PersistentTopicsBase extends AdminResource {
return;
}
}
-
+
if (partitionException.get() != null) {
log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), topicName,
subscriptionName, targetMessageId, partitionException.get());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index a47bc23..a0fbeca 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -113,21 +113,25 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(hidden = true, value = "Get the list of all the topics under a certain namespace.", response = String.class, responseContainer = "Set")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
- public List<String> getTopics(@PathParam("property") String property,
+ public void getTopics(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
- @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode) {
+ @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode,
+ @Suspended AsyncResponse asyncResponse) {
validateAdminAccessForTenant(property);
validateNamespaceName(property, cluster, namespace);
// Validate that namespace exists, throws 404 if it doesn't exist
getNamespacePolicies(namespaceName);
- try {
- return pulsar().getNamespaceService().getListOfTopics(namespaceName, mode);
- } catch (Exception e) {
- log.error("Failed to get topics list for namespace {}/{}/{}", property, cluster, namespace, e);
- throw new RestException(e);
- }
+ pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
+ .thenAccept(topics -> {
+ asyncResponse.resume(topics);
+ })
+ .exceptionally(ex -> {
+ log.error("Failed to get topics list for namespace {}", namespaceName, ex);
+ asyncResponse.resume(ex);
+ return null;
+ });
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index 96bc084..7667167 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -129,6 +129,7 @@ public class NonPersistentTopics extends PersistentTopics {
try {
boolean topicExist = pulsar().getNamespaceService()
.getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
+ .join()
.contains(topicName.toString());
if (topicExist) {
log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 435892e..bd5ff0b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -18,6 +18,11 @@
*/
package org.apache.pulsar.broker.admin.v2;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -49,17 +54,12 @@ import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
-import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
-
@Path("/namespaces")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@@ -80,21 +80,25 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(value = "Get the list of all the topics under a certain namespace.", response = String.class, responseContainer = "Set")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
- public List<String> getTopics(@PathParam("tenant") String tenant,
+ public void getTopics(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
- @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode) {
+ @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode,
+ @Suspended AsyncResponse asyncResponse) {
validateAdminAccessForTenant(tenant);
validateNamespaceName(tenant, namespace);
// Validate that namespace exists, throws 404 if it doesn't exist
getNamespacePolicies(namespaceName);
- try {
- return pulsar().getNamespaceService().getListOfTopics(namespaceName, mode);
- } catch (Exception e) {
- log.error("Failed to get topics list for namespace {}", namespaceName, e);
- throw new RestException(e);
- }
+ pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
+ .thenAccept(topics -> {
+ asyncResponse.resume(topics);
+ })
+ .exceptionally(ex -> {
+ log.error("Failed to get topics list for namespace {}", namespaceName, ex);
+ asyncResponse.resume(ex);
+ return null;
+ });
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 10dc5ee..9bc65e3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -174,6 +174,7 @@ public class NonPersistentTopics extends PersistentTopics {
try {
boolean topicExist = pulsar().getNamespaceService()
.getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
+ .join()
.contains(topicName.toString());
if (topicExist) {
log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 5fbc20f..83e3ade 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.broker.namespace;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import io.netty.channel.EventLoopGroup;
+
+import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
@@ -54,7 +56,6 @@ import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
-import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -68,6 +69,7 @@ import org.slf4j.LoggerFactory;
import java.net.URI;
import java.net.URL;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -863,14 +865,15 @@ public class NamespaceService {
return getBundle(topicName);
}
- public List<String> getFullListOfTopics(NamespaceName namespaceName) throws Exception {
- List<String> topics = getListOfPersistentTopics(namespaceName);
- topics.addAll(getListOfNonPersistentTopics(namespaceName));
- return topics;
+ public CompletableFuture<List<String>> getFullListOfTopics(NamespaceName namespaceName) {
+ return getListOfPersistentTopics(namespaceName)
+ .thenCombine(getListOfNonPersistentTopics(namespaceName),
+ (persistentTopics, nonPersistentTopics) -> {
+ return ListUtils.union(persistentTopics, nonPersistentTopics);
+ });
}
- public List<String> getListOfTopics(NamespaceName namespaceName, Mode mode)
- throws Exception {
+ public CompletableFuture<List<String>> getListOfTopics(NamespaceName namespaceName, Mode mode) {
switch (mode) {
case ALL:
return getFullListOfTopics(namespaceName);
@@ -882,30 +885,26 @@ public class NamespaceService {
}
}
- public List<String> getListOfPersistentTopics(NamespaceName namespaceName) throws Exception {
- List<String> topics = Lists.newArrayList();
-
+ public CompletableFuture<List<String>> getListOfPersistentTopics(NamespaceName namespaceName) {
// For every topic there will be a managed ledger created.
- try {
- String path = String.format("/managed-ledgers/%s/persistent", namespaceName);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Getting children from managed-ledgers now: {}", path);
- }
-
- for (String topic : pulsar.getLocalZkCacheService().managedLedgerListCache().get(path)) {
- topics.add(String.format("persistent://%s/%s", namespaceName, Codec.decode(topic)));
- }
- } catch (KeeperException.NoNodeException e) {
- // NoNode means there are no persistent topics for this namespace
+ String path = String.format("/managed-ledgers/%s/persistent", namespaceName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Getting children from managed-ledgers now: {}", path);
}
- topics.sort(null);
- return topics;
- }
+ return pulsar.getLocalZkCacheService().managedLedgerListCache().getAsync(path)
+ .thenApply(znodes -> {
+ List<String> topics = Lists.newArrayList();
+ for (String znode : znodes) {
+ topics.add(String.format("persistent://%s/%s", namespaceName, Codec.decode(znode)));
+ }
- public List<String> getListOfNonPersistentTopics(NamespaceName namespaceName) throws Exception {
- List<String> topics = Lists.newArrayList();
+ topics.sort(null);
+ return topics;
+ });
+ }
+ public CompletableFuture<List<String>> getListOfNonPersistentTopics(NamespaceName namespaceName) {
ClusterData peerClusterData;
try {
peerClusterData = PulsarWebResource.checkLocalOrGetPeerReplicationCluster(pulsar, namespaceName)
@@ -921,6 +920,7 @@ public class NamespaceService {
}
// Non-persistent topics don't have managed ledgers so we have to retrieve them from local cache.
+ List<String> topics = Lists.newArrayList();
synchronized (pulsar.getBrokerService().getMultiLayerTopicMap()) {
if (pulsar.getBrokerService().getMultiLayerTopicMap().containsKey(namespaceName.toString())) {
pulsar.getBrokerService().getMultiLayerTopicMap().get(namespaceName.toString()).values()
@@ -935,13 +935,13 @@ public class NamespaceService {
}
topics.sort(null);
- return topics;
+ return CompletableFuture.completedFuture(topics);
}
- private List<String> getNonPersistentTopicsFromPeerCluster(ClusterData peerClusterData,
- NamespaceName namespace) throws Exception {
+ private CompletableFuture<List<String>> getNonPersistentTopicsFromPeerCluster(ClusterData peerClusterData,
+ NamespaceName namespace) {
PulsarClientImpl client = getNamespaceClient(peerClusterData);
- return client.getLookup().getTopicsUnderNamespace(namespace, Mode.NON_PERSISTENT).get();
+ return client.getLookup().getTopicsUnderNamespace(namespace, Mode.NON_PERSISTENT);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index d91bd1a..1cab8e6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1297,27 +1297,27 @@ public class ServerCnx extends PulsarHandler {
final long requestId = commandGetTopicsOfNamespace.getRequestId();
final String namespace = commandGetTopicsOfNamespace.getNamespace();
final CommandGetTopicsOfNamespace.Mode mode = commandGetTopicsOfNamespace.getMode();
+ final NamespaceName namespaceName = NamespaceName.get(namespace);
- try {
- final NamespaceName namespaceName = NamespaceName.get(namespace);
-
- final List<String> topics = getBrokerService().pulsar().getNamespaceService()
- .getListOfTopics(namespaceName, mode);
+ getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
+ .thenAccept(topics -> {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Received CommandGetTopicsOfNamespace for namespace [//{}] by {}, size:{}",
+ remoteAddress, namespace, requestId, topics.size());
+ }
- if (log.isDebugEnabled()) {
- log.debug("[{}] Received CommandGetTopicsOfNamespace for namespace [//{}] by {}, size:{}",
- remoteAddress, namespace, requestId, topics.size());
- }
+ ctx.writeAndFlush(Commands.newGetTopicsOfNamespaceResponse(topics, requestId));
+ })
+ .exceptionally(ex -> {
+ log.warn("[{}] Error GetTopicsOfNamespace for namespace [//{}] by {}",
+ remoteAddress, namespace, requestId);
+ ctx.writeAndFlush(
+ Commands.newError(requestId,
+ BrokerServiceException.getClientErrorCode(new ServerMetadataException(ex)),
+ ex.getMessage()));
- ctx.writeAndFlush(Commands.newGetTopicsOfNamespaceResponse(topics, requestId));
- } catch (Exception e) {
- log.warn("[{}] Error GetTopicsOfNamespace for namespace [//{}] by {}",
- remoteAddress, namespace, requestId);
- ctx.writeAndFlush(
- Commands.newError(requestId,
- BrokerServiceException.getClientErrorCode(new ServerMetadataException(e)),
- e.getMessage()));
- }
+ return null;
+ });
}
@Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 1d33641..18cf12c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -27,6 +27,7 @@ import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.IntStream;
@@ -725,7 +726,8 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
// seems no direct way to verify auto-unsubscribe, because this patternConsumer also referenced the topic.
List<String> topicNames = Lists.newArrayList(topicName2);
NamespaceService nss = pulsar.getNamespaceService();
- doReturn(topicNames).when(nss).getListOfPersistentTopics(NamespaceName.get("my-property/my-ns"));
+ doReturn(CompletableFuture.completedFuture(topicNames)).when(nss)
+ .getListOfPersistentTopics(NamespaceName.get("my-property/my-ns"));
// 7. call recheckTopics to unsubscribe topic 1,3 , verify topics number: 2=6-1-3
log.debug("recheck topics change");
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
index 8a7c3e7..671ef50 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
@@ -115,7 +115,7 @@ public class GlobalZooKeeperCache extends ZooKeeperCache implements Closeable {
//
dataCache.synchronous().invalidateAll();
- childrenCache.invalidateAll();
+ childrenCache.synchronous().invalidateAll();
return;
default:
break;
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java
index 3045a0b..79b9738 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java
@@ -51,7 +51,7 @@ public class LocalZooKeeperCache extends ZooKeeperCache {
// in case of expired, the zkSession is no longer good
LOG.warn("Lost connection from local ZK. Invalidating the whole cache.");
dataCache.synchronous().invalidateAll();
- childrenCache.invalidateAll();
+ childrenCache.synchronous().invalidateAll();
return;
default:
break;
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
index 6136481..eaab17b 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
@@ -22,17 +22,17 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Sets;
+import java.io.IOException;
import java.nio.file.Paths;
import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.Collections;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
@@ -41,6 +41,9 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.KeeperException.NoNodeException;
@@ -77,8 +80,8 @@ public abstract class ZooKeeperCache implements Watcher {
public static final String ZK_CACHE_INSTANCE = "zk_cache_instance";
protected final AsyncLoadingCache<String, Entry<Object, Stat>> dataCache;
- protected final Cache<String, Set<String>> childrenCache;
- protected final Cache<String, Boolean> existsCache;
+ protected final AsyncLoadingCache<String, Set<String>> childrenCache;
+ protected final AsyncLoadingCache<String, Boolean> existsCache;
private final OrderedExecutor executor;
private final OrderedExecutor backgroundExecutor = OrderedExecutor.newBuilder().name("zk-cache-background").numThreads(2).build();
private boolean shouldShutdownExecutor;
@@ -96,8 +99,10 @@ public abstract class ZooKeeperCache implements Watcher {
this.dataCache = Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES)
.buildAsync((key, executor1) -> null);
- this.childrenCache = CacheBuilder.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build();
- this.existsCache = CacheBuilder.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build();
+ this.childrenCache = Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES)
+ .buildAsync((key, executor1) -> null);
+ this.existsCache = Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES)
+ .buildAsync((key, executor1) -> null);
}
public ZooKeeperCache(ZooKeeper zkSession, int zkOperationTimeoutSeconds) {
@@ -114,14 +119,14 @@ public abstract class ZooKeeperCache implements Watcher {
final String path = event.getPath();
if (path != null) {
dataCache.synchronous().invalidate(path);
- childrenCache.invalidate(path);
+ childrenCache.synchronous().invalidate(path);
// sometimes zk triggers one watch per zk-session and if zkDataCache and ZkChildrenCache points to this
// ZookeeperCache instance then ZkChildrenCache may not invalidate for it's parent. Therefore, invalidate
// cache for parent if child is created/deleted
if (event.getType().equals(EventType.NodeCreated) || event.getType().equals(EventType.NodeDeleted)) {
- childrenCache.invalidate(Paths.get(path).getParent().toString());
+ childrenCache.synchronous().invalidate(Paths.get(path).getParent().toString());
}
- existsCache.invalidate(path);
+ existsCache.synchronous().invalidate(path);
if (executor != null && updater != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Submitting reload cache task to the executor for path: {}, updater: {}", path, updater);
@@ -152,7 +157,7 @@ public abstract class ZooKeeperCache implements Watcher {
}
private void invalidateAllExists() {
- existsCache.invalidateAll();
+ existsCache.synchronous().invalidateAll();
}
public void invalidateAllData() {
@@ -160,7 +165,7 @@ public abstract class ZooKeeperCache implements Watcher {
}
public void invalidateAllChildren() {
- childrenCache.invalidateAll();
+ childrenCache.synchronous().invalidateAll();
}
public void invalidateData(String path) {
@@ -168,11 +173,11 @@ public abstract class ZooKeeperCache implements Watcher {
}
public void invalidateChildren(String path) {
- childrenCache.invalidate(path);
+ childrenCache.synchronous().invalidate(path);
}
private void invalidateExists(String path) {
- existsCache.invalidate(path);
+ existsCache.synchronous().invalidate(path);
}
public void asyncInvalidate(String path) {
@@ -203,20 +208,30 @@ public abstract class ZooKeeperCache implements Watcher {
}
private boolean exists(final String path, Watcher watcher) throws KeeperException, InterruptedException {
- try {
- return existsCache.get(path, () -> zkSession.get().exists(path, watcher) != null);
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- if (cause instanceof KeeperException) {
- throw (KeeperException) cause;
- } else if (cause instanceof InterruptedException) {
- throw (InterruptedException) cause;
- } else if (cause instanceof RuntimeException) {
- throw (RuntimeException) cause;
- } else {
- throw new RuntimeException(cause);
+ return existsAsync(path, watcher).join();
+ }
+
+ @SuppressWarnings("deprecation")
+ public CompletableFuture<Boolean> existsAsync(String path, Watcher watcher) {
+ return existsCache.get(path, (p, executor) -> {
+ ZooKeeper zk = zkSession.get();
+ if (zk == null) {
+ return FutureUtil.failedFuture(new IOException("ZK session not ready"));
}
- }
+
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
+ zk.exists(path, watcher, (StatCallback) (rc, path1, ctx, stat) -> {
+ if (rc == Code.OK.intValue()) {
+ future.complete(true);
+ } else if (rc == Code.NONODE.intValue()) {
+ future.complete(false);
+ } else {
+ future.completeExceptionally(KeeperException.create(rc));
+ }
+ }, null);
+
+ return future;
+ });
}
/**
@@ -365,7 +380,15 @@ public abstract class ZooKeeperCache implements Watcher {
* @throws InterruptedException
*/
public Set<String> getChildren(final String path) throws KeeperException, InterruptedException {
- return getChildren(path, this);
+ try {
+ return getChildrenAsync(path, this).join();
+ } catch (CompletionException e) {
+ if (e.getCause() instanceof KeeperException) {
+ throw (KeeperException)e.getCause();
+ } else {
+ throw e;
+ }
+ }
}
/**
@@ -375,35 +398,50 @@ public abstract class ZooKeeperCache implements Watcher {
* @param path
* @param watcher
* @return
- * @throws KeeperException
- * @throws InterruptedException
*/
- public Set<String> getChildren(final String path, final Watcher watcher)
- throws KeeperException, InterruptedException {
- try {
- return childrenCache.get(path, () -> {
- LOG.debug("Fetching children at {}", path);
- return Sets.newTreeSet(checkNotNull(zkSession.get()).getChildren(path, watcher));
- });
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- // The node we want may not exist yet, so put a watcher on its existance
- // before throwing up the exception. Its possible that the node could have
- // been created after the call to getChildren, but before the call to exists().
- // If this is the case, exists will return true, and we just call getChildren again.
- if (cause instanceof KeeperException.NoNodeException
- && exists(path, watcher)) {
- return getChildren(path, watcher);
- } else if (cause instanceof KeeperException) {
- throw (KeeperException) cause;
- } else if (cause instanceof InterruptedException) {
- throw (InterruptedException) cause;
- } else if (cause instanceof RuntimeException) {
- throw (RuntimeException) cause;
- } else {
- throw new RuntimeException(cause);
- }
- }
+ @SuppressWarnings("deprecation")
+ public CompletableFuture<Set<String>> getChildrenAsync(String path, Watcher watcher) {
+ return childrenCache.get(path, (p, executor) -> {
+ CompletableFuture<Set<String>> future = new CompletableFuture<>();
+ executor.execute(SafeRunnable.safeRun(() -> {
+ ZooKeeper zk = zkSession.get();
+ if (zk == null) {
+ future.completeExceptionally(new IOException("ZK session not ready"));
+ return;
+ }
+
+ zk.getChildren(path, watcher, (ChildrenCallback) (rc, path1, ctx, children) -> {
+ if (rc == Code.OK.intValue()) {
+ future.complete(Sets.newTreeSet(children));
+ } else if (rc == Code.NONODE.intValue()) {
+ // The node we want may not exist yet, so put a watcher on its existence
+ // before throwing up the exception. It's possible that the node could have
+ // been created after the call to getChildren, but before the call to exists().
+ // If this is the case, exists will return true, and we just call getChildren again.
+ existsAsync(path, watcher).thenAccept(exists -> {
+ if (exists) {
+ getChildrenAsync(path, watcher)
+ .thenAccept(c -> future.complete(c))
+ .exceptionally(ex -> {
+ future.completeExceptionally(ex);
+ return null;
+ });
+ } else {
+ // Z-node does not exist
+ future.complete(Collections.emptySet());
+ }
+ }).exceptionally(ex -> {
+ future.completeExceptionally(ex);
+ return null;
+ });
+ } else {
+ future.completeExceptionally(KeeperException.create(rc));
+ }
+ }, null);
+ }));
+
+ return future;
+ });
}
@SuppressWarnings("unchecked")
@@ -412,7 +450,12 @@ public abstract class ZooKeeperCache implements Watcher {
}
public Set<String> getChildrenIfPresent(String path) {
- return childrenCache.getIfPresent(path);
+ CompletableFuture<Set<String>> future = childrenCache.getIfPresent(path);
+ if (future != null && future.isDone() && !future.isCompletedExceptionally()) {
+ return future.getNow(null);
+ } else {
+ return null;
+ }
}
@Override
@@ -422,9 +465,9 @@ public abstract class ZooKeeperCache implements Watcher {
}
public void invalidateRoot(String root) {
- for (String key : childrenCache.asMap().keySet()) {
+ for (String key : childrenCache.synchronous().asMap().keySet()) {
if (key.startsWith(root)) {
- childrenCache.invalidate(key);
+ childrenCache.synchronous().invalidate(key);
}
}
}
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
index cc8352e..0bb6f46 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.zookeeper;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.zookeeper.ZooKeeperCache.CacheUpdater;
@@ -49,12 +51,24 @@ public class ZooKeeperChildrenCache implements Watcher, CacheUpdater<Set<String>
}
public Set<String> get() throws KeeperException, InterruptedException {
- return cache.getChildren(path, this);
+ return get(this.path);
}
public Set<String> get(String path) throws KeeperException, InterruptedException {
- LOG.debug("getChildren called at: {}", path);
- return cache.getChildren(path, this);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("getChildren called at: {}", path);
+ }
+
+ Set<String> children = cache.getChildrenAsync(path, this).join();
+ if (children == null) {
+ throw KeeperException.create(KeeperException.Code.NONODE);
+ }
+
+ return children;
+ }
+
+ public CompletableFuture<Set<String>> getAsync(String path) {
+ return cache.getChildrenAsync(path, this);
}
public void clear() {
@@ -67,18 +81,17 @@ public class ZooKeeperChildrenCache implements Watcher, CacheUpdater<Set<String>
@Override
public void reloadCache(final String path) {
- try {
- cache.invalidate(path);
- Set<String> children = cache.getChildren(path, this);
- LOG.info("reloadCache called in zookeeperChildrenCache for path {}", path);
- for (ZooKeeperCacheListener<Set<String>> listener : listeners) {
- listener.onUpdate(path, children, null);
- }
- } catch (KeeperException.NoNodeException nne) {
- LOG.debug("Node [{}] does not exist", nne.getPath());
- } catch (Exception e) {
- LOG.warn("Reloading ZooKeeperDataCache failed at path:{}", path);
- }
+ cache.invalidate(path);
+ cache.getChildrenAsync(path, this)
+ .thenAccept(children -> {
+ LOG.info("reloadCache called in zookeeperChildrenCache for path {}", path);
+ for (ZooKeeperCacheListener<Set<String>> listener : listeners) {
+ listener.onUpdate(path, children, null);
+ }
+ }).exceptionally(ex -> {
+ LOG.warn("Reloading ZooKeeperDataCache failed at path:{}", path, ex);
+ return null;
+ }).join();
}
@Override
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
index e25c1ad..834844a 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
@@ -25,6 +25,13 @@ import static org.testng.Assert.fail;
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertNull;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
@@ -36,7 +43,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.common.util.OrderedScheduler;
-import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.WatchedEvent;
@@ -50,12 +56,6 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.MoreExecutors;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
-
@Test
public class ZookeeperCacheTest {
private MockZooKeeper zkClient;
@@ -194,12 +194,7 @@ public class ZookeeperCacheTest {
cache.unregisterListener(counter);
assertEquals(notificationCount.get(), 0);
- try {
- cache.get();
- fail("Expect this to fail");
- } catch (KeeperException.NoNodeException nne) {
- // correct
- }
+ assertEquals(cache.get(), Collections.emptySet());
zkClient.create("/test", new byte[0], null, null);
zkClient.create("/test/z1", new byte[0], null, null);
@@ -231,8 +226,6 @@ public class ZookeeperCacheTest {
} catch (Exception e) {
// Ok
}
-
- assertEquals(notificationCount.get(), (recvNotifications + 1));
}
@Test(timeOut = 10000)