You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/19 18:05:31 UTC
[pulsar] branch master updated: [pulsar-broker] Fix deadlock: add zk-operation timeout for blocking call on zk-cache (#3633)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a057a14 [pulsar-broker] Fix deadlock: add zk-operation timeout for blocking call on zk-cache (#3633)
a057a14 is described below
commit a057a1430a186b6c874b9605a0c525daf3846900
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue Mar 19 11:05:27 2019 -0700
[pulsar-broker] Fix deadlock: add zk-operation timeout for blocking call on zk-cache (#3633)
fix: compilation
fix test
fix test
---
conf/broker.conf | 3 +++
conf/standalone.conf | 3 +++
.../apache/pulsar/broker/ServiceConfiguration.java | 5 +++++
.../broker/authorization/AuthorizationService.java | 17 ++++++++++-------
.../pulsar/broker/BookKeeperClientFactoryImpl.java | 4 ++--
.../org/apache/pulsar/broker/PulsarService.java | 6 ++++--
.../pulsar/broker/namespace/NamespaceService.java | 17 ++++++++++-------
.../service/persistent/DispatchRateLimiter.java | 3 +--
.../service/persistent/SubscribeRateLimiter.java | 1 -
.../apache/pulsar/broker/web/PulsarWebResource.java | 6 +++---
.../pulsar/broker/cache/ResourceQuotaCacheTest.java | 2 +-
.../pulsar/broker/namespace/OwnershipCacheTest.java | 2 +-
.../PersistentDispatcherFailoverConsumerTest.java | 5 +++++
.../pulsar/broker/service/PersistentTopicTest.java | 5 +++++
.../apache/pulsar/broker/service/ServerCnxTest.java | 4 ++++
.../discovery/service/BrokerDiscoveryProvider.java | 2 ++
.../discovery/service/web/ZookeeperCacheLoader.java | 4 +++-
.../proxy/server/BrokerDiscoveryProvider.java | 2 ++
.../proxy/server/util/ZookeeperCacheLoader.java | 4 +++-
.../apache/pulsar/websocket/WebSocketService.java | 5 +++--
.../pulsar/zookeeper/GlobalZooKeeperCache.java | 5 +++--
.../pulsar/zookeeper/LocalZooKeeperCache.java | 4 ++--
.../zookeeper/ZkBookieRackAffinityMapping.java | 3 ++-
.../ZkIsolatedBookieEnsemblePlacementPolicy.java | 3 ++-
.../org/apache/pulsar/zookeeper/ZooKeeperCache.java | 21 +++++++++++++--------
.../apache/pulsar/zookeeper/ZooKeeperDataCache.java | 7 ++++---
.../zookeeper/ZkBookieRackAffinityMappingTest.java | 6 +++---
...ZkIsolatedBookieEnsemblePlacementPolicyTest.java | 6 +++---
.../apache/pulsar/zookeeper/ZookeeperCacheTest.java | 16 ++++++++--------
29 files changed, 110 insertions(+), 61 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index fb5e1e8..0d576fa 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -61,6 +61,9 @@ failureDomainsEnabled=false
# Zookeeper session timeout in milliseconds
zooKeeperSessionTimeoutMillis=30000
+# ZooKeeper operation timeout in seconds
+zooKeeperOperationTimeoutSeconds=30
+
# Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
brokerShutdownTimeoutMs=60000
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 41708c7..22bbc70 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -51,6 +51,9 @@ failureDomainsEnabled=false
# Zookeeper session timeout in milliseconds
zooKeeperSessionTimeoutMillis=30000
+# ZooKeeper operation timeout in seconds
+zooKeeperOperationTimeoutSeconds=30
+
# Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
brokerShutdownTimeoutMs=60000
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index da2b098..4d6b498 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -184,6 +184,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private long zooKeeperSessionTimeoutMillis = 30000;
@FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "ZooKeeper operation timeout in seconds"
+ )
+ private int zooKeeperOperationTimeoutSeconds = 30;
+ @FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
doc = "Time to wait for broker graceful shutdown. After this time elapses, the process will be killed"
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 42d935c..de6f799 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -36,7 +36,6 @@ import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;
/**
* Authorization service that manages pluggable authorization provider and authorize requests accordingly.
@@ -215,9 +214,11 @@ public class AuthorizationService {
public boolean canProduce(TopicName topicName, String role, AuthenticationDataSource authenticationData)
throws Exception {
try {
- return canProduceAsync(topicName, role, authenticationData).get(cacheTimeOutInSec, SECONDS);
+ return canProduceAsync(topicName, role, authenticationData).get(conf.getZooKeeperOperationTimeoutSeconds(),
+ SECONDS);
} catch (InterruptedException e) {
- log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, topicName);
+ log.warn("Time-out {} sec while checking authorization on {} ", conf.getZooKeeperOperationTimeoutSeconds(),
+ topicName);
throw e;
} catch (Exception e) {
log.warn("Producer-client with Role - {} failed to get permissions for topic - {}. {}", role, topicName,
@@ -229,13 +230,15 @@ public class AuthorizationService {
public boolean canConsume(TopicName topicName, String role, AuthenticationDataSource authenticationData,
String subscription) throws Exception {
try {
- return canConsumeAsync(topicName, role, authenticationData, subscription).get(cacheTimeOutInSec, SECONDS);
+ return canConsumeAsync(topicName, role, authenticationData, subscription)
+ .get(conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException e) {
- log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, topicName);
+ log.warn("Time-out {} sec while checking authorization on {} ", conf.getZooKeeperOperationTimeoutSeconds(),
+ topicName);
throw e;
} catch (Exception e) {
- log.warn("Consumer-client with Role - {} failed to get permissions for topic - {}. {}", role,
- topicName, e.getMessage());
+ log.warn("Consumer-client with Role - {} failed to get permissions for topic - {}. {}", role, topicName,
+ e.getMessage());
throw e;
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
index 5e6e59d..7f13912 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
@@ -76,7 +76,7 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
bkConf.setProperty(RackawareEnsemblePlacementPolicy.REPP_DNS_RESOLVER_CLASS,
ZkBookieRackAffinityMapping.class.getName());
- ZooKeeperCache zkc = new ZooKeeperCache(zkClient) {
+ ZooKeeperCache zkc = new ZooKeeperCache(zkClient, conf.getZooKeeperOperationTimeoutSeconds()) {
};
if (!rackawarePolicyZkCache.compareAndSet(null, zkc)) {
zkc.stop();
@@ -91,7 +91,7 @@ public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {
bkConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
conf.getBookkeeperClientIsolationGroups());
if (bkConf.getProperty(ZooKeeperCache.ZK_CACHE_INSTANCE) == null) {
- ZooKeeperCache zkc = new ZooKeeperCache(zkClient) {
+ ZooKeeperCache zkc = new ZooKeeperCache(zkClient, conf.getZooKeeperOperationTimeoutSeconds()) {
};
if (!clientIsolationZkCache.compareAndSet(null, zkc)) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 54d8092..ed4d928 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -538,9 +538,11 @@ public class PulsarService implements AutoCloseable {
LOG.info("starting configuration cache service");
- this.localZkCache = new LocalZooKeeperCache(getZkClient(), getOrderedExecutor());
+ this.localZkCache = new LocalZooKeeperCache(getZkClient(), config.getZooKeeperOperationTimeoutSeconds(),
+ getOrderedExecutor());
this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(),
- (int) config.getZooKeeperSessionTimeoutMillis(), config.getConfigurationStoreServers(),
+ (int) config.getZooKeeperSessionTimeoutMillis(),
+ config.getZooKeeperOperationTimeoutSeconds(), config.getConfigurationStoreServers(),
getOrderedExecutor(), this.cacheExecutor);
try {
this.globalZkCache.start();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index dbdf16e..928407b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -92,7 +92,6 @@ import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
import static org.apache.pulsar.common.naming.NamespaceBundleFactory.getBundlesData;
-import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;
/**
* The <code>NamespaceService</code> provides resource ownership lookup as well as resource ownership claiming services
@@ -762,7 +761,8 @@ public class NamespaceService {
if (!policies.isPresent()) {
// if policies is not present into localZk then create new policies
- this.pulsar.getLocalZkCacheService().createPolicies(path, false).get(cacheTimeOutInSec, SECONDS);
+ this.pulsar.getLocalZkCacheService().createPolicies(path, false)
+ .get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
}
long version = nsBundles.getVersion();
@@ -828,17 +828,20 @@ public class NamespaceService {
}
public void removeOwnedServiceUnit(NamespaceName nsName) throws Exception {
- ownershipCache.removeOwnership(getFullBundle(nsName)).get(cacheTimeOutInSec, SECONDS);
+ ownershipCache.removeOwnership(getFullBundle(nsName))
+ .get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
bundleFactory.invalidateBundleCache(nsName);
}
public void removeOwnedServiceUnit(NamespaceBundle nsBundle) throws Exception {
- ownershipCache.removeOwnership(nsBundle).get(cacheTimeOutInSec, SECONDS);
+ ownershipCache.removeOwnership(nsBundle).get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(),
+ SECONDS);
bundleFactory.invalidateBundleCache(nsBundle.getNamespaceObject());
}
public void removeOwnedServiceUnits(NamespaceName nsName, BundlesData bundleData) throws Exception {
- ownershipCache.removeOwnership(bundleFactory.getBundles(nsName, bundleData)).get(cacheTimeOutInSec, SECONDS);
+ ownershipCache.removeOwnership(bundleFactory.getBundles(nsName, bundleData))
+ .get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
bundleFactory.invalidateBundleCache(nsName);
}
@@ -896,7 +899,7 @@ public class NamespaceService {
ClusterData peerClusterData;
try {
peerClusterData = PulsarWebResource.checkLocalOrGetPeerReplicationCluster(pulsar, namespaceName)
- .get(cacheTimeOutInSec, SECONDS);
+ .get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new RuntimeException("Failed to contact peer replication cluster.", e);
}
@@ -972,7 +975,7 @@ public class NamespaceService {
public Optional<NamespaceEphemeralData> getOwner(NamespaceBundle bundle) throws Exception {
// if there is no znode for the service unit, it is not owned by any broker
- return getOwnerAsync(bundle).get(cacheTimeOutInSec, SECONDS);
+ return getOwnerAsync(bundle).get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
}
public CompletableFuture<Optional<NamespaceEphemeralData>> getOwnerAsync(NamespaceBundle bundle) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index a69c31f..7e9052a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.broker.service.persistent;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.pulsar.broker.web.PulsarWebResource.path;
-import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@@ -189,7 +188,7 @@ public class DispatchRateLimiter {
Optional<Policies> policies = Optional.empty();
try {
policies = brokerService.pulsar().getConfigurationCache().policiesCache().getAsync(path)
- .get(cacheTimeOutInSec, SECONDS);
+ .get(brokerService.pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
} catch (Exception e) {
log.warn("Failed to get message-rate for {} ", topicName, e);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
index 078abb5..2cc528f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
@@ -41,7 +41,6 @@ import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.web.PulsarWebResource.path;
-import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;
public class SubscribeRateLimiter {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 824be7a..5b884b7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkArgument;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;
import java.net.MalformedURLException;
import java.net.URI;
@@ -612,9 +611,10 @@ public abstract class PulsarWebResource {
* @throws Exception
*/
protected void validateGlobalNamespaceOwnership(NamespaceName namespace) {
+ int timeout = pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds();
try {
ClusterData peerClusterData = checkLocalOrGetPeerReplicationCluster(pulsar(), namespace)
- .get(cacheTimeOutInSec, SECONDS);
+ .get(timeout, SECONDS);
// if peer-cluster-data is present it means namespace is owned by that peer-cluster and request should be
// redirect to the peer-cluster
if (peerClusterData != null) {
@@ -627,7 +627,7 @@ public abstract class PulsarWebResource {
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
}
} catch (InterruptedException e) {
- log.warn("Time-out {} sec while validating policy on {} ", cacheTimeOutInSec, namespace);
+ log.warn("Time-out {} sec while validating policy on {} ", timeout, namespace);
throw new RestException(Status.SERVICE_UNAVAILABLE, String.format(
"Failed to validate global cluster configuration : ns=%s emsg=%s", namespace, e.getMessage()));
} catch (WebApplicationException e) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java
index 2fe63e8..d07fae3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java
@@ -54,7 +54,7 @@ public class ResourceQuotaCacheTest {
public void setup() throws Exception {
pulsar = mock(PulsarService.class);
executor = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("test").build();
- zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor);
+ zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), 30, executor);
localCache = new LocalZooKeeperCacheService(zkCache, null);
// set mock pulsar localzkcache
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
index 1c11f00..65991ca 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
@@ -77,7 +77,7 @@ public class OwnershipCacheTest {
pulsar = mock(PulsarService.class);
config = mock(ServiceConfiguration.class);
executor = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("test").build();
- zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor);
+ zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), 30, executor);
localCache = spy(new LocalZooKeeperCacheService(zkCache, null));
ZooKeeperDataCache<LocalPolicies> poilciesCache = mock(ZooKeeperDataCache.class);
when(pulsar.getLocalZkCacheService()).thenReturn(localCache);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 9db657a..24cb812 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -77,6 +77,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
+import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.ZooKeeper;
import org.mockito.invocation.InvocationOnMock;
@@ -116,6 +117,10 @@ public class PersistentDispatcherFailoverConsumerTest {
doReturn(createMockBookKeeper(mockZk, pulsar.getOrderedExecutor().chooseThread(0)))
.when(pulsar).getBookKeeperClient();
+ ZooKeeperCache cache = mock(ZooKeeperCache.class);
+ doReturn(30).when(cache).getZkOperationTimeoutSeconds();
+ doReturn(cache).when(pulsar).getLocalZkCache();
+
configCacheService = mock(ConfigurationCacheService.class);
@SuppressWarnings("unchecked")
ZooKeeperDataCache<Policies> zkDataCache = mock(ZooKeeperDataCache.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 9895e7b..8b3c721 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -103,6 +103,7 @@ import org.apache.pulsar.common.schema.SchemaVersion;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.ZooKeeper;
import org.mockito.ArgumentCaptor;
@@ -155,6 +156,10 @@ public class PersistentTopicTest {
doReturn(createMockBookKeeper(mockZk, pulsar.getOrderedExecutor().chooseThread(0)))
.when(pulsar).getBookKeeperClient();
+ ZooKeeperCache cache = mock(ZooKeeperCache.class);
+ doReturn(30).when(cache).getZkOperationTimeoutSeconds();
+ doReturn(cache).when(pulsar).getLocalZkCache();
+
configCacheService = mock(ConfigurationCacheService.class);
@SuppressWarnings("unchecked")
ZooKeeperDataCache<Policies> zkDataCache = mock(ZooKeeperDataCache.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index b98cd72..1fe2f97 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -106,6 +106,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
+import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.ZooKeeper;
import org.mockito.Mockito;
@@ -157,6 +158,9 @@ public class ServerCnxTest {
mlFactoryMock = mock(ManagedLedgerFactory.class);
doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
+ ZooKeeperCache cache = mock(ZooKeeperCache.class);
+ doReturn(30).when(cache).getZkOperationTimeoutSeconds();
+ doReturn(cache).when(pulsar).getLocalZkCache();
ZooKeeper mockZk = createMockZooKeeper();
doReturn(mockZk).when(pulsar).getZkClient();
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
index 54463aa..8b3a012 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.common.util.OrderedScheduler;
@@ -72,6 +73,7 @@ public class BrokerDiscoveryProvider implements Closeable {
localZkCache = new ZookeeperCacheLoader(zkClientFactory, config.getZookeeperServers(),
config.getZookeeperSessionTimeoutMs());
globalZkCache = new GlobalZooKeeperCache(zkClientFactory, config.getZookeeperSessionTimeoutMs(),
+ (int) TimeUnit.MILLISECONDS.toSeconds(config.getZookeeperSessionTimeoutMs()),
config.getConfigurationStoreServers(), orderedExecutor, scheduledExecutorScheduler);
globalZkCache.start();
} catch (Exception e) {
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java
index 1371d05..abdfd4d 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -68,7 +69,8 @@ public class ZookeeperCacheLoader implements Closeable {
log.error("Shutting down ZK sessions: {}", exitCode);
});
- this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), this.orderedExecutor);
+ this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(),
+ (int) TimeUnit.MILLISECONDS.toSeconds(zookeeperSessionTimeoutMs), this.orderedExecutor);
localZkConnectionSvc.start(exitCode -> {
try {
localZkCache.getZooKeeper().close();
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
index 60362f1..28f28cd 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.common.util.OrderedScheduler;
@@ -71,6 +72,7 @@ public class BrokerDiscoveryProvider implements Closeable {
localZkCache = new ZookeeperCacheLoader(zkClientFactory, config.getZookeeperServers(),
config.getZookeeperSessionTimeoutMs());
globalZkCache = new GlobalZooKeeperCache(zkClientFactory, config.getZookeeperSessionTimeoutMs(),
+ (int) TimeUnit.MILLISECONDS.toSeconds(config.getZookeeperSessionTimeoutMs()),
config.getConfigurationStoreServers(), orderedExecutor, scheduledExecutorScheduler);
globalZkCache.start();
} catch (Exception e) {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
index 887c5ea..c3afdec 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -65,7 +66,8 @@ public class ZookeeperCacheLoader implements Closeable {
*/
public ZookeeperCacheLoader(ZooKeeperClientFactory factory, String zookeeperServers, int zookeeperSessionTimeoutMs) throws Exception {
this.zkClient = factory.create(zookeeperServers, SessionType.AllowReadOnly, zookeeperSessionTimeoutMs).get();
- this.localZkCache = new LocalZooKeeperCache(zkClient, this.orderedExecutor);
+ this.localZkCache = new LocalZooKeeperCache(zkClient,
+ (int) TimeUnit.MILLISECONDS.toSeconds(zookeeperSessionTimeoutMs), this.orderedExecutor);
this.brokerInfo = new ZooKeeperDataCache<LoadManagerReport>(localZkCache) {
@Override
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index edc0c1e..d91cb0f 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -101,8 +101,9 @@ public class WebSocketService implements Closeable {
if (isNotBlank(config.getConfigurationStoreServers())) {
this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(),
- (int) config.getZooKeeperSessionTimeoutMillis(), config.getConfigurationStoreServers(),
- this.orderedExecutor, this.executor);
+ (int) config.getZooKeeperSessionTimeoutMillis(),
+ (int) TimeUnit.MILLISECONDS.toSeconds(config.getZooKeeperSessionTimeoutMillis()),
+ config.getConfigurationStoreServers(), this.orderedExecutor, this.executor);
try {
this.globalZkCache.start();
} catch (IOException e) {
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
index deb74ac..8a7c3e7 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
@@ -51,8 +51,9 @@ public class GlobalZooKeeperCache extends ZooKeeperCache implements Closeable {
private final ScheduledExecutorService scheduledExecutor;
public GlobalZooKeeperCache(ZooKeeperClientFactory zkClientFactory, int zkSessionTimeoutMillis,
- String globalZkConnect, OrderedExecutor orderedExecutor, ScheduledExecutorService scheduledExecutor) {
- super(null, orderedExecutor);
+ int zkOperationTimeoutSeconds, String globalZkConnect, OrderedExecutor orderedExecutor,
+ ScheduledExecutorService scheduledExecutor) {
+ super(null, zkOperationTimeoutSeconds, orderedExecutor);
this.zlClientFactory = zkClientFactory;
this.zkSessionTimeoutMillis = zkSessionTimeoutMillis;
this.globalZkConnect = globalZkConnect;
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java
index 945b58e..3045a0b 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java
@@ -35,8 +35,8 @@ public class LocalZooKeeperCache extends ZooKeeperCache {
private static final Logger LOG = LoggerFactory.getLogger(LocalZooKeeperCache.class);
- public LocalZooKeeperCache(final ZooKeeper zk, final OrderedExecutor executor) {
- super(zk, executor);
+ public LocalZooKeeperCache(final ZooKeeper zk, int zkOperationTimeoutSeconds, final OrderedExecutor executor) {
+ super(zk, zkOperationTimeoutSeconds, executor);
}
@Override
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
index 0b40157..010d45a 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.RackChangeNotifier;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl;
@@ -102,7 +103,7 @@ public class ZkBookieRackAffinityMapping extends AbstractDNSToSwitchMapping
try {
ZooKeeper zkClient = ZooKeeperClient.newBuilder().connectString(zkServers)
.sessionTimeoutMs(zkTimeout).build();
- zkCache = new ZooKeeperCache(zkClient) {
+ zkCache = new ZooKeeperCache(zkClient, (int) TimeUnit.MILLISECONDS.toSeconds(zkTimeout)) {
};
conf.addProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, zkCache);
} catch (Exception e) {
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
index 7d9b53e..d55366a 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
@@ -95,7 +96,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
try {
ZooKeeper zkClient = ZooKeeperClient.newBuilder().connectString(zkServers)
.sessionTimeoutMs(zkTimeout).build();
- zkCache = new ZooKeeperCache(zkClient) {
+ zkCache = new ZooKeeperCache(zkClient, (int) TimeUnit.MILLISECONDS.toSeconds(zkTimeout)) {
};
conf.addProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, zkCache);
} catch (Exception e) {
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
index 0e69c88..7325e6c 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
@@ -34,7 +34,6 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -83,12 +82,13 @@ public abstract class ZooKeeperCache implements Watcher {
private final OrderedExecutor executor;
private final OrderedExecutor backgroundExecutor = OrderedExecutor.newBuilder().name("zk-cache-background").numThreads(2).build();
private boolean shouldShutdownExecutor;
- public static final int cacheTimeOutInSec = 30;
+ private final int zkOperationTimeoutSeconds;
protected AtomicReference<ZooKeeper> zkSession = new AtomicReference<ZooKeeper>(null);
- public ZooKeeperCache(ZooKeeper zkSession, OrderedExecutor executor) {
+ public ZooKeeperCache(ZooKeeper zkSession, int zkOperationTimeoutSeconds, OrderedExecutor executor) {
checkNotNull(executor);
+ this.zkOperationTimeoutSeconds = zkOperationTimeoutSeconds;
this.executor = executor;
this.zkSession.set(zkSession);
this.shouldShutdownExecutor = false;
@@ -100,8 +100,9 @@ public abstract class ZooKeeperCache implements Watcher {
this.existsCache = CacheBuilder.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build();
}
- public ZooKeeperCache(ZooKeeper zkSession) {
- this(zkSession, OrderedExecutor.newBuilder().name("zk-cache-callback-executor").build());
+ public ZooKeeperCache(ZooKeeper zkSession, int zkOperationTimeoutSeconds) {
+ this(zkSession, zkOperationTimeoutSeconds,
+ OrderedExecutor.newBuilder().name("zk-cache-callback-executor").build());
this.shouldShutdownExecutor = true;
}
@@ -178,6 +179,10 @@ public abstract class ZooKeeperCache implements Watcher {
backgroundExecutor.execute(() -> invalidate(path));
}
+ public int getZkOperationTimeoutSeconds() {
+ return zkOperationTimeoutSeconds;
+ }
+
public void invalidate(final String path) {
invalidateData(path);
invalidateChildren(path);
@@ -285,14 +290,14 @@ public abstract class ZooKeeperCache implements Watcher {
public <T> Optional<Entry<T, Stat>> getData(final String path, final Watcher watcher,
final Deserializer<T> deserializer) throws Exception {
try {
- return getDataAsync(path, watcher, deserializer).get(cacheTimeOutInSec, TimeUnit.SECONDS);
+ return getDataAsync(path, watcher, deserializer).get(this.zkOperationTimeoutSeconds, TimeUnit.SECONDS);
} catch (ExecutionException e) {
asyncInvalidate(path);
Throwable cause = e.getCause();
if (cause instanceof KeeperException) {
throw (KeeperException) cause;
} else if (cause instanceof InterruptedException) {
- LOG.warn("Time-out while fetching {} zk-data in {} sec", path, cacheTimeOutInSec);
+ LOG.warn("Time-out while fetching {} zk-data in {} sec", path, this.zkOperationTimeoutSeconds);
throw (InterruptedException) cause;
} else if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
@@ -300,7 +305,7 @@ public abstract class ZooKeeperCache implements Watcher {
throw new RuntimeException(cause);
}
} catch (TimeoutException e) {
- LOG.warn("Time-out while fetching {} zk-data in {} sec", path, cacheTimeOutInSec);
+ LOG.warn("Time-out while fetching {} zk-data in {} sec", path, this.zkOperationTimeoutSeconds);
asyncInvalidate(path);
throw e;
}
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java
index defed3a..8ff2fc3 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperDataCache.java
@@ -22,14 +22,13 @@ import java.util.List;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.pulsar.zookeeper.ZooKeeperCache.CacheUpdater;
import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +47,7 @@ public abstract class ZooKeeperDataCache<T> implements Deserializer<T>, CacheUpd
private final ZooKeeperCache cache;
private final List<ZooKeeperCacheListener<T>> listeners = Lists.newCopyOnWriteArrayList();
+ private final int zkOperationTimeoutSeconds;
private static final int FALSE = 0;
private static final int TRUE = 1;
@@ -58,6 +58,7 @@ public abstract class ZooKeeperDataCache<T> implements Deserializer<T>, CacheUpd
public ZooKeeperDataCache(final ZooKeeperCache cache) {
this.cache = cache;
+ this.zkOperationTimeoutSeconds = cache.getZkOperationTimeoutSeconds();
}
public CompletableFuture<Optional<T>> getAsync(String path) {
@@ -91,7 +92,7 @@ public abstract class ZooKeeperDataCache<T> implements Deserializer<T>, CacheUpd
* @throws Exception
*/
public Optional<T> get(final String path) throws Exception {
- return getAsync(path).get();
+ return getAsync(path).get(zkOperationTimeoutSeconds, TimeUnit.SECONDS);
}
public Optional<Entry<T, Stat>> getWithStat(final String path) throws Exception {
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java
index 60e7c06..88034c7 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java
@@ -79,7 +79,7 @@ public class ZkBookieRackAffinityMappingTest {
// Case1: ZKCache is given
ZkBookieRackAffinityMapping mapping1 = new ZkBookieRackAffinityMapping();
ClientConfiguration bkClientConf1 = new ClientConfiguration();
- bkClientConf1.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc) {
+ bkClientConf1.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, 30) {
});
mapping1.setConf(bkClientConf1);
List<String> racks1 = mapping1
@@ -108,7 +108,7 @@ public class ZkBookieRackAffinityMappingTest {
public void testNoBookieInfo() throws Exception {
ZkBookieRackAffinityMapping mapping = new ZkBookieRackAffinityMapping();
ClientConfiguration bkClientConf = new ClientConfiguration();
- bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc) {
+ bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, 30) {
});
mapping.setConf(bkClientConf);
List<String> racks = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2", "127.0.0.3"));
@@ -152,7 +152,7 @@ public class ZkBookieRackAffinityMappingTest {
ZkBookieRackAffinityMapping mapping = new ZkBookieRackAffinityMapping();
ClientConfiguration bkClientConf = new ClientConfiguration();
- bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc) {
+ bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, 30) {
});
mapping.setConf(bkClientConf);
List<String> racks = mapping
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
index 3a807e8..7ed186d 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java
@@ -113,7 +113,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy();
ClientConfiguration bkClientConf = new ClientConfiguration();
- bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc) {
+ bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, 30) {
});
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, isolationGroups);
isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL, NullStatsLogger.INSTANCE);
@@ -182,7 +182,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
public void testNoBookieInfo() throws Exception {
ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy();
ClientConfiguration bkClientConf = new ClientConfiguration();
- bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc) {
+ bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, 30) {
});
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, isolationGroups);
isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL, NullStatsLogger.INSTANCE);
@@ -291,7 +291,7 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest {
ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy();
ClientConfiguration bkClientConf = new ClientConfiguration();
- bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc) {
+ bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, 30) {
});
isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL,
NullStatsLogger.INSTANCE);
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
index 39f23ac..e25c1ad 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
@@ -88,7 +88,7 @@ public class ZookeeperCacheTest {
@Test(timeOut = 10000)
void testSimpleCache() throws Exception {
- ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor);
+ ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 30, executor);
ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) {
@Override
public String deserialize(String key, byte[] content) throws Exception {
@@ -128,7 +128,7 @@ public class ZookeeperCacheTest {
void testChildrenCache() throws Exception {
zkClient.create("/test", new byte[0], null, null);
- ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor);
+ ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 30, executor);
ZooKeeperChildrenCache cache = new ZooKeeperChildrenCache(zkCacheService, "/test");
// Create callback counter
@@ -179,7 +179,7 @@ public class ZookeeperCacheTest {
@Test(timeOut = 10000)
void testChildrenCacheZnodeCreatedAfterCache() throws Exception {
- ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor);
+ ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 30, executor);
ZooKeeperChildrenCache cache = new ZooKeeperChildrenCache(zkCacheService, "/test");
// Create callback counter
@@ -240,7 +240,7 @@ public class ZookeeperCacheTest {
// Check existence after creation of the node
zkClient.create("/test", new byte[0], null, null);
Thread.sleep(20);
- ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor);
+ ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 30, executor);
boolean exists = zkCacheService.exists("/test");
Assert.assertTrue(exists, "/test should exists in the cache");
@@ -257,7 +257,7 @@ public class ZookeeperCacheTest {
zkClient.create("/test/c1", new byte[0], null, null);
zkClient.create("/test/c2", new byte[0], null, null);
Thread.sleep(20);
- ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor);
+ ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 30, executor);
boolean exists = zkCacheService.exists("/test");
Assert.assertTrue(exists, "/test should exists in the cache");
@@ -295,7 +295,7 @@ public class ZookeeperCacheTest {
}
};
- GlobalZooKeeperCache zkCacheService = new GlobalZooKeeperCache(zkClientfactory, -1, "", executor,
+ GlobalZooKeeperCache zkCacheService = new GlobalZooKeeperCache(zkClientfactory, -1, 30, "", executor,
scheduledExecutor);
zkCacheService.start();
zkClient = (MockZooKeeper) zkCacheService.getZooKeeper();
@@ -399,7 +399,7 @@ public class ZookeeperCacheTest {
// add readOpDelayMs so, main thread will not serve zkCacahe-returned future and let zkExecutor-thread handle
// callback-result process
MockZooKeeper zkClient = MockZooKeeper.newInstance(zkExecutor, 100);
- ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor);
+ ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 30, executor);
ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) {
@Override
public String deserialize(String key, byte[] content) throws Exception {
@@ -447,7 +447,7 @@ public class ZookeeperCacheTest {
// add readOpDelayMs so, main thread will not serve zkCacahe-returned future and let zkExecutor-thread handle
// callback-result process
MockZooKeeper zkClient = MockZooKeeper.newInstance(zkExecutor, 100);
- ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor);
+ ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 30, executor);
final AtomicInteger count = new AtomicInteger(0);
ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) {