You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/04/13 17:14:03 UTC
[pulsar] branch master updated: [pulsar-broker] support
configurable zk-cache expiry time (#6668)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1eb4bb8 [pulsar-broker] support configurable zk-cache expiry time (#6668)
1eb4bb8 is described below
commit 1eb4bb8effe64b267053136b83fa5ff9cc18896a
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Mon Apr 13 10:13:49 2020 -0700
[pulsar-broker] support configurable zk-cache expiry time (#6668)
### Motivation
Right now zk-cache expiry is hardcoded and it needs to be configurable to refresh value based on various requirement: eg: refresh quickly in case of zk-watch miss, avoid frequent cache refresh to avoid zk-read or avoid issue due to zk read timeout, etc..
### Modification
User can configure zk-cache expiry using `zooKeeperCacheExpirySeconds` configuration.
---
conf/broker.conf | 3 +++
conf/discovery.conf | 3 +++
conf/proxy.conf | 3 +++
conf/standalone.conf | 3 +++
conf/websocket.conf | 2 ++
.../java/org/apache/pulsar/broker/ServiceConfiguration.java | 5 +++++
.../main/java/org/apache/pulsar/broker/PulsarService.java | 2 +-
.../pulsar/discovery/service/BrokerDiscoveryProvider.java | 3 ++-
.../pulsar/discovery/service/server/ServiceConfig.java | 11 +++++++++++
.../org/apache/pulsar/functions/worker/WorkerConfig.java | 5 +++++
.../main/java/org/apache/pulsar/functions/worker/Worker.java | 3 ++-
.../apache/pulsar/proxy/server/BrokerDiscoveryProvider.java | 3 ++-
.../org/apache/pulsar/proxy/server/ProxyConfiguration.java | 5 +++++
.../java/org/apache/pulsar/websocket/WebSocketService.java | 3 ++-
.../websocket/service/WebSocketProxyConfiguration.java | 10 ++++++++++
.../org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java | 4 ++--
.../java/org/apache/pulsar/zookeeper/ZooKeeperCache.java | 12 +++++++++---
.../java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java | 2 +-
18 files changed, 71 insertions(+), 11 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index eb4d22d..fa2e726 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -64,6 +64,9 @@ zooKeeperSessionTimeoutMillis=30000
# ZooKeeper operation timeout in seconds
zooKeeperOperationTimeoutSeconds=30
+# ZooKeeper cache expiry time in seconds
+zooKeeperCacheExpirySeconds=300
+
# Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
brokerShutdownTimeoutMs=60000
diff --git a/conf/discovery.conf b/conf/discovery.conf
index 57709dc..2cb5cc4 100644
--- a/conf/discovery.conf
+++ b/conf/discovery.conf
@@ -26,6 +26,9 @@ configurationStoreServers=
# ZooKeeper session timeout
zookeeperSessionTimeoutMs=30000
+# ZooKeeper cache expiry time in seconds
+zooKeeperCacheExpirySeconds=300
+
# Port to use to server binary-proto request
servicePort=6650
diff --git a/conf/proxy.conf b/conf/proxy.conf
index e110ab4..b1eb305 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -41,6 +41,9 @@ functionWorkerWebServiceURLTLS=
# ZooKeeper session timeout (in milliseconds)
zookeeperSessionTimeoutMs=30000
+# ZooKeeper cache expiry time in seconds
+zooKeeperCacheExpirySeconds=300
+
### --- Server --- ###
# The port to use for server binary Protobuf requests
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 4095d88..1e60acc 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -54,6 +54,9 @@ zooKeeperSessionTimeoutMillis=30000
# ZooKeeper operation timeout in seconds
zooKeeperOperationTimeoutSeconds=30
+# ZooKeeper cache expiry time in seconds
+zooKeeperCacheExpirySeconds=300
+
# Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
brokerShutdownTimeoutMs=60000
diff --git a/conf/websocket.conf b/conf/websocket.conf
index 600b138..26473e8 100644
--- a/conf/websocket.conf
+++ b/conf/websocket.conf
@@ -24,6 +24,8 @@ configurationStoreServers=
# Zookeeper session timeout in milliseconds
zooKeeperSessionTimeoutMillis=30000
+# ZooKeeper cache expiry time in seconds
+zooKeeperCacheExpirySeconds=300
# Pulsar cluster url to connect to broker (optional if configurationStoreServers present)
serviceUrl=
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index f649910..20e1a17 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -208,6 +208,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int zooKeeperOperationTimeoutSeconds = 30;
@FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "ZooKeeper cache expiry time in seconds"
+ )
+ private int zooKeeperCacheExpirySeconds = 300;
+ @FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
doc = "Time to wait for broker graceful shutdown. After this time elapses, the process will be killed"
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 4360be6..d790f28 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -630,7 +630,7 @@ public class PulsarService implements AutoCloseable {
this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(),
(int) config.getZooKeeperSessionTimeoutMillis(),
config.getZooKeeperOperationTimeoutSeconds(), config.getConfigurationStoreServers(),
- getOrderedExecutor(), this.cacheExecutor);
+ getOrderedExecutor(), this.cacheExecutor, config.getZooKeeperCacheExpirySeconds());
try {
this.globalZkCache.start();
} catch (IOException e) {
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
index f302a60..414d2ce 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
@@ -74,7 +74,8 @@ public class BrokerDiscoveryProvider implements Closeable {
config.getZookeeperSessionTimeoutMs());
globalZkCache = new GlobalZooKeeperCache(zkClientFactory, config.getZookeeperSessionTimeoutMs(),
(int) TimeUnit.MILLISECONDS.toSeconds(config.getZookeeperSessionTimeoutMs()),
- config.getConfigurationStoreServers(), orderedExecutor, scheduledExecutorScheduler);
+ config.getConfigurationStoreServers(), orderedExecutor, scheduledExecutorScheduler,
+ config.getZooKeeperCacheExpirySeconds());
globalZkCache.start();
} catch (Exception e) {
LOG.error("Failed to start ZooKeeper {}", e.getMessage(), e);
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
index 17af9e4..30269e4 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
@@ -45,6 +45,9 @@ public class ServiceConfig implements PulsarConfiguration {
// ZooKeeper session timeout
private int zookeeperSessionTimeoutMs = 30_000;
+ // ZooKeeper cache expiry time in seconds
+ private int zooKeeperCacheExpirySeconds=300;
+
// Port to use to server binary-proto request
private Optional<Integer> servicePort = Optional.ofNullable(5000);
// Port to use to server binary-proto-tls request
@@ -134,6 +137,14 @@ public class ServiceConfig implements PulsarConfiguration {
this.zookeeperSessionTimeoutMs = zookeeperSessionTimeoutMs;
}
+ public int getZooKeeperCacheExpirySeconds() {
+ return zooKeeperCacheExpirySeconds;
+ }
+
+ public void setZooKeeperCacheExpirySeconds(int zooKeeperCacheExpirySeconds) {
+ this.zooKeeperCacheExpirySeconds = zooKeeperCacheExpirySeconds;
+ }
+
public Optional<Integer> getServicePort() {
return servicePort;
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index d5fbc49..24883df 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -125,6 +125,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
)
private int zooKeeperOperationTimeoutSeconds = 30;
@FieldContext(
+ category = CATEGORY_WORKER,
+ doc = "ZooKeeper cache expiry time in seconds"
+ )
+ private int zooKeeperCacheExpirySeconds = 300;
+ @FieldContext(
category = CATEGORY_CONNECTORS,
doc = "The path to the location to locate builtin connectors"
)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index 0807d0f..7ace702 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -163,7 +163,8 @@ public class Worker {
(int) workerConfig.getZooKeeperSessionTimeoutMillis(),
workerConfig.getZooKeeperOperationTimeoutSeconds(),
workerConfig.getConfigurationStoreServers(),
- orderedExecutor, cacheExecutor);
+ orderedExecutor, cacheExecutor,
+ workerConfig.getZooKeeperOperationTimeoutSeconds());
try {
this.globalZkCache.start();
} catch (IOException e) {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
index b9b68c8..57a7a02 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
@@ -73,7 +73,8 @@ public class BrokerDiscoveryProvider implements Closeable {
config.getZookeeperSessionTimeoutMs());
globalZkCache = new GlobalZooKeeperCache(zkClientFactory, config.getZookeeperSessionTimeoutMs(),
(int) TimeUnit.MILLISECONDS.toSeconds(config.getZookeeperSessionTimeoutMs()),
- config.getConfigurationStoreServers(), orderedExecutor, scheduledExecutorScheduler);
+ config.getConfigurationStoreServers(), orderedExecutor, scheduledExecutorScheduler,
+ config.getZookeeperSessionTimeoutMs());
globalZkCache.start();
} catch (Exception e) {
LOG.error("Failed to start ZooKeeper {}", e.getMessage(), e);
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 02c4549..e2f4644 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -88,6 +88,11 @@ public class ProxyConfiguration implements PulsarConfiguration {
doc = "ZooKeeper session timeout (in milliseconds)"
)
private int zookeeperSessionTimeoutMs = 30_000;
+ @FieldContext(
+ category = CATEGORY_BROKER_DISCOVERY,
+ doc = "ZooKeeper cache expiry time in seconds"
+ )
+ private int zooKeeperCacheExpirySeconds = 300;
@FieldContext(
category = CATEGORY_BROKER_DISCOVERY,
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index 062af7e..ef9a71c 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -105,7 +105,8 @@ public class WebSocketService implements Closeable {
this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(),
(int) config.getZooKeeperSessionTimeoutMillis(),
(int) TimeUnit.MILLISECONDS.toSeconds(config.getZooKeeperSessionTimeoutMillis()),
- config.getConfigurationStoreServers(), this.orderedExecutor, this.executor);
+ config.getConfigurationStoreServers(), this.orderedExecutor, this.executor,
+ config.getZooKeeperCacheExpirySeconds());
try {
this.globalZkCache.start();
} catch (IOException e) {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
index 29a08f8..854d31b 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
@@ -57,6 +57,8 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration {
private String configurationStoreServers;
// Zookeeper session timeout in milliseconds
private long zooKeeperSessionTimeoutMillis = 30000;
+ // ZooKeeper cache expiry time in seconds
+ private int zooKeeperCacheExpirySeconds = 300;
// Port to use to server HTTP request
private Optional<Integer> webServicePort = Optional.of(8080);
@@ -198,6 +200,14 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration {
this.zooKeeperSessionTimeoutMillis = zooKeeperSessionTimeoutMillis;
}
+ public int getZooKeeperCacheExpirySeconds() {
+ return zooKeeperCacheExpirySeconds;
+ }
+
+ public void setZooKeeperCacheExpirySeconds(int zooKeeperCacheExpirySeconds) {
+ this.zooKeeperCacheExpirySeconds = zooKeeperCacheExpirySeconds;
+ }
+
public Optional<Integer> getWebServicePort() {
return webServicePort;
}
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
index 4c237a4..9788b52 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
@@ -52,8 +52,8 @@ public class GlobalZooKeeperCache extends ZooKeeperCache implements Closeable {
public GlobalZooKeeperCache(ZooKeeperClientFactory zkClientFactory, int zkSessionTimeoutMillis,
int zkOperationTimeoutSeconds, String globalZkConnect, OrderedExecutor orderedExecutor,
- ScheduledExecutorService scheduledExecutor) {
- super("global-zk", null, zkOperationTimeoutSeconds, orderedExecutor);
+ ScheduledExecutorService scheduledExecutor, int cacheExpirySeconds) {
+ super("global-zk", null, zkOperationTimeoutSeconds, orderedExecutor, cacheExpirySeconds);
this.zlClientFactory = zkClientFactory;
this.zkSessionTimeoutMillis = zkSessionTimeoutMillis;
this.globalZkConnect = globalZkConnect;
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
index fcc122c..2688c10 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
@@ -88,10 +88,16 @@ public abstract class ZooKeeperCache implements Watcher {
private final OrderedExecutor backgroundExecutor = OrderedExecutor.newBuilder().name("zk-cache-background").numThreads(2).build();
private boolean shouldShutdownExecutor;
private final int zkOperationTimeoutSeconds;
+ private static final int CACHE_EXPIRY_SECONDS = 300; //5 minutes
protected AtomicReference<ZooKeeper> zkSession = new AtomicReference<ZooKeeper>(null);
public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTimeoutSeconds, OrderedExecutor executor) {
+ this(cacheName, zkSession, zkOperationTimeoutSeconds, executor, CACHE_EXPIRY_SECONDS);
+ }
+
+ public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTimeoutSeconds,
+ OrderedExecutor executor, int cacheExpirySeconds) {
checkNotNull(executor);
this.zkOperationTimeoutSeconds = zkOperationTimeoutSeconds;
this.executor = executor;
@@ -100,17 +106,17 @@ public abstract class ZooKeeperCache implements Watcher {
this.dataCache = Caffeine.newBuilder()
.recordStats()
- .expireAfterWrite(5, TimeUnit.MINUTES)
+ .expireAfterWrite(zkOperationTimeoutSeconds, TimeUnit.SECONDS)
.buildAsync((key, executor1) -> null);
this.childrenCache = Caffeine.newBuilder()
.recordStats()
- .expireAfterWrite(5, TimeUnit.MINUTES)
+ .expireAfterWrite(zkOperationTimeoutSeconds, TimeUnit.SECONDS)
.buildAsync((key, executor1) -> null);
this.existsCache = Caffeine.newBuilder()
.recordStats()
- .expireAfterWrite(5, TimeUnit.MINUTES)
+ .expireAfterWrite(zkOperationTimeoutSeconds, TimeUnit.SECONDS)
.buildAsync((key, executor1) -> null);
CacheMetricsCollector.CAFFEINE.addCache(cacheName + "-data", dataCache);
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
index 0b4100a..6aa7ddd 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
@@ -299,7 +299,7 @@ public class ZookeeperCacheTest {
};
GlobalZooKeeperCache zkCacheService = new GlobalZooKeeperCache(zkClientfactory, -1, 30, "", executor,
- scheduledExecutor);
+ scheduledExecutor, 300);
zkCacheService.start();
zkClient = (MockZooKeeper) zkCacheService.getZooKeeper();
ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) {