You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/19 18:05:31 UTC

[pulsar] branch master updated: [pulsar-broker] Fix deadlock: add zk-operation timeout for blocking call on zk-cache (#3633)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a057a14  [pulsar-broker] Fix deadlock: add zk-operation timeout for blocking call on zk-cache (#3633)
a057a14 is described below

commit a057a1430a186b6c874b9605a0c525daf3846900
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue Mar 19 11:05:27 2019 -0700

    [pulsar-broker] Fix deadlock: add zk-operation timeout for blocking call on zk-cache (#3633)
    
    fix: compilation
    
    fix test
    
    fix test
---
 conf/broker.conf                                    |  3 +++
 conf/standalone.conf                                |  3 +++
 .../apache/pulsar/broker/ServiceConfiguration.java  |  5 +++++
 .../broker/authorization/AuthorizationService.java  | 17 ++++++++++-------
 .../pulsar/broker/BookKeeperClientFactoryImpl.java  |  4 ++--
 .../org/apache/pulsar/broker/PulsarService.java     |  6 ++++--
 .../pulsar/broker/namespace/NamespaceService.java   | 17 ++++++++++-------
 .../service/persistent/DispatchRateLimiter.java     |  3 +--
 .../service/persistent/SubscribeRateLimiter.java    |  1 -
 .../apache/pulsar/broker/web/PulsarWebResource.java |  6 +++---
 .../pulsar/broker/cache/ResourceQuotaCacheTest.java |  2 +-
 .../pulsar/broker/namespace/OwnershipCacheTest.java |  2 +-
 .../PersistentDispatcherFailoverConsumerTest.java   |  5 +++++
 .../pulsar/broker/service/PersistentTopicTest.java  |  5 +++++
 .../apache/pulsar/broker/service/ServerCnxTest.java |  4 ++++
 .../discovery/service/BrokerDiscoveryProvider.java  |  2 ++
 .../discovery/service/web/ZookeeperCacheLoader.java |  4 +++-
 .../proxy/server/BrokerDiscoveryProvider.java       |  2 ++
 .../proxy/server/util/ZookeeperCacheLoader.java     |  4 +++-
 .../apache/pulsar/websocket/WebSocketService.java   |  5 +++--
 .../pulsar/zookeeper/GlobalZooKeeperCache.java      |  5 +++--
 .../pulsar/zookeeper/LocalZooKeeperCache.java       |  4 ++--
 .../zookeeper/ZkBookieRackAffinityMapping.java      |  3 ++-
 .../ZkIsolatedBookieEnsemblePlacementPolicy.java    |  3 ++-
 .../org/apache/pulsar/zookeeper/ZooKeeperCache.java | 21 +++++++++++++--------
 .../apache/pulsar/zookeeper/ZooKeeperDataCache.java |  7 ++++---
 .../zookeeper/ZkBookieRackAffinityMappingTest.java  |  6 +++---
 ...ZkIsolatedBookieEnsemblePlacementPolicyTest.java |  6 +++---
 .../apache/pulsar/zookeeper/ZookeeperCacheTest.java | 16 ++++++++--------
 29 files changed, 110 insertions(+), 61 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index fb5e1e8..0d576fa 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -61,6 +61,9 @@ failureDomainsEnabled=false
 # Zookeeper session timeout in milliseconds
 zooKeeperSessionTimeoutMillis=30000
 
+# ZooKeeper operation timeout in seconds (maximum time a blocking call on the zk-cache waits for a result)
+zooKeeperOperationTimeoutSeconds=30
+
 # Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
 brokerShutdownTimeoutMs=60000
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 41708c7..22bbc70 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -51,6 +51,9 @@ failureDomainsEnabled=false
 # Zookeeper session timeout in milliseconds
 zooKeeperSessionTimeoutMillis=30000
 
+# ZooKeeper operation timeout in seconds (maximum time a blocking call on the zk-cache waits for a result)
+zooKeeperOperationTimeoutSeconds=30
+
 # Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
 brokerShutdownTimeoutMs=60000
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index da2b098..4d6b498 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -184,6 +184,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private long zooKeeperSessionTimeoutMillis = 30000;
     @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "ZooKeeper operation timeout in seconds"
+        )
+    private int zooKeeperOperationTimeoutSeconds = 30;
+    @FieldContext(
         category = CATEGORY_SERVER,
         dynamic = true,
         doc = "Time to wait for broker graceful shutdown. After this time elapses, the process will be killed"
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 42d935c..de6f799 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -36,7 +36,6 @@ import java.util.concurrent.CompletionStage;
 import java.util.function.Function;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;
 
 /**
  * Authorization service that manages pluggable authorization provider and authorize requests accordingly.
@@ -215,9 +214,11 @@ public class AuthorizationService {
     public boolean canProduce(TopicName topicName, String role, AuthenticationDataSource authenticationData)
             throws Exception {
         try {
-            return canProduceAsync(topicName, role, authenticationData).get(cacheTimeOutInSec, SECONDS);
+            return canProduceAsync(topicName, role, authenticationData).get(conf.getZooKeeperOperationTimeoutSeconds(),
+                    SECONDS);
         } catch (InterruptedException e) {
-            log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, topicName);
+            log.warn("Time-out {} sec while checking authorization on {} ", conf.getZooKeeperOperationTimeoutSeconds(),
+                    topicName);
             throw e;
         } catch (Exception e) {
             log.warn("Producer-client  with Role - {} failed to get permissions for topic - {}. {}", role, topicName,
@@ -229,13 +230,15 @@ public class AuthorizationService {
     public boolean canConsume(TopicName topicName, String role, AuthenticationDataSource authenticationData,
             String subscription) throws Exception {
         try {
-            return canConsumeAsync(topicName, role, authenticationData, subscription).get(cacheTimeOutInSec, SECONDS);
+            return canConsumeAsync(topicName, role, authenticationData, subscription)
+                    .get(conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
         } catch (InterruptedException e) {
-            log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, topicName);
+            log.warn("Time-out {} sec while checking authorization on {} ", conf.getZooKeeperOperationTimeoutSeconds(),
+                    topicName);
             throw e;
         } catch (Exception e) {
-            log.warn("Consumer-client  with Role - {} failed to get permissions for topic - {}. {}", role,
-                    topicName, e.getMessage());
+            log.warn("Consumer-client  with Role - {} failed to get permissions for topic - {}. {}", role, topicName,
+                    e.getMessage());
             throw e;
         }
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index 5e6e59d..7f13912 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -76,7 +76,7 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
             bkConf.setProperty(RackawareEnsemblePlacementPolicy.REPP_DNS_RESOLVER_CLASS,
                     ZkBookieRackAffinityMapping.class.getName());
 
-            ZooKeeperCache zkc = new ZooKeeperCache(zkClient) {
+            ZooKeeperCache zkc = new ZooKeeperCache(zkClient, conf.getZooKeeperOperationTimeoutSeconds()) {
             };
             if (!rackawarePolicyZkCache.compareAndSet(null, zkc)) {
                 zkc.stop();
@@ -91,7 +91,7 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
             bkConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
                     conf.getBookkeeperClientIsolationGroups());
             if (bkConf.getProperty(ZooKeeperCache.ZK_CACHE_INSTANCE) == null) {
-                ZooKeeperCache zkc = new ZooKeeperCache(zkClient) {
+                ZooKeeperCache zkc = new ZooKeeperCache(zkClient, conf.getZooKeeperOperationTimeoutSeconds()) {
                 };
 
                 if (!clientIsolationZkCache.compareAndSet(null, zkc)) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 54d8092..ed4d928 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -538,9 +538,11 @@ public class PulsarService implements AutoCloseable {
 
         LOG.info("starting configuration cache service");
 
-        this.localZkCache = new LocalZooKeeperCache(getZkClient(), getOrderedExecutor());
+        this.localZkCache = new LocalZooKeeperCache(getZkClient(), config.getZooKeeperOperationTimeoutSeconds(),
+                getOrderedExecutor());
         this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(),
-                (int) config.getZooKeeperSessionTimeoutMillis(), config.getConfigurationStoreServers(),
+                (int) config.getZooKeeperSessionTimeoutMillis(), 
+                config.getZooKeeperOperationTimeoutSeconds(), config.getConfigurationStoreServers(),
                 getOrderedExecutor(), this.cacheExecutor);
         try {
             this.globalZkCache.start();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index dbdf16e..928407b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -92,7 +92,6 @@ import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
 import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
 import static org.apache.pulsar.common.naming.NamespaceBundleFactory.getBundlesData;
-import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;
 
 /**
  * The <code>NamespaceService</code> provides resource ownership lookup as well as resource ownership claiming services
@@ -762,7 +761,8 @@ public class NamespaceService {
 
         if (!policies.isPresent()) {
             // if policies is not present into localZk then create new policies
-            this.pulsar.getLocalZkCacheService().createPolicies(path, false).get(cacheTimeOutInSec, SECONDS);
+            this.pulsar.getLocalZkCacheService().createPolicies(path, false)
+                    .get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
         }
 
         long version = nsBundles.getVersion();
@@ -828,17 +828,20 @@ public class NamespaceService {
     }
 
     public void removeOwnedServiceUnit(NamespaceName nsName) throws Exception {
-        ownershipCache.removeOwnership(getFullBundle(nsName)).get(cacheTimeOutInSec, SECONDS);
+        ownershipCache.removeOwnership(getFullBundle(nsName))
+                .get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
         bundleFactory.invalidateBundleCache(nsName);
     }
 
     public void removeOwnedServiceUnit(NamespaceBundle nsBundle) throws Exception {
-        ownershipCache.removeOwnership(nsBundle).get(cacheTimeOutInSec, SECONDS);
+        ownershipCache.removeOwnership(nsBundle).get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(),
+                SECONDS);
         bundleFactory.invalidateBundleCache(nsBundle.getNamespaceObject());
     }
 
     public void removeOwnedServiceUnits(NamespaceName nsName, BundlesData bundleData) throws Exception {
-        ownershipCache.removeOwnership(bundleFactory.getBundles(nsName, bundleData)).get(cacheTimeOutInSec, SECONDS);
+        ownershipCache.removeOwnership(bundleFactory.getBundles(nsName, bundleData))
+                .get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
         bundleFactory.invalidateBundleCache(nsName);
     }
 
@@ -896,7 +899,7 @@ public class NamespaceService {
         ClusterData peerClusterData;
         try {
             peerClusterData = PulsarWebResource.checkLocalOrGetPeerReplicationCluster(pulsar, namespaceName)
-                .get(cacheTimeOutInSec, SECONDS);
+                    .get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             throw new RuntimeException("Failed to contact peer replication cluster.", e);
         }
@@ -972,7 +975,7 @@ public class NamespaceService {
 
     public Optional<NamespaceEphemeralData> getOwner(NamespaceBundle bundle) throws Exception {
         // if there is no znode for the service unit, it is not owned by any broker
-        return getOwnerAsync(bundle).get(cacheTimeOutInSec, SECONDS);
+        return getOwnerAsync(bundle).get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
     }
 
     public CompletableFuture<Optional<NamespaceEphemeralData>> getOwnerAsync(NamespaceBundle bundle) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index a69c31f..7e9052a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.broker.service.persistent;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.pulsar.broker.web.PulsarWebResource.path;
-import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;
 
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
@@ -189,7 +188,7 @@ public class DispatchRateLimiter {
         Optional<Policies> policies = Optional.empty();
         try {
             policies = brokerService.pulsar().getConfigurationCache().policiesCache().getAsync(path)
-                    .get(cacheTimeOutInSec, SECONDS);
+                    .get(brokerService.pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
         } catch (Exception e) {
             log.warn("Failed to get message-rate for {} ", topicName, e);
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
index 078abb5..2cc528f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
@@ -41,7 +41,6 @@ import java.util.concurrent.TimeUnit;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.broker.web.PulsarWebResource.path;
-import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;
 
 public class SubscribeRateLimiter {
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 824be7a..5b884b7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;
 
 import java.net.MalformedURLException;
 import java.net.URI;
@@ -612,9 +611,10 @@ public abstract class PulsarWebResource {
      * @throws Exception
      */
     protected void validateGlobalNamespaceOwnership(NamespaceName namespace) {
+        int timeout = pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds();
         try {
             ClusterData peerClusterData = checkLocalOrGetPeerReplicationCluster(pulsar(), namespace)
-                    .get(cacheTimeOutInSec, SECONDS);
+                    .get(timeout, SECONDS);
             // if peer-cluster-data is present it means namespace is owned by that peer-cluster and request should be
             // redirect to the peer-cluster
             if (peerClusterData != null) {
@@ -627,7 +627,7 @@ public abstract class PulsarWebResource {
                 throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
             }
         } catch (InterruptedException e) {
-            log.warn("Time-out {} sec while validating policy on {} ", cacheTimeOutInSec, namespace);
+            log.warn("Time-out {} sec while validating policy on {} ", timeout, namespace);
             throw new RestException(Status.SERVICE_UNAVAILABLE, String.format(
                     "Failed to validate global cluster configuration : ns=%s  emsg=%s", namespace, e.getMessage()));
         } catch (WebApplicationException e) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java
index 2fe63e8..d07fae3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java
@@ -54,7 +54,7 @@ public class ResourceQuotaCacheTest {
     public void setup() throws Exception {
         pulsar = mock(PulsarService.class);
         executor = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("test").build();
-        zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor);
+        zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), 30, executor);
         localCache = new LocalZooKeeperCacheService(zkCache, null);
 
         // set mock pulsar localzkcache
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
index 1c11f00..65991ca 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
@@ -77,7 +77,7 @@ public class OwnershipCacheTest {
         pulsar = mock(PulsarService.class);
         config = mock(ServiceConfiguration.class);
         executor = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("test").build();
-        zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor);
+        zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), 30, executor);
         localCache = spy(new LocalZooKeeperCacheService(zkCache, null));
         ZooKeeperDataCache<LocalPolicies> poilciesCache = mock(ZooKeeperDataCache.class);
         when(pulsar.getLocalZkCacheService()).thenReturn(localCache);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 9db657a..24cb812 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -77,6 +77,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
+import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.apache.zookeeper.ZooKeeper;
 import org.mockito.invocation.InvocationOnMock;
@@ -116,6 +117,10 @@ public class PersistentDispatcherFailoverConsumerTest {
         doReturn(createMockBookKeeper(mockZk, pulsar.getOrderedExecutor().chooseThread(0)))
             .when(pulsar).getBookKeeperClient();
 
+        ZooKeeperCache cache = mock(ZooKeeperCache.class);
+        doReturn(30).when(cache).getZkOperationTimeoutSeconds();
+        doReturn(cache).when(pulsar).getLocalZkCache();
+
         configCacheService = mock(ConfigurationCacheService.class);
         @SuppressWarnings("unchecked")
         ZooKeeperDataCache<Policies> zkDataCache = mock(ZooKeeperDataCache.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 9895e7b..8b3c721 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -103,6 +103,7 @@ import org.apache.pulsar.common.schema.SchemaVersion;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.compaction.CompactedTopic;
 import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.apache.zookeeper.ZooKeeper;
 import org.mockito.ArgumentCaptor;
@@ -155,6 +156,10 @@ public class PersistentTopicTest {
         doReturn(createMockBookKeeper(mockZk, pulsar.getOrderedExecutor().chooseThread(0)))
             .when(pulsar).getBookKeeperClient();
 
+        ZooKeeperCache cache = mock(ZooKeeperCache.class);
+        doReturn(30).when(cache).getZkOperationTimeoutSeconds();
+        doReturn(cache).when(pulsar).getLocalZkCache();
+
         configCacheService = mock(ConfigurationCacheService.class);
         @SuppressWarnings("unchecked")
         ZooKeeperDataCache<Policies> zkDataCache = mock(ZooKeeperDataCache.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index b98cd72..1fe2f97 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -106,6 +106,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.apache.zookeeper.ZooKeeper;
 import org.mockito.Mockito;
@@ -157,6 +158,9 @@ public class ServerCnxTest {
 
         mlFactoryMock = mock(ManagedLedgerFactory.class);
         doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
+        ZooKeeperCache cache = mock(ZooKeeperCache.class);
+        doReturn(30).when(cache).getZkOperationTimeoutSeconds();
+        doReturn(cache).when(pulsar).getLocalZkCache();
 
         ZooKeeper mockZk = createMockZooKeeper();
         doReturn(mockZk).when(pulsar).getZkClient();
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
index 54463aa..8b3a012 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.common.util.OrderedScheduler;
@@ -72,6 +73,7 @@ public class BrokerDiscoveryProvider implements Closeable {
             localZkCache = new ZookeeperCacheLoader(zkClientFactory, config.getZookeeperServers(),
                     config.getZookeeperSessionTimeoutMs());
             globalZkCache = new GlobalZooKeeperCache(zkClientFactory, config.getZookeeperSessionTimeoutMs(),
+                    (int) TimeUnit.MILLISECONDS.toSeconds(config.getZookeeperSessionTimeoutMs()),
                     config.getConfigurationStoreServers(), orderedExecutor, scheduledExecutorScheduler);
             globalZkCache.start();
         } catch (Exception e) {
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java
index 1371d05..abdfd4d 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -68,7 +69,8 @@ public class ZookeeperCacheLoader implements Closeable {
             log.error("Shutting down ZK sessions: {}", exitCode);
         });
 
-        this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), this.orderedExecutor);
+        this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(),
+                (int) TimeUnit.MILLISECONDS.toSeconds(zookeeperSessionTimeoutMs), this.orderedExecutor);
         localZkConnectionSvc.start(exitCode -> {
             try {
                 localZkCache.getZooKeeper().close();
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
index 60362f1..28f28cd 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.common.util.OrderedScheduler;
@@ -71,6 +72,7 @@ public class BrokerDiscoveryProvider implements Closeable {
             localZkCache = new ZookeeperCacheLoader(zkClientFactory, config.getZookeeperServers(),
                     config.getZookeeperSessionTimeoutMs());
             globalZkCache = new GlobalZooKeeperCache(zkClientFactory, config.getZookeeperSessionTimeoutMs(),
+                    (int) TimeUnit.MILLISECONDS.toSeconds(config.getZookeeperSessionTimeoutMs()),
                     config.getConfigurationStoreServers(), orderedExecutor, scheduledExecutorScheduler);
             globalZkCache.start();
         } catch (Exception e) {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
index 887c5ea..c3afdec 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -65,7 +66,8 @@ public class ZookeeperCacheLoader implements Closeable {
      */
     public ZookeeperCacheLoader(ZooKeeperClientFactory factory, String zookeeperServers, int zookeeperSessionTimeoutMs) throws Exception {
         this.zkClient = factory.create(zookeeperServers, SessionType.AllowReadOnly, zookeeperSessionTimeoutMs).get();
-        this.localZkCache = new LocalZooKeeperCache(zkClient, this.orderedExecutor);
+        this.localZkCache = new LocalZooKeeperCache(zkClient,
+                (int) TimeUnit.MILLISECONDS.toSeconds(zookeeperSessionTimeoutMs), this.orderedExecutor);
 
         this.brokerInfo = new ZooKeeperDataCache<LoadManagerReport>(localZkCache) {
             @Override
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index edc0c1e..d91cb0f 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -101,8 +101,9 @@ public class WebSocketService implements Closeable {
 
         if (isNotBlank(config.getConfigurationStoreServers())) {
             this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(),
-                    (int) config.getZooKeeperSessionTimeoutMillis(), config.getConfigurationStoreServers(),
-                    this.orderedExecutor, this.executor);
+                    (int) config.getZooKeeperSessionTimeoutMillis(),
+                    (int) TimeUnit.MILLISECONDS.toSeconds(config.getZooKeeperSessionTimeoutMillis()),
+                    config.getConfigurationStoreServers(), this.orderedExecutor, this.executor);
             try {
                 this.globalZkCache.start();
             } catch (IOException e) {
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
index deb74ac..8a7c3e7 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
@@ -51,8 +51,9 @@ public class GlobalZooKeeperCache extends ZooKeeperCache implements Closeable {
     private final ScheduledExecutorService scheduledExecutor;
 
     public GlobalZooKeeperCache(ZooKeeperClientFactory zkClientFactory, int zkSessionTimeoutMillis,
-            String globalZkConnect, OrderedExecutor orderedExecutor, ScheduledExecutorService scheduledExecutor) {
-        super(null, orderedExecutor);
+            int zkOperationTimeoutSeconds, String globalZkConnect, OrderedExecutor orderedExecutor,
+            ScheduledExecutorService scheduledExecutor) {
+        super(null, zkOperationTimeoutSeconds, orderedExecutor);
         this.zlClientFactory = zkClientFactory;
         this.zkSessionTimeoutMillis = zkSessionTimeoutMillis;
         this.globalZkConnect = globalZkConnect;
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java
index 945b58e..3045a0b 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java
@@ -35,8 +35,8 @@ public class LocalZooKeeperCache extends ZooKeeperCache {
 
     private static final Logger LOG = LoggerFactory.getLogger(LocalZooKeeperCache.class);
 
-    public LocalZooKeeperCache(final ZooKeeper zk, final OrderedExecutor executor) {
-        super(zk, executor);
+    public LocalZooKeeperCache(final ZooKeeper zk, int zkOperationTimeoutSeconds, final OrderedExecutor executor) {
+        super(zk, zkOperationTimeoutSeconds, executor);
     }
 
     @Override
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
index 0b40157..010d45a 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.client.RackChangeNotifier;
 import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl;
@@ -102,7 +103,7 @@ public class ZkBookieRackAffinityMapping extends AbstractDNSToSwitchMapping
                 try {
                     ZooKeeper zkClient = ZooKeeperClient.newBuilder().connectString(zkServers)
                             .sessionTimeoutMs(zkTimeout).build();
-                    zkCache = new ZooKeeperCache(zkClient) {
+                    zkCache = new ZooKeeperCache(zkClient, (int) TimeUnit.MILLISECONDS.toSeconds(zkTimeout)) {
                     };
                     conf.addProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, zkCache);
                 } catch (Exception e) {
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
index 7d9b53e..d55366a 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
@@ -95,7 +96,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
                 try {
                     ZooKeeper zkClient = ZooKeeperClient.newBuilder().connectString(zkServers)
                             .sessionTimeoutMs(zkTimeout).build();
-                    zkCache = new ZooKeeperCache(zkClient) {
+                    zkCache = new ZooKeeperCache(zkClient, (int) TimeUnit.MILLISECONDS.toSeconds(zkTimeout)) {
                     };
                     conf.addProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, zkCache);
                 } catch (Exception e) {
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
index 0e69c88..7325e6c 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
@@ -34,7 +34,6 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -83,12 +82,13 @@ public abstract class ZooKeeperCache implements Watcher {
     private final OrderedExecutor executor;
     private final OrderedExecutor backgroundExecutor = OrderedExecutor.newBuilder().name("zk-cache-background").numThreads(2).build();
     private boolean shouldShutdownExecutor;
-    public static final int cacheTimeOutInSec = 30;
+    private final int zkOperationTimeoutSeconds;
 
     protected AtomicReference<ZooKeeper> zkSession = new AtomicReference<ZooKeeper>(null);
 
-    public ZooKeeperCache(ZooKeeper zkSession, OrderedExecutor executor) {
+    public ZooKeeperCache(ZooKeeper zkSession, int zkOperationTimeoutSeconds, OrderedExecutor executor) {
         checkNotNull(executor);
+        this.zkOperationTimeoutSeconds = zkOperationTimeoutSeconds;
         this.executor = executor;
         this.zkSession.set(zkSession);
         this.shouldShutdownExecutor = false;
@@ -100,8 +100,9 @@ public abstract class ZooKeeperCache implements Watcher {
         this.existsCache = CacheBuilder.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build();
     }
 
-    public ZooKeeperCache(ZooKeeper zkSession) {
-        this(zkSession, OrderedExecutor.newBuilder().name("zk-cache-callback-executor").build());
+    public ZooKeeperCache(ZooKeeper zkSession, int zkOperationTimeoutSeconds) {
+        this(zkSession, zkOperationTimeoutSeconds,
+                OrderedExecutor.newBuilder().name("zk-cache-callback-executor").build());
         this.shouldShutdownExecutor = true;
     }
 
@@ -178,6 +179,10 @@ public abstract class ZooKeeperCache implements Watcher {
         backgroundExecutor.execute(() -> invalidate(path));
     }
 
+    public int getZkOperationTimeoutSeconds() {
+        return zkOperationTimeoutSeconds;
+    }
+
     public void invalidate(final String path) {
         invalidateData(path);
         invalidateChildren(path);
@@ -285,14 +290,14 @@ public abstract class ZooKeeperCache implements Watcher {
     public <T> Optional<Entry<T, Stat>> getData(final String path, final Watcher watcher,
             final Deserializer<T> deserializer) throws Exception {
         try {
-            return getDataAsync(path, watcher, deserializer).get(cacheTimeOutInSec, TimeUnit.SECONDS);
+            return getDataAsync(path, watcher, deserializer).get(this.zkOperationTimeoutSeconds, TimeUnit.SECONDS);
         } catch (ExecutionException e) {
             asyncInvalidate(path);
             Throwable cause = e.getCause();
             if (cause instanceof KeeperException) {
                 throw (KeeperException) cause;
             } else if (cause instanceof InterruptedException) {
-                LOG.warn("Time-out while fetching {} zk-data in {} sec", path, cacheTimeOutInSec);
+                LOG.warn("Time-out while fetching {} zk-data in {} sec", path, this.zkOperationTimeoutSeconds);
                 throw (InterruptedException) cause;
             } else if (cause instanceof RuntimeException) {
                 throw (RuntimeException) cause;
@@ -300,7 +305,7 @@ public abstract class ZooKeeperCache implements Watcher {
                 throw new RuntimeException(cause);
             }
         } catch (TimeoutException e) {
-            LOG.warn("Time-out while fetching {} zk-data in {} sec", path, cacheTimeOutInSec);
+            LOG.warn("Time-out while fetching {} zk-data in {} sec", path, this.zkOperationTimeoutSeconds);
             asyncInvalidate(path);
             throw e;
         }
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java
index defed3a..8ff2fc3 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java
@@ -22,14 +22,13 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.pulsar.zookeeper.ZooKeeperCache.CacheUpdater;
 import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +47,7 @@ public abstract class ZooKeeperDataCache<T> implements Deserializer<T>, CacheUpd
 
     private final ZooKeeperCache cache;
     private final List<ZooKeeperCacheListener<T>> listeners = Lists.newCopyOnWriteArrayList();
+    private final int zkOperationTimeoutSeconds;
 
     private static final int FALSE = 0;
     private static final int TRUE = 1;
@@ -58,6 +58,7 @@ public abstract class ZooKeeperDataCache<T> implements Deserializer<T>, CacheUpd
 
     public ZooKeeperDataCache(final ZooKeeperCache cache) {
         this.cache = cache;
+        this.zkOperationTimeoutSeconds = cache.getZkOperationTimeoutSeconds();
     }
 
     public CompletableFuture<Optional<T>> getAsync(String path) {
@@ -91,7 +92,7 @@ public abstract class ZooKeeperDataCache<T> implements Deserializer<T>, CacheUpd
      * @throws Exception
      */
     public Optional<T> get(final String path) throws Exception {
-        return getAsync(path).get();
+        return getAsync(path).get(zkOperationTimeoutSeconds, TimeUnit.SECONDS);
     }
 
     public Optional<Entry<T, Stat>> getWithStat(final String path) throws Exception {
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java
index 60e7c06..88034c7 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java
@@ -79,7 +79,7 @@ public class ZkBookieRackAffinityMappingTest {
         // Case1: ZKCache is given
         ZkBookieRackAffinityMapping mapping1 = new ZkBookieRackAffinityMapping();
         ClientConfiguration bkClientConf1 = new ClientConfiguration();
-        bkClientConf1.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc) {
+        bkClientConf1.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, 30) {
         });
         mapping1.setConf(bkClientConf1);
         List<String> racks1 = mapping1
@@ -108,7 +108,7 @@ public class ZkBookieRackAffinityMappingTest {
     public void testNoBookieInfo() throws Exception {
         ZkBookieRackAffinityMapping mapping = new ZkBookieRackAffinityMapping();
         ClientConfiguration bkClientConf = new ClientConfiguration();
-        bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc) {
+        bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, 30) {
         });
         mapping.setConf(bkClientConf);
         List<String> racks = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2", "127.0.0.3"));
@@ -152,7 +152,7 @@ public class ZkBookieRackAffinityMappingTest {
 
         ZkBookieRackAffinityMapping mapping = new ZkBookieRackAffinityMapping();
         ClientConfiguration bkClientConf = new ClientConfiguration();
-        bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc) {
+        bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, 30) {
         });
         mapping.setConf(bkClientConf);
         List<String> racks = mapping
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
index 3a807e8..7ed186d 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
@@ -113,7 +113,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
 
         ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy();
         ClientConfiguration bkClientConf = new ClientConfiguration();
-        bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc) {
+        bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, 30) {
         });
         bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, isolationGroups);
         isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL, NullStatsLogger.INSTANCE);
@@ -182,7 +182,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
     public void testNoBookieInfo() throws Exception {
         ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy();
         ClientConfiguration bkClientConf = new ClientConfiguration();
-        bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc) {
+        bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, 30) {
         });
         bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, isolationGroups);
         isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL, NullStatsLogger.INSTANCE);
@@ -291,7 +291,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
 
         ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy();
         ClientConfiguration bkClientConf = new ClientConfiguration();
-        bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc) {
+        bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, 30) {
         });
         isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL,
                 NullStatsLogger.INSTANCE);
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
index 39f23ac..e25c1ad 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
@@ -88,7 +88,7 @@ public class ZookeeperCacheTest {
     @Test(timeOut = 10000)
     void testSimpleCache() throws Exception {
 
-        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor);
+        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 30, executor);
         ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) {
             @Override
             public String deserialize(String key, byte[] content) throws Exception {
@@ -128,7 +128,7 @@ public class ZookeeperCacheTest {
     void testChildrenCache() throws Exception {
         zkClient.create("/test", new byte[0], null, null);
 
-        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor);
+        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 30, executor);
         ZooKeeperChildrenCache cache = new ZooKeeperChildrenCache(zkCacheService, "/test");
 
         // Create callback counter
@@ -179,7 +179,7 @@ public class ZookeeperCacheTest {
     @Test(timeOut = 10000)
     void testChildrenCacheZnodeCreatedAfterCache() throws Exception {
 
-        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor);
+        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 30, executor);
         ZooKeeperChildrenCache cache = new ZooKeeperChildrenCache(zkCacheService, "/test");
 
         // Create callback counter
@@ -240,7 +240,7 @@ public class ZookeeperCacheTest {
         // Check existence after creation of the node
         zkClient.create("/test", new byte[0], null, null);
         Thread.sleep(20);
-        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor);
+        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 30, executor);
         boolean exists = zkCacheService.exists("/test");
         Assert.assertTrue(exists, "/test should exists in the cache");
 
@@ -257,7 +257,7 @@ public class ZookeeperCacheTest {
         zkClient.create("/test/c1", new byte[0], null, null);
         zkClient.create("/test/c2", new byte[0], null, null);
         Thread.sleep(20);
-        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor);
+        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 30, executor);
         boolean exists = zkCacheService.exists("/test");
         Assert.assertTrue(exists, "/test should exists in the cache");
 
@@ -295,7 +295,7 @@ public class ZookeeperCacheTest {
             }
         };
 
-        GlobalZooKeeperCache zkCacheService = new GlobalZooKeeperCache(zkClientfactory, -1, "", executor,
+        GlobalZooKeeperCache zkCacheService = new GlobalZooKeeperCache(zkClientfactory, -1, 30, "", executor,
                 scheduledExecutor);
         zkCacheService.start();
         zkClient = (MockZooKeeper) zkCacheService.getZooKeeper();
@@ -399,7 +399,7 @@ public class ZookeeperCacheTest {
         // add readOpDelayMs so, main thread will not serve zkCacahe-returned future and let zkExecutor-thread handle
         // callback-result process
         MockZooKeeper zkClient = MockZooKeeper.newInstance(zkExecutor, 100);
-        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor);
+        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 30, executor);
         ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) {
             @Override
             public String deserialize(String key, byte[] content) throws Exception {
@@ -447,7 +447,7 @@ public class ZookeeperCacheTest {
         // add readOpDelayMs so, main thread will not serve zkCacahe-returned future and let zkExecutor-thread handle
         // callback-result process
         MockZooKeeper zkClient = MockZooKeeper.newInstance(zkExecutor, 100);
-        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor);
+        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 30, executor);
 
         final AtomicInteger count = new AtomicInteger(0);
         ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) {