You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/04/13 17:14:03 UTC

[pulsar] branch master updated: [pulsar-broker] support configurable zk-cache expiry time (#6668)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1eb4bb8  [pulsar-broker] support configurable zk-cache expiry time (#6668)
1eb4bb8 is described below

commit 1eb4bb8effe64b267053136b83fa5ff9cc18896a
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Mon Apr 13 10:13:49 2020 -0700

    [pulsar-broker] support configurable zk-cache expiry time (#6668)
    
    ### Motivation
    Right now the zk-cache expiry time is hardcoded. It needs to be configurable so the refresh interval can be tuned to different requirements, e.g. refreshing quickly in case of a missed zk-watch, or refreshing less often to reduce zk reads and avoid issues caused by zk read timeouts.
    
    ### Modification
    Users can configure the zk-cache expiry time (in seconds) using the `zooKeeperCacheExpirySeconds` configuration setting.
---
 conf/broker.conf                                             |  3 +++
 conf/discovery.conf                                          |  3 +++
 conf/proxy.conf                                              |  3 +++
 conf/standalone.conf                                         |  3 +++
 conf/websocket.conf                                          |  2 ++
 .../java/org/apache/pulsar/broker/ServiceConfiguration.java  |  5 +++++
 .../main/java/org/apache/pulsar/broker/PulsarService.java    |  2 +-
 .../pulsar/discovery/service/BrokerDiscoveryProvider.java    |  3 ++-
 .../pulsar/discovery/service/server/ServiceConfig.java       | 11 +++++++++++
 .../org/apache/pulsar/functions/worker/WorkerConfig.java     |  5 +++++
 .../main/java/org/apache/pulsar/functions/worker/Worker.java |  3 ++-
 .../apache/pulsar/proxy/server/BrokerDiscoveryProvider.java  |  3 ++-
 .../org/apache/pulsar/proxy/server/ProxyConfiguration.java   |  5 +++++
 .../java/org/apache/pulsar/websocket/WebSocketService.java   |  3 ++-
 .../websocket/service/WebSocketProxyConfiguration.java       | 10 ++++++++++
 .../org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java    |  4 ++--
 .../java/org/apache/pulsar/zookeeper/ZooKeeperCache.java     | 12 +++++++++---
 .../java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java |  2 +-
 18 files changed, 71 insertions(+), 11 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index eb4d22d..fa2e726 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -64,6 +64,9 @@ zooKeeperSessionTimeoutMillis=30000
 # ZooKeeper operation timeout in seconds
 zooKeeperOperationTimeoutSeconds=30
 
+# ZooKeeper cache expiry time in seconds
+zooKeeperCacheExpirySeconds=300
+
 # Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
 brokerShutdownTimeoutMs=60000
 
diff --git a/conf/discovery.conf b/conf/discovery.conf
index 57709dc..2cb5cc4 100644
--- a/conf/discovery.conf
+++ b/conf/discovery.conf
@@ -26,6 +26,9 @@ configurationStoreServers=
 # ZooKeeper session timeout
 zookeeperSessionTimeoutMs=30000
 
+# ZooKeeper cache expiry time in seconds
+zooKeeperCacheExpirySeconds=300
+
 # Port to use to server binary-proto request
 servicePort=6650
 
diff --git a/conf/proxy.conf b/conf/proxy.conf
index e110ab4..b1eb305 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -41,6 +41,9 @@ functionWorkerWebServiceURLTLS=
 # ZooKeeper session timeout (in milliseconds)
 zookeeperSessionTimeoutMs=30000
 
+# ZooKeeper cache expiry time in seconds
+zooKeeperCacheExpirySeconds=300
+
 ### --- Server --- ###
 
 # The port to use for server binary Protobuf requests
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 4095d88..1e60acc 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -54,6 +54,9 @@ zooKeeperSessionTimeoutMillis=30000
 # ZooKeeper operation timeout in seconds
 zooKeeperOperationTimeoutSeconds=30
 
+# ZooKeeper cache expiry time in seconds
+zooKeeperCacheExpirySeconds=300
+
 # Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
 brokerShutdownTimeoutMs=60000
 
diff --git a/conf/websocket.conf b/conf/websocket.conf
index 600b138..26473e8 100644
--- a/conf/websocket.conf
+++ b/conf/websocket.conf
@@ -24,6 +24,8 @@ configurationStoreServers=
 
 # Zookeeper session timeout in milliseconds
 zooKeeperSessionTimeoutMillis=30000
+# ZooKeeper cache expiry time in seconds
+zooKeeperCacheExpirySeconds=300
 
 # Pulsar cluster url to connect to broker (optional if configurationStoreServers present)
 serviceUrl=
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index f649910..20e1a17 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -208,6 +208,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
         )
     private int zooKeeperOperationTimeoutSeconds = 30;
     @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "ZooKeeper cache expiry time in seconds"
+        )
+    private int zooKeeperCacheExpirySeconds = 300;
+    @FieldContext(
         category = CATEGORY_SERVER,
         dynamic = true,
         doc = "Time to wait for broker graceful shutdown. After this time elapses, the process will be killed"
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 4360be6..d790f28 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -630,7 +630,7 @@ public class PulsarService implements AutoCloseable {
         this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(),
                 (int) config.getZooKeeperSessionTimeoutMillis(),
                 config.getZooKeeperOperationTimeoutSeconds(), config.getConfigurationStoreServers(),
-                getOrderedExecutor(), this.cacheExecutor);
+                getOrderedExecutor(), this.cacheExecutor, config.getZooKeeperCacheExpirySeconds());
         try {
             this.globalZkCache.start();
         } catch (IOException e) {
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
index f302a60..414d2ce 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
@@ -74,7 +74,8 @@ public class BrokerDiscoveryProvider implements Closeable {
                     config.getZookeeperSessionTimeoutMs());
             globalZkCache = new GlobalZooKeeperCache(zkClientFactory, config.getZookeeperSessionTimeoutMs(),
                     (int) TimeUnit.MILLISECONDS.toSeconds(config.getZookeeperSessionTimeoutMs()),
-                    config.getConfigurationStoreServers(), orderedExecutor, scheduledExecutorScheduler);
+                    config.getConfigurationStoreServers(), orderedExecutor, scheduledExecutorScheduler,
+                    config.getZooKeeperCacheExpirySeconds());
             globalZkCache.start();
         } catch (Exception e) {
             LOG.error("Failed to start ZooKeeper {}", e.getMessage(), e);
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
index 17af9e4..30269e4 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
@@ -45,6 +45,9 @@ public class ServiceConfig implements PulsarConfiguration {
     // ZooKeeper session timeout
     private int zookeeperSessionTimeoutMs = 30_000;
 
+    // ZooKeeper cache expiry time in seconds
+    private int zooKeeperCacheExpirySeconds=300;
+
     // Port to use to server binary-proto request
     private Optional<Integer> servicePort = Optional.ofNullable(5000);
     // Port to use to server binary-proto-tls request
@@ -134,6 +137,14 @@ public class ServiceConfig implements PulsarConfiguration {
         this.zookeeperSessionTimeoutMs = zookeeperSessionTimeoutMs;
     }
 
+    public int getZooKeeperCacheExpirySeconds() {
+        return zooKeeperCacheExpirySeconds;
+    }
+
+    public void setZooKeeperCacheExpirySeconds(int zooKeeperCacheExpirySeconds) {
+        this.zooKeeperCacheExpirySeconds = zooKeeperCacheExpirySeconds;
+    }
+
     public Optional<Integer> getServicePort() {
         return servicePort;
     }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index d5fbc49..24883df 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -125,6 +125,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
     )
     private int zooKeeperOperationTimeoutSeconds = 30;
     @FieldContext(
+            category = CATEGORY_WORKER,
+            doc = "ZooKeeper cache expiry time in seconds"
+        )
+    private int zooKeeperCacheExpirySeconds = 300;
+    @FieldContext(
         category = CATEGORY_CONNECTORS,
         doc = "The path to the location to locate builtin connectors"
     )
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index 0807d0f..7ace702 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -163,7 +163,8 @@ public class Worker {
                     (int) workerConfig.getZooKeeperSessionTimeoutMillis(),
                     workerConfig.getZooKeeperOperationTimeoutSeconds(),
                     workerConfig.getConfigurationStoreServers(),
-                    orderedExecutor, cacheExecutor);
+                    orderedExecutor, cacheExecutor,
+                    workerConfig.getZooKeeperCacheExpirySeconds());
             try {
                 this.globalZkCache.start();
             } catch (IOException e) {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
index b9b68c8..57a7a02 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
@@ -73,7 +73,8 @@ public class BrokerDiscoveryProvider implements Closeable {
                     config.getZookeeperSessionTimeoutMs());
             globalZkCache = new GlobalZooKeeperCache(zkClientFactory, config.getZookeeperSessionTimeoutMs(),
                     (int) TimeUnit.MILLISECONDS.toSeconds(config.getZookeeperSessionTimeoutMs()),
-                    config.getConfigurationStoreServers(), orderedExecutor, scheduledExecutorScheduler);
+                    config.getConfigurationStoreServers(), orderedExecutor, scheduledExecutorScheduler,
+                    config.getZooKeeperCacheExpirySeconds());
             globalZkCache.start();
         } catch (Exception e) {
             LOG.error("Failed to start ZooKeeper {}", e.getMessage(), e);
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 02c4549..e2f4644 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -88,6 +88,11 @@ public class ProxyConfiguration implements PulsarConfiguration {
         doc = "ZooKeeper session timeout (in milliseconds)"
     )
     private int zookeeperSessionTimeoutMs = 30_000;
+    @FieldContext(
+            category = CATEGORY_BROKER_DISCOVERY,
+            doc = "ZooKeeper cache expiry time in seconds"
+        )
+    private int zooKeeperCacheExpirySeconds = 300;
 
     @FieldContext(
         category = CATEGORY_BROKER_DISCOVERY,
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index 062af7e..ef9a71c 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -105,7 +105,8 @@ public class WebSocketService implements Closeable {
             this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(),
                     (int) config.getZooKeeperSessionTimeoutMillis(),
                     (int) TimeUnit.MILLISECONDS.toSeconds(config.getZooKeeperSessionTimeoutMillis()),
-                    config.getConfigurationStoreServers(), this.orderedExecutor, this.executor);
+                    config.getConfigurationStoreServers(), this.orderedExecutor, this.executor,
+                    config.getZooKeeperCacheExpirySeconds());
             try {
                 this.globalZkCache.start();
             } catch (IOException e) {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
index 29a08f8..854d31b 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
@@ -57,6 +57,8 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration {
     private String configurationStoreServers;
     // Zookeeper session timeout in milliseconds
     private long zooKeeperSessionTimeoutMillis = 30000;
+    // ZooKeeper cache expiry time in seconds
+    private int zooKeeperCacheExpirySeconds = 300;
 
     // Port to use to server HTTP request
     private Optional<Integer> webServicePort = Optional.of(8080);
@@ -198,6 +200,14 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration {
         this.zooKeeperSessionTimeoutMillis = zooKeeperSessionTimeoutMillis;
     }
 
+    public int getZooKeeperCacheExpirySeconds() {
+        return zooKeeperCacheExpirySeconds;
+    }
+
+    public void setZooKeeperCacheExpirySeconds(int zooKeeperCacheExpirySeconds) {
+        this.zooKeeperCacheExpirySeconds = zooKeeperCacheExpirySeconds;
+    }
+
     public Optional<Integer> getWebServicePort() {
         return webServicePort;
     }
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
index 4c237a4..9788b52 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java
@@ -52,8 +52,8 @@ public class GlobalZooKeeperCache extends ZooKeeperCache implements Closeable {
 
     public GlobalZooKeeperCache(ZooKeeperClientFactory zkClientFactory, int zkSessionTimeoutMillis,
             int zkOperationTimeoutSeconds, String globalZkConnect, OrderedExecutor orderedExecutor,
-            ScheduledExecutorService scheduledExecutor) {
-        super("global-zk", null, zkOperationTimeoutSeconds, orderedExecutor);
+            ScheduledExecutorService scheduledExecutor, int cacheExpirySeconds) {
+        super("global-zk", null, zkOperationTimeoutSeconds, orderedExecutor, cacheExpirySeconds);
         this.zlClientFactory = zkClientFactory;
         this.zkSessionTimeoutMillis = zkSessionTimeoutMillis;
         this.globalZkConnect = globalZkConnect;
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
index fcc122c..2688c10 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
@@ -88,10 +88,16 @@ public abstract class ZooKeeperCache implements Watcher {
     private final OrderedExecutor backgroundExecutor = OrderedExecutor.newBuilder().name("zk-cache-background").numThreads(2).build();
     private boolean shouldShutdownExecutor;
     private final int zkOperationTimeoutSeconds;
+    private static final int CACHE_EXPIRY_SECONDS = 300; //5 minutes
 
     protected AtomicReference<ZooKeeper> zkSession = new AtomicReference<ZooKeeper>(null);
 
     public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTimeoutSeconds, OrderedExecutor executor) {
+        this(cacheName, zkSession, zkOperationTimeoutSeconds, executor, CACHE_EXPIRY_SECONDS);
+    }
+    
+    public ZooKeeperCache(String cacheName, ZooKeeper zkSession, int zkOperationTimeoutSeconds,
+            OrderedExecutor executor, int cacheExpirySeconds) {
         checkNotNull(executor);
         this.zkOperationTimeoutSeconds = zkOperationTimeoutSeconds;
         this.executor = executor;
@@ -100,17 +106,17 @@ public abstract class ZooKeeperCache implements Watcher {
 
         this.dataCache = Caffeine.newBuilder()
                 .recordStats()
-                .expireAfterWrite(5, TimeUnit.MINUTES)
+                .expireAfterWrite(cacheExpirySeconds, TimeUnit.SECONDS)
                 .buildAsync((key, executor1) -> null);
 
         this.childrenCache = Caffeine.newBuilder()
                 .recordStats()
-                .expireAfterWrite(5, TimeUnit.MINUTES)
+                .expireAfterWrite(cacheExpirySeconds, TimeUnit.SECONDS)
                 .buildAsync((key, executor1) -> null);
 
         this.existsCache = Caffeine.newBuilder()
                 .recordStats()
-                .expireAfterWrite(5, TimeUnit.MINUTES)
+                .expireAfterWrite(cacheExpirySeconds, TimeUnit.SECONDS)
                 .buildAsync((key, executor1) -> null);
 
         CacheMetricsCollector.CAFFEINE.addCache(cacheName + "-data", dataCache);
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
index 0b4100a..6aa7ddd 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
@@ -299,7 +299,7 @@ public class ZookeeperCacheTest {
         };
 
         GlobalZooKeeperCache zkCacheService = new GlobalZooKeeperCache(zkClientfactory, -1, 30, "", executor,
-                scheduledExecutor);
+                scheduledExecutor, 300);
         zkCacheService.start();
         zkClient = (MockZooKeeper) zkCacheService.getZooKeeper();
         ZooKeeperDataCache<String> zkCache = new ZooKeeperDataCache<String>(zkCacheService) {