You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/09/18 00:30:58 UTC

[pulsar] branch master updated: Ensure getting list of topics for namespace is handled asynchronously (#5188)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 871a1e0  Ensure getting list of topics for namespace is handled asynchronously (#5188)
871a1e0 is described below

commit 871a1e028febc45742842754a801073a47d71698
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Sep 17 17:30:50 2019 -0700

    Ensure getting list of topics for namespace is handled asynchronously (#5188)
    
    * Ensure getting list of topics for namespace is handled asynchrounously
    
    * Fixed mocked zk deadlock
    
    * Test fixes
    
    * Fixed mutex unlocking in mock-zookeeper create method
    
    * Fixed caching of empty values
    
    * Do async call in background
    
    * Fixed merge conflicts
    
    * Fixed broken import from shaded class
---
 .../java/org/apache/zookeeper/MockZooKeeper.java   |  81 ++++++-----
 .../org/apache/pulsar/broker/PulsarService.java    |   3 +-
 .../apache/pulsar/broker/admin/AdminResource.java  |   1 +
 .../pulsar/broker/admin/impl/NamespacesBase.java   |   6 +-
 .../broker/admin/impl/PersistentTopicsBase.java    |   3 +-
 .../apache/pulsar/broker/admin/v1/Namespaces.java  |  20 ++-
 .../broker/admin/v1/NonPersistentTopics.java       |   1 +
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  32 ++--
 .../broker/admin/v2/NonPersistentTopics.java       |   1 +
 .../pulsar/broker/namespace/NamespaceService.java  |  60 ++++----
 .../apache/pulsar/broker/service/ServerCnx.java    |  36 ++---
 .../client/impl/PatternTopicsConsumerImplTest.java |   4 +-
 .../pulsar/zookeeper/GlobalZooKeeperCache.java     |   2 +-
 .../pulsar/zookeeper/LocalZooKeeperCache.java      |   2 +-
 .../apache/pulsar/zookeeper/ZooKeeperCache.java    | 161 +++++++++++++--------
 .../pulsar/zookeeper/ZooKeeperChildrenCache.java   |  43 ++++--
 .../pulsar/zookeeper/ZookeeperCacheTest.java       |  23 +--
 17 files changed, 276 insertions(+), 203 deletions(-)

diff --git a/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java b/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java
index 0afbcd6..c417604 100644
--- a/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java
+++ b/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java
@@ -126,6 +126,11 @@ public class MockZooKeeper extends ZooKeeper {
     public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
             throws KeeperException, InterruptedException {
         mutex.lock();
+
+        final Set<Watcher> toNotifyCreate = Sets.newHashSet();
+        final Set<Watcher> toNotifyParent = Sets.newHashSet();
+        final String parent = path.substring(0, path.lastIndexOf("/"));
+
         try {
             checkProgrammedFail();
 
@@ -136,7 +141,6 @@ public class MockZooKeeper extends ZooKeeper {
                 throw new KeeperException.NodeExistsException(path);
             }
 
-            final String parent = path.substring(0, path.lastIndexOf("/"));
             if (!parent.isEmpty() && !tree.containsKey(parent)) {
                 throw new KeeperException.NoNodeException();
             }
@@ -152,55 +156,57 @@ public class MockZooKeeper extends ZooKeeper {
 
             tree.put(path, Pair.of(data, 0));
 
-            final Set<Watcher> toNotifyCreate = Sets.newHashSet();
             toNotifyCreate.addAll(watchers.get(path));
 
-            final Set<Watcher> toNotifyParent = Sets.newHashSet();
             if (!parent.isEmpty()) {
                 toNotifyParent.addAll(watchers.get(parent));
             }
             watchers.removeAll(path);
-            final String finalPath = path;
-            executor.execute(() -> {
-                    toNotifyCreate.forEach(
-                            watcher -> watcher.process(
-                                    new WatchedEvent(EventType.NodeCreated,
-                                                     KeeperState.SyncConnected,
-                                                     finalPath)));
-                    toNotifyParent.forEach(
-                            watcher -> watcher.process(
-                                    new WatchedEvent(EventType.NodeChildrenChanged,
-                                                     KeeperState.SyncConnected,
-                                                     parent)));
-                });
-
-            return path;
         } finally {
+
             mutex.unlock();
         }
 
+        final String finalPath = path;
+        executor.execute(() -> {
+
+            toNotifyCreate.forEach(
+                    watcher -> watcher.process(
+                            new WatchedEvent(EventType.NodeCreated,
+                                    KeeperState.SyncConnected,
+                                    finalPath)));
+            toNotifyParent.forEach(
+                    watcher -> watcher.process(
+                            new WatchedEvent(EventType.NodeChildrenChanged,
+                                    KeeperState.SyncConnected,
+                                    parent)));
+        });
+
+        return path;
     }
 
     @Override
     public void create(final String path, final byte[] data, final List<ACL> acl, CreateMode createMode,
             final StringCallback cb, final Object ctx) {
-        if (stopped) {
-            cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
-            return;
-        }
 
-        final Set<Watcher> toNotifyCreate = Sets.newHashSet();
-        toNotifyCreate.addAll(watchers.get(path));
-
-        final Set<Watcher> toNotifyParent = Sets.newHashSet();
-        final String parent = path.substring(0, path.lastIndexOf("/"));
-        if (!parent.isEmpty()) {
-            toNotifyParent.addAll(watchers.get(parent));
-        }
-        watchers.removeAll(path);
 
         executor.execute(() -> {
             mutex.lock();
+
+            if (stopped) {
+                cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
+                return;
+            }
+
+            final Set<Watcher> toNotifyCreate = Sets.newHashSet();
+            toNotifyCreate.addAll(watchers.get(path));
+
+            final Set<Watcher> toNotifyParent = Sets.newHashSet();
+            final String parent = path.substring(0, path.lastIndexOf("/"));
+            if (!parent.isEmpty()) {
+                toNotifyParent.addAll(watchers.get(parent));
+            }
+
             if (getProgrammedFailStatus()) {
                 mutex.unlock();
                 cb.processResult(failReturnCode.intValue(), path, ctx, null);
@@ -215,6 +221,7 @@ public class MockZooKeeper extends ZooKeeper {
                 cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
             } else {
                 tree.put(path, Pair.of(data, 0));
+                watchers.removeAll(path);
                 mutex.unlock();
                 cb.processResult(0, path, ctx, null);
 
@@ -331,6 +338,12 @@ public class MockZooKeeper extends ZooKeeper {
                 return;
             }
 
+            if (!tree.containsKey(path)) {
+                mutex.unlock();
+                cb.processResult(KeeperException.Code.NoNode, path, ctx, null);
+                return;
+            }
+
             List<String> children = Lists.newArrayList();
             for (String item : tree.tailMap(path).keySet()) {
                 if (!item.startsWith(path)) {
@@ -347,12 +360,12 @@ public class MockZooKeeper extends ZooKeeper {
                 }
             }
 
-            mutex.unlock();
-
-            cb.processResult(0, path, ctx, children);
             if (watcher != null) {
                 watchers.put(path, watcher);
             }
+            mutex.unlock();
+
+            cb.processResult(0, path, ctx, children);
         });
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 113c081..2818744 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -649,7 +649,7 @@ public class PulsarService implements AutoCloseable {
             List<CompletableFuture<Topic>> persistentTopics = Lists.newArrayList();
             long topicLoadStart = System.nanoTime();
 
-            for (String topic : getNamespaceService().getListOfPersistentTopics(nsName)) {
+            for (String topic : getNamespaceService().getListOfPersistentTopics(nsName).join()) {
                 try {
                     TopicName topicName = TopicName.get(topic);
                     if (bundle.includes(topicName)) {
@@ -974,7 +974,6 @@ public class PulsarService implements AutoCloseable {
         return brokerServiceUrl != null ? brokerServiceUrl : brokerServiceUrlTls;
     }
 
-
     private void startWorkerService(AuthenticationService authenticationService,
                                     AuthorizationService authorizationService)
             throws InterruptedException, IOException, KeeperException {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index ed497e8..92aaecd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -597,6 +597,7 @@ public abstract class AdminResource extends PulsarWebResource {
             try {
                 topicExist = pulsar.getNamespaceService()
                         .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
+                        .join()
                         .contains(topicName.toString());
             } catch (Exception e) {
                 log.warn("Unexpected error while getting list of topics. topic={}. Error: {}",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 5d6d8fc..02c3e46 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -24,8 +24,6 @@ import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
-import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
-import static org.apache.pulsar.common.naming.NamespaceBundleFactory.getBundlesData;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -189,7 +187,7 @@ public abstract class NamespacesBase extends AdminResource {
 
         boolean isEmpty;
         try {
-            isEmpty = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).isEmpty()
+            isEmpty = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join().isEmpty()
                     && getPartitionedTopicList(TopicDomain.persistent).isEmpty()
                     && getPartitionedTopicList(TopicDomain.non_persistent).isEmpty();
         } catch (Exception e) {
@@ -319,7 +317,7 @@ public abstract class NamespacesBase extends AdminResource {
         NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
                 authoritative, true);
         try {
-            List<String> topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName);
+            List<String> topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join();
             for (String topic : topics) {
                 NamespaceBundle topicBundle = (NamespaceBundle) pulsar().getNamespaceService()
                         .getBundle(TopicName.get(topic));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 509b41d..71c758b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -378,6 +378,7 @@ public class PersistentTopicsBase extends AdminResource {
         try {
             boolean topicExist = pulsar().getNamespaceService()
                     .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
+                    .join()
                     .contains(topicName.toString());
             if (topicExist) {
                 log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
@@ -1201,7 +1202,7 @@ public class PersistentTopicsBase extends AdminResource {
                         return;
                     }
                 }
-                
+
                 if (partitionException.get() != null) {
                     log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), topicName,
                             subscriptionName, targetMessageId, partitionException.get());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index a47bc23..a0fbeca 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -113,21 +113,25 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(hidden = true, value = "Get the list of all the topics under a certain namespace.", response = String.class, responseContainer = "Set")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
-    public List<String> getTopics(@PathParam("property") String property,
+    public void getTopics(@PathParam("property") String property,
                                   @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
-                                  @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode) {
+                                  @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode,
+                                  @Suspended AsyncResponse asyncResponse) {
         validateAdminAccessForTenant(property);
         validateNamespaceName(property, cluster, namespace);
 
         // Validate that namespace exists, throws 404 if it doesn't exist
         getNamespacePolicies(namespaceName);
 
-        try {
-            return pulsar().getNamespaceService().getListOfTopics(namespaceName, mode);
-        } catch (Exception e) {
-            log.error("Failed to get topics list for namespace {}/{}/{}", property, cluster, namespace, e);
-            throw new RestException(e);
-        }
+        pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
+                .thenAccept(topics -> {
+                    asyncResponse.resume(topics);
+                })
+                .exceptionally(ex -> {
+                    log.error("Failed to get topics list for namespace {}", namespaceName, ex);
+                    asyncResponse.resume(ex);
+                    return null;
+                });
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index 96bc084..7667167 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -129,6 +129,7 @@ public class NonPersistentTopics extends PersistentTopics {
         try {
             boolean topicExist = pulsar().getNamespaceService()
                     .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
+                    .join()
                     .contains(topicName.toString());
             if (topicExist) {
                 log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 435892e..bd5ff0b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -18,6 +18,11 @@
  */
 package org.apache.pulsar.broker.admin.v2;
 
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -49,17 +54,12 @@ import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
-import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
-
 @Path("/namespaces")
 @Produces(MediaType.APPLICATION_JSON)
 @Consumes(MediaType.APPLICATION_JSON)
@@ -80,21 +80,25 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(value = "Get the list of all the topics under a certain namespace.", response = String.class, responseContainer = "Set")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
-    public List<String> getTopics(@PathParam("tenant") String tenant,
+    public void getTopics(@PathParam("tenant") String tenant,
                                   @PathParam("namespace") String namespace,
-                                  @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode) {
+                                  @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode,
+                                  @Suspended AsyncResponse asyncResponse) {
         validateAdminAccessForTenant(tenant);
         validateNamespaceName(tenant, namespace);
 
         // Validate that namespace exists, throws 404 if it doesn't exist
         getNamespacePolicies(namespaceName);
 
-        try {
-            return pulsar().getNamespaceService().getListOfTopics(namespaceName, mode);
-        } catch (Exception e) {
-            log.error("Failed to get topics list for namespace {}", namespaceName, e);
-            throw new RestException(e);
-        }
+        pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
+                .thenAccept(topics -> {
+                    asyncResponse.resume(topics);
+                })
+                .exceptionally(ex -> {
+                    log.error("Failed to get topics list for namespace {}", namespaceName, ex);
+                    asyncResponse.resume(ex);
+                    return null;
+                });
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 10dc5ee..9bc65e3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -174,6 +174,7 @@ public class NonPersistentTopics extends PersistentTopics {
         try {
             boolean topicExist = pulsar().getNamespaceService()
                     .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
+                    .join()
                     .contains(topicName.toString());
             if (topicExist) {
                 log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 5fbc20f..83e3ade 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.broker.namespace;
 import com.google.common.collect.Lists;
 import com.google.common.hash.Hashing;
 import io.netty.channel.EventLoopGroup;
+
+import org.apache.commons.collections4.ListUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarServerException;
@@ -54,7 +56,6 @@ import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
-import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -68,6 +69,7 @@ import org.slf4j.LoggerFactory;
 
 import java.net.URI;
 import java.net.URL;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -863,14 +865,15 @@ public class NamespaceService {
         return getBundle(topicName);
     }
 
-    public List<String> getFullListOfTopics(NamespaceName namespaceName) throws Exception {
-        List<String> topics = getListOfPersistentTopics(namespaceName);
-        topics.addAll(getListOfNonPersistentTopics(namespaceName));
-        return topics;
+    public CompletableFuture<List<String>> getFullListOfTopics(NamespaceName namespaceName) {
+        return getListOfPersistentTopics(namespaceName)
+                .thenCombine(getListOfNonPersistentTopics(namespaceName),
+                        (persistentTopics, nonPersistentTopics) -> {
+                            return ListUtils.union(persistentTopics, nonPersistentTopics);
+                        });
     }
 
-    public List<String> getListOfTopics(NamespaceName namespaceName, Mode mode)
-        throws Exception {
+    public CompletableFuture<List<String>> getListOfTopics(NamespaceName namespaceName, Mode mode) {
         switch (mode) {
             case ALL:
                 return getFullListOfTopics(namespaceName);
@@ -882,30 +885,26 @@ public class NamespaceService {
         }
     }
 
-    public List<String> getListOfPersistentTopics(NamespaceName namespaceName) throws Exception {
-        List<String> topics = Lists.newArrayList();
-
+    public CompletableFuture<List<String>> getListOfPersistentTopics(NamespaceName namespaceName) {
         // For every topic there will be a managed ledger created.
-        try {
-            String path = String.format("/managed-ledgers/%s/persistent", namespaceName);
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Getting children from managed-ledgers now: {}", path);
-            }
-
-            for (String topic : pulsar.getLocalZkCacheService().managedLedgerListCache().get(path)) {
-                topics.add(String.format("persistent://%s/%s", namespaceName, Codec.decode(topic)));
-            }
-        } catch (KeeperException.NoNodeException e) {
-            // NoNode means there are no persistent topics for this namespace
+        String path = String.format("/managed-ledgers/%s/persistent", namespaceName);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Getting children from managed-ledgers now: {}", path);
         }
 
-        topics.sort(null);
-        return topics;
-    }
+        return pulsar.getLocalZkCacheService().managedLedgerListCache().getAsync(path)
+                .thenApply(znodes -> {
+                    List<String> topics = Lists.newArrayList();
+                    for (String znode : znodes) {
+                        topics.add(String.format("persistent://%s/%s", namespaceName, Codec.decode(znode)));
+                    }
 
-    public List<String> getListOfNonPersistentTopics(NamespaceName namespaceName) throws Exception {
-        List<String> topics = Lists.newArrayList();
+                    topics.sort(null);
+                    return topics;
+                });
+    }
 
+    public CompletableFuture<List<String>> getListOfNonPersistentTopics(NamespaceName namespaceName) {
         ClusterData peerClusterData;
         try {
             peerClusterData = PulsarWebResource.checkLocalOrGetPeerReplicationCluster(pulsar, namespaceName)
@@ -921,6 +920,7 @@ public class NamespaceService {
         }
 
         // Non-persistent topics don't have managed ledgers so we have to retrieve them from local cache.
+        List<String> topics = Lists.newArrayList();
         synchronized (pulsar.getBrokerService().getMultiLayerTopicMap()) {
             if (pulsar.getBrokerService().getMultiLayerTopicMap().containsKey(namespaceName.toString())) {
                 pulsar.getBrokerService().getMultiLayerTopicMap().get(namespaceName.toString()).values()
@@ -935,13 +935,13 @@ public class NamespaceService {
         }
 
         topics.sort(null);
-        return topics;
+        return CompletableFuture.completedFuture(topics);
     }
 
-    private List<String> getNonPersistentTopicsFromPeerCluster(ClusterData peerClusterData,
-                                                               NamespaceName namespace) throws Exception {
+    private CompletableFuture<List<String>> getNonPersistentTopicsFromPeerCluster(ClusterData peerClusterData,
+                                                               NamespaceName namespace) {
         PulsarClientImpl client = getNamespaceClient(peerClusterData);
-        return client.getLookup().getTopicsUnderNamespace(namespace, Mode.NON_PERSISTENT).get();
+        return client.getLookup().getTopicsUnderNamespace(namespace, Mode.NON_PERSISTENT);
     }
 
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index d91bd1a..1cab8e6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1297,27 +1297,27 @@ public class ServerCnx extends PulsarHandler {
         final long requestId = commandGetTopicsOfNamespace.getRequestId();
         final String namespace = commandGetTopicsOfNamespace.getNamespace();
         final CommandGetTopicsOfNamespace.Mode mode = commandGetTopicsOfNamespace.getMode();
+        final NamespaceName namespaceName = NamespaceName.get(namespace);
 
-        try {
-            final NamespaceName namespaceName = NamespaceName.get(namespace);
-
-            final List<String> topics = getBrokerService().pulsar().getNamespaceService()
-                .getListOfTopics(namespaceName, mode);
+        getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
+                .thenAccept(topics -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Received CommandGetTopicsOfNamespace for namespace [//{}] by {}, size:{}",
+                                remoteAddress, namespace, requestId, topics.size());
+                    }
 
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Received CommandGetTopicsOfNamespace for namespace [//{}] by {}, size:{}",
-                    remoteAddress, namespace, requestId, topics.size());
-            }
+                    ctx.writeAndFlush(Commands.newGetTopicsOfNamespaceResponse(topics, requestId));
+                })
+                .exceptionally(ex -> {
+                    log.warn("[{}] Error GetTopicsOfNamespace for namespace [//{}] by {}",
+                            remoteAddress, namespace, requestId);
+                    ctx.writeAndFlush(
+                            Commands.newError(requestId,
+                                    BrokerServiceException.getClientErrorCode(new ServerMetadataException(ex)),
+                                    ex.getMessage()));
 
-            ctx.writeAndFlush(Commands.newGetTopicsOfNamespaceResponse(topics, requestId));
-        } catch (Exception e) {
-            log.warn("[{}] Error GetTopicsOfNamespace for namespace [//{}] by {}",
-                remoteAddress, namespace, requestId);
-            ctx.writeAndFlush(
-                Commands.newError(requestId,
-                    BrokerServiceException.getClientErrorCode(new ServerMetadataException(e)),
-                    e.getMessage()));
-        }
+                    return null;
+                });
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 1d33641..18cf12c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -27,6 +27,7 @@ import static org.testng.Assert.fail;
 import com.google.common.collect.Lists;
 
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import java.util.stream.IntStream;
@@ -725,7 +726,8 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
         // seems no direct way to verify auto-unsubscribe, because this patternConsumer also referenced the topic.
         List<String> topicNames = Lists.newArrayList(topicName2);
         NamespaceService nss = pulsar.getNamespaceService();
-        doReturn(topicNames).when(nss).getListOfPersistentTopics(NamespaceName.get("my-property/my-ns"));
+        doReturn(CompletableFuture.completedFuture(topicNames)).when(nss)
+                .getListOfPersistentTopics(NamespaceName.get("my-property/my-ns"));
 
         // 7. call recheckTopics to unsubscribe topic 1,3 , verify topics number: 2=6-1-3
         log.debug("recheck topics change");
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
index 8a7c3e7..671ef50 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
@@ -115,7 +115,7 @@ public class GlobalZooKeeperCache extends ZooKeeperCache implements Closeable {
 
                     //
                     dataCache.synchronous().invalidateAll();
-                    childrenCache.invalidateAll();
+                    childrenCache.synchronous().invalidateAll();
                     return;
                 default:
                     break;
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java
index 3045a0b..79b9738 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java
@@ -51,7 +51,7 @@ public class LocalZooKeeperCache extends ZooKeeperCache {
                 // in case of expired, the zkSession is no longer good
                 LOG.warn("Lost connection from local ZK. Invalidating the whole cache.");
                 dataCache.synchronous().invalidateAll();
-                childrenCache.invalidateAll();
+                childrenCache.synchronous().invalidateAll();
                 return;
             default:
                 break;
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
index 6136481..eaab17b 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
@@ -22,17 +22,17 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.github.benmanes.caffeine.cache.Caffeine;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Sets;
 
+import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.Collections;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -41,6 +41,9 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.util.SafeRunnable;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.KeeperException.NoNodeException;
@@ -77,8 +80,8 @@ public abstract class ZooKeeperCache implements Watcher {
     public static final String ZK_CACHE_INSTANCE = "zk_cache_instance";
 
     protected final AsyncLoadingCache<String, Entry<Object, Stat>> dataCache;
-    protected final Cache<String, Set<String>> childrenCache;
-    protected final Cache<String, Boolean> existsCache;
+    protected final AsyncLoadingCache<String, Set<String>> childrenCache;
+    protected final AsyncLoadingCache<String, Boolean> existsCache;
     private final OrderedExecutor executor;
     private final OrderedExecutor backgroundExecutor = OrderedExecutor.newBuilder().name("zk-cache-background").numThreads(2).build();
     private boolean shouldShutdownExecutor;
@@ -96,8 +99,10 @@ public abstract class ZooKeeperCache implements Watcher {
         this.dataCache = Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES)
                 .buildAsync((key, executor1) -> null);
 
-        this.childrenCache = CacheBuilder.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build();
-        this.existsCache = CacheBuilder.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build();
+        this.childrenCache = Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES)
+                .buildAsync((key, executor1) -> null);
+        this.existsCache = Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES)
+                .buildAsync((key, executor1) -> null);
     }
 
     public ZooKeeperCache(ZooKeeper zkSession, int zkOperationTimeoutSeconds) {
@@ -114,14 +119,14 @@ public abstract class ZooKeeperCache implements Watcher {
         final String path = event.getPath();
         if (path != null) {
             dataCache.synchronous().invalidate(path);
-            childrenCache.invalidate(path);
+            childrenCache.synchronous().invalidate(path);
             // sometimes zk triggers one watch per zk-session and if zkDataCache and ZkChildrenCache points to this
             // ZookeeperCache instance then ZkChildrenCache may not invalidate for it's parent. Therefore, invalidate
             // cache for parent if child is created/deleted
             if (event.getType().equals(EventType.NodeCreated) || event.getType().equals(EventType.NodeDeleted)) {
-                childrenCache.invalidate(Paths.get(path).getParent().toString());
+                childrenCache.synchronous().invalidate(Paths.get(path).getParent().toString());
             }
-            existsCache.invalidate(path);
+            existsCache.synchronous().invalidate(path);
             if (executor != null && updater != null) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Submitting reload cache task to the executor for path: {}, updater: {}", path, updater);
@@ -152,7 +157,7 @@ public abstract class ZooKeeperCache implements Watcher {
     }
 
     private void invalidateAllExists() {
-        existsCache.invalidateAll();
+        existsCache.synchronous().invalidateAll();
     }
 
     public void invalidateAllData() {
@@ -160,7 +165,7 @@ public abstract class ZooKeeperCache implements Watcher {
     }
 
     public void invalidateAllChildren() {
-        childrenCache.invalidateAll();
+        childrenCache.synchronous().invalidateAll();
     }
 
     public void invalidateData(String path) {
@@ -168,11 +173,11 @@ public abstract class ZooKeeperCache implements Watcher {
     }
 
     public void invalidateChildren(String path) {
-        childrenCache.invalidate(path);
+        childrenCache.synchronous().invalidate(path);
     }
 
     private void invalidateExists(String path) {
-        existsCache.invalidate(path);
+        existsCache.synchronous().invalidate(path);
     }
 
     public void asyncInvalidate(String path) {
@@ -203,20 +208,30 @@ public abstract class ZooKeeperCache implements Watcher {
     }
 
     private boolean exists(final String path, Watcher watcher) throws KeeperException, InterruptedException {
-        try {
-            return existsCache.get(path, () -> zkSession.get().exists(path, watcher) != null);
-        } catch (ExecutionException e) {
-            Throwable cause = e.getCause();
-            if (cause instanceof KeeperException) {
-                throw (KeeperException) cause;
-            } else if (cause instanceof InterruptedException) {
-                throw (InterruptedException) cause;
-            } else if (cause instanceof RuntimeException) {
-                throw (RuntimeException) cause;
-            } else {
-                throw new RuntimeException(cause);
+        return existsAsync(path, watcher).join();
+    }
+
+    @SuppressWarnings("deprecation")
+    public CompletableFuture<Boolean> existsAsync(String path, Watcher watcher) {
+        return existsCache.get(path, (p, executor) -> {
+            ZooKeeper zk = zkSession.get();
+            if (zk == null) {
+                return FutureUtil.failedFuture(new IOException("ZK session not ready"));
             }
-        }
+
+            CompletableFuture<Boolean> future = new CompletableFuture<>();
+            zk.exists(path, watcher, (StatCallback) (rc, path1, ctx, stat) -> {
+                if (rc == Code.OK.intValue()) {
+                    future.complete(true);
+                } else if (rc == Code.NONODE.intValue()) {
+                    future.complete(false);
+                } else {
+                    future.completeExceptionally(KeeperException.create(rc));
+                }
+            }, null);
+
+            return future;
+        });
     }
 
     /**
@@ -365,7 +380,15 @@ public abstract class ZooKeeperCache implements Watcher {
      * @throws InterruptedException
      */
     public Set<String> getChildren(final String path) throws KeeperException, InterruptedException {
-        return getChildren(path, this);
+        try {
+            return getChildrenAsync(path, this).join();
+        } catch (CompletionException e) {
+            if (e.getCause() instanceof KeeperException) {
+                throw (KeeperException)e.getCause();
+            } else {
+                throw e;
+            }
+        }
     }
 
     /**
@@ -375,35 +398,50 @@ public abstract class ZooKeeperCache implements Watcher {
      * @param path
      * @param watcher
      * @return
-     * @throws KeeperException
-     * @throws InterruptedException
      */
-    public Set<String> getChildren(final String path, final Watcher watcher)
-            throws KeeperException, InterruptedException {
-        try {
-            return childrenCache.get(path, () -> {
-                LOG.debug("Fetching children at {}", path);
-                return Sets.newTreeSet(checkNotNull(zkSession.get()).getChildren(path, watcher));
-            });
-        } catch (ExecutionException e) {
-            Throwable cause = e.getCause();
-            // The node we want may not exist yet, so put a watcher on its existance
-            // before throwing up the exception. Its possible that the node could have
-            // been created after the call to getChildren, but before the call to exists().
-            // If this is the case, exists will return true, and we just call getChildren again.
-            if (cause instanceof KeeperException.NoNodeException
-                    && exists(path, watcher)) {
-                return getChildren(path, watcher);
-            } else if (cause instanceof KeeperException) {
-                throw (KeeperException) cause;
-            } else if (cause instanceof InterruptedException) {
-                throw (InterruptedException) cause;
-            } else if (cause instanceof RuntimeException) {
-                throw (RuntimeException) cause;
-            } else {
-                throw new RuntimeException(cause);
-            }
-        }
+    @SuppressWarnings("deprecation")
+    public CompletableFuture<Set<String>> getChildrenAsync(String path, Watcher watcher) {
+        return childrenCache.get(path, (p, executor) -> {
+            CompletableFuture<Set<String>> future = new CompletableFuture<>();
+            executor.execute(SafeRunnable.safeRun(() -> {
+                ZooKeeper zk = zkSession.get();
+                if (zk == null) {
+                    future.completeExceptionally(new IOException("ZK session not ready"));
+                    return;
+                }
+
+                zk.getChildren(path, watcher, (ChildrenCallback) (rc, path1, ctx, children) -> {
+                    if (rc == Code.OK.intValue()) {
+                        future.complete(Sets.newTreeSet(children));
+                    } else if (rc == Code.NONODE.intValue()) {
+                        // The node we want may not exist yet, so put a watcher on its existence
+                        // before throwing up the exception. Its possible that the node could have
+                        // been created after the call to getChildren, but before the call to exists().
+                        // If this is the case, exists will return true, and we just call getChildren again.
+                        existsAsync(path, watcher).thenAccept(exists -> {
+                            if (exists) {
+                                getChildrenAsync(path, watcher)
+                                        .thenAccept(c -> future.complete(c))
+                                        .exceptionally(ex -> {
+                                            future.completeExceptionally(ex);
+                                            return null;
+                                        });
+                            } else {
+                                // Z-node does not exist
+                                future.complete(Collections.emptySet());
+                            }
+                        }).exceptionally(ex -> {
+                            future.completeExceptionally(ex);
+                            return null;
+                        });
+                    } else {
+                        future.completeExceptionally(KeeperException.create(rc));
+                    }
+                }, null);
+            }));
+
+            return future;
+        });
     }
 
     @SuppressWarnings("unchecked")
@@ -412,7 +450,12 @@ public abstract class ZooKeeperCache implements Watcher {
     }
 
     public Set<String> getChildrenIfPresent(String path) {
-        return childrenCache.getIfPresent(path);
+        CompletableFuture<Set<String>> future = childrenCache.getIfPresent(path);
+        if (future != null && future.isDone() && !future.isCompletedExceptionally()) {
+            return future.getNow(null);
+        } else {
+            return null;
+        }
     }
 
     @Override
@@ -422,9 +465,9 @@ public abstract class ZooKeeperCache implements Watcher {
     }
 
     public void invalidateRoot(String root) {
-        for (String key : childrenCache.asMap().keySet()) {
+        for (String key : childrenCache.synchronous().asMap().keySet()) {
             if (key.startsWith(root)) {
-                childrenCache.invalidate(key);
+                childrenCache.synchronous().invalidate(key);
             }
         }
     }
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
index cc8352e..0bb6f46 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.zookeeper;
 
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.pulsar.zookeeper.ZooKeeperCache.CacheUpdater;
@@ -49,12 +51,24 @@ public class ZooKeeperChildrenCache implements Watcher, CacheUpdater<Set<String>
     }
 
     public Set<String> get() throws KeeperException, InterruptedException {
-        return cache.getChildren(path, this);
+        return get(this.path);
     }
 
     public Set<String> get(String path) throws KeeperException, InterruptedException {
-        LOG.debug("getChildren called at: {}", path);
-        return cache.getChildren(path, this);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("getChildren called at: {}", path);
+        }
+
+        Set<String> children = cache.getChildrenAsync(path, this).join();
+        if (children == null) {
+            throw KeeperException.create(KeeperException.Code.NONODE);
+        }
+
+        return children;
+    }
+
+    public CompletableFuture<Set<String>> getAsync(String path) {
+        return cache.getChildrenAsync(path, this);
     }
 
     public void clear() {
@@ -67,18 +81,17 @@ public class ZooKeeperChildrenCache implements Watcher, CacheUpdater<Set<String>
 
     @Override
     public void reloadCache(final String path) {
-        try {
-            cache.invalidate(path);
-            Set<String> children = cache.getChildren(path, this);
-            LOG.info("reloadCache called in zookeeperChildrenCache for path {}", path);
-            for (ZooKeeperCacheListener<Set<String>> listener : listeners) {
-                listener.onUpdate(path, children, null);
-            }
-        } catch (KeeperException.NoNodeException nne) {
-            LOG.debug("Node [{}] does not exist", nne.getPath());
-        } catch (Exception e) {
-            LOG.warn("Reloading ZooKeeperDataCache failed at path:{}", path);
-        }
+        cache.invalidate(path);
+        cache.getChildrenAsync(path, this)
+                .thenAccept(children -> {
+                    LOG.info("reloadCache called in zookeeperChildrenCache for path {}", path);
+                    for (ZooKeeperCacheListener<Set<String>> listener : listeners) {
+                        listener.onUpdate(path, children, null);
+                    }
+                }).exceptionally(ex -> {
+                    LOG.warn("Reloading ZooKeeperDataCache failed at path:{}", path, ex);
+                    return null;
+                }).join();
     }
 
     @Override
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
index e25c1ad..834844a 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
@@ -25,6 +25,13 @@ import static org.testng.Assert.fail;
 import static org.testng.AssertJUnit.assertNotNull;
 import static org.testng.AssertJUnit.assertNull;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import java.util.Collections;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
@@ -36,7 +43,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.common.util.OrderedScheduler;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.WatchedEvent;
@@ -50,12 +56,6 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.MoreExecutors;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
-
 @Test
 public class ZookeeperCacheTest {
     private MockZooKeeper zkClient;
@@ -194,12 +194,7 @@ public class ZookeeperCacheTest {
         cache.unregisterListener(counter);
 
         assertEquals(notificationCount.get(), 0);
-        try {
-            cache.get();
-            fail("Expect this to fail");
-        } catch (KeeperException.NoNodeException nne) {
-            // correct
-        }
+        assertEquals(cache.get(), Collections.emptySet());
 
         zkClient.create("/test", new byte[0], null, null);
         zkClient.create("/test/z1", new byte[0], null, null);
@@ -231,8 +226,6 @@ public class ZookeeperCacheTest {
         } catch (Exception e) {
             // Ok
         }
-
-        assertEquals(notificationCount.get(), (recvNotifications + 1));
     }
 
     @Test(timeOut = 10000)