You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2023/03/03 08:15:26 UTC

[pulsar] branch branch-2.9 updated: [fix][branch-2.9] Support zookeeper read-only config (#19693)

This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 0d2fa2af2fc [fix][branch-2.9] Support zookeeper read-only config (#19693)
0d2fa2af2fc is described below

commit 0d2fa2af2fce3a3253e4e569c9655c8d2d855db3
Author: Yan Zhao <ho...@apache.org>
AuthorDate: Fri Mar 3 16:15:18 2023 +0800

    [fix][branch-2.9] Support zookeeper read-only config (#19693)
    
    Cherry-pick from #19156
---
 conf/broker.conf                                                   | 3 +++
 conf/proxy.conf                                                    | 3 +++
 .../main/java/org/apache/pulsar/broker/ServiceConfiguration.java   | 5 +++++
 .../java/org/apache/pulsar/broker/resources/PulsarResources.java   | 5 +++--
 .../src/main/java/org/apache/pulsar/broker/PulsarService.java      | 4 ++--
 .../org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java | 5 +++--
 .../org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java  | 4 +++-
 .../org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java  | 4 +++-
 .../org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java    | 4 +++-
 .../test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java | 4 +++-
 .../org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java | 4 +++-
 .../apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java  | 4 +++-
 .../pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java   | 4 +++-
 .../pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java      | 4 +++-
 .../main/java/org/apache/pulsar/functions/worker/WorkerConfig.java | 5 +++++
 .../src/main/java/org/apache/pulsar/functions/worker/Worker.java   | 3 ++-
 .../java/org/apache/pulsar/proxy/server/ProxyConfiguration.java    | 6 ++++++
 .../src/main/java/org/apache/pulsar/proxy/server/ProxyService.java | 4 ++--
 .../main/java/org/apache/pulsar/websocket/WebSocketService.java    | 7 ++++---
 19 files changed, 62 insertions(+), 20 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index c6901ebebc7..4ecdba5bb02 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -116,6 +116,9 @@ zooKeeperOperationTimeoutSeconds=30
 # ZooKeeper cache expiry time in seconds
 zooKeeperCacheExpirySeconds=300
 
+# Is zookeeper allow read-only operations.
+zookeeperAllowReadOnlyOperations=false
+
 # Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
 brokerShutdownTimeoutMs=60000
 
diff --git a/conf/proxy.conf b/conf/proxy.conf
index 77ab31b80cf..9186a35f164 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -44,6 +44,9 @@ zookeeperSessionTimeoutMs=30000
 # ZooKeeper cache expiry time in seconds
 zooKeeperCacheExpirySeconds=300
 
+# Is zookeeper allow read-only operations.
+zookeeperAllowReadOnlyOperations=false
+
 ### --- Server --- ###
 
 # Hostname or IP address the service binds on, default is 0.0.0.0.
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 458511a28fc..d4912ae9fb2 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -348,6 +348,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
             doc = "ZooKeeper cache expiry time in seconds"
         )
     private int zooKeeperCacheExpirySeconds = 300;
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "Is zookeeper allow read-only operations"
+    )
+    private boolean zookeeperAllowReadOnlyOperations = false;
     @FieldContext(
         category = CATEGORY_SERVER,
         dynamic = true,
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
index a01d57817ee..f8d6ba43045 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
@@ -91,9 +91,10 @@ public class PulsarResources {
         this.configurationMetadataStore = Optional.ofNullable(configurationMetadataStore);
     }
 
-    public static MetadataStoreExtended createMetadataStore(String serverUrls, int sessionTimeoutMs)
+    public static MetadataStoreExtended createMetadataStore(String serverUrls, int sessionTimeoutMs,
+                                                            boolean allowReadOnlyOperations)
             throws MetadataStoreException {
         return MetadataStoreExtended.create(serverUrls, MetadataStoreConfig.builder()
-                .sessionTimeoutMillis(sessionTimeoutMs).allowReadOnlyOperations(false).build());
+                .sessionTimeoutMillis(sessionTimeoutMs).allowReadOnlyOperations(allowReadOnlyOperations).build());
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 052ee90b536..38e8f34ab46 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -352,7 +352,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
         return MetadataStoreFactory.create(config.getConfigurationStoreServers(),
                 MetadataStoreConfig.builder()
                         .sessionTimeoutMillis((int) config.getZooKeeperSessionTimeoutMillis())
-                        .allowReadOnlyOperations(false)
+                        .allowReadOnlyOperations(config.isZookeeperAllowReadOnlyOperations())
                         .build());
     }
 
@@ -963,7 +963,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
         return MetadataStoreExtended.create(config.getZookeeperServers(),
                 MetadataStoreConfig.builder()
                         .sessionTimeoutMillis((int) config.getZooKeeperSessionTimeoutMillis())
-                        .allowReadOnlyOperations(false)
+                        .allowReadOnlyOperations(config.isZookeeperAllowReadOnlyOperations())
                         .build());
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
index b848fa76d54..d0fdfbdb019 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
@@ -20,8 +20,8 @@ package org.apache.pulsar.websocket.proxy;
 
 import static java.util.concurrent.Executors.newFixedThreadPool;
 import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.anyInt;
 
@@ -88,7 +88,8 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase {
         }
 
         service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+                .createMetadataStore(anyString(), anyInt(), anyBoolean());
         proxyServer = new ProxyServer(config);
         WebSocketServiceStarter.start(proxyServer, service);
         log.info("Proxy Server Started");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
index a2758b72a4e..984cec472a0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.websocket.proxy;
 
 import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
@@ -70,7 +71,8 @@ public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest {
         config.setWebServicePort(Optional.of(0));
         config.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
         service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+                .createMetadataStore(anyString(), anyInt(), anyBoolean());
         service.start();
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
index 184f86340fa..a9687411bd2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.websocket.proxy;
 
 import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
@@ -70,7 +71,8 @@ public class ProxyConfigurationTest extends ProducerConsumerBase {
         config.setServiceUrl("http://localhost:8080");
         config.getProperties().setProperty("brokerClient_lookupTimeoutMs", "100");
         WebSocketService service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+                .createMetadataStore(anyString(), anyInt(), anyBoolean());
         service.start();
 
         PulsarClientImpl client = (PulsarClientImpl) service.getPulsarClient();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java
index 71f8c0b6d86..738c9d898f9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.websocket.proxy;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
@@ -64,7 +65,8 @@ public class ProxyIdleTimeoutTest extends ProducerConsumerBase {
         config.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
         config.setWebSocketSessionIdleTimeoutMillis(3 * 1000);
         service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+                .createMetadataStore(anyString(), anyInt(), anyBoolean());
         proxyServer = new ProxyServer(config);
         WebSocketServiceStarter.start(proxyServer, service);
         log.info("Proxy Server Started");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java
index 6e0dfa46cfd..831b09660f4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.websocket.proxy;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
@@ -66,7 +67,8 @@ public class ProxyPingTest extends ProducerConsumerBase {
         config.setWebSocketSessionIdleTimeoutMillis(3 * 1000);
         config.setWebSocketPingDurationSeconds(2);
         service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+                .createMetadataStore(anyString(), anyInt(), anyBoolean());
         proxyServer = new ProxyServer(config);
         WebSocketServiceStarter.start(proxyServer, service);
         log.info("Proxy Server Started");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 1e74cdae787..11a0384cf5a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.websocket.proxy;
 
 import static java.util.concurrent.Executors.newFixedThreadPool;
 import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
@@ -105,7 +106,8 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
         config.setClusterName("test");
         config.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
         service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+                .createMetadataStore(anyString(), anyInt(), anyBoolean());
         proxyServer = new ProxyServer(config);
         WebSocketServiceStarter.start(proxyServer, service);
         log.info("Proxy Server Started");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
index cdc2eb58d9a..5e7956599ff 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.websocket.proxy;
 
 import static java.util.concurrent.Executors.newFixedThreadPool;
 import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
@@ -78,7 +79,8 @@ public class ProxyPublishConsumeTlsTest extends TlsProducerConsumerBase {
         config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
         config.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
         service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+                .createMetadataStore(anyString(), anyInt(), anyBoolean());
         proxyServer = new ProxyServer(config);
         WebSocketServiceStarter.start(proxyServer, service);
         log.info("Proxy Server Started");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
index 5baaacd52d9..c624e57ffb2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.websocket.proxy;
 
 import static java.util.concurrent.Executors.newFixedThreadPool;
 import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
@@ -65,7 +66,8 @@ public class ProxyPublishConsumeWithoutZKTest extends ProducerConsumerBase {
         config.setServiceUrl(pulsar.getSafeWebServiceAddress());
         config.setServiceUrlTls(pulsar.getWebServiceAddressTls());
         service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
-        doReturn(new ZKMetadataStore(mockZooKeeper)).when(service).createMetadataStore(anyString(), anyInt());
+        doReturn(new ZKMetadataStore(mockZooKeeper)).when(service)
+                .createMetadataStore(anyString(), anyInt(), anyBoolean());
         proxyServer = new ProxyServer(config);
         WebSocketServiceStarter.start(proxyServer, service);
         log.info("Proxy Server Started");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
index 03227e9587d..627f9db3423 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.websocket.proxy.v1;
 
 import static java.util.concurrent.Executors.newFixedThreadPool;
 import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
@@ -88,7 +89,8 @@ public class V1_ProxyAuthenticationTest extends V1_ProducerConsumerBase {
         }
 
         service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+                .createMetadataStore(anyString(), anyInt(), anyBoolean());
         proxyServer = new ProxyServer(config);
         WebSocketServiceStarter.start(proxyServer, service);
         log.info("Proxy Server Started");
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index e51e5dfb742..25f6eb1d98f 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -163,6 +163,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
             doc = "ZooKeeper cache expiry time in seconds"
         )
     private int zooKeeperCacheExpirySeconds = 300;
+    @FieldContext(
+            category = CATEGORY_WORKER,
+            doc = "Is zookeeper allow read-only operations"
+    )
+    private boolean zookeeperAllowReadOnlyOperations = false;
     @FieldContext(
         category = CATEGORY_CONNECTORS,
         doc = "The path to the location to locate builtin connectors"
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index df3ce761fce..e9944ac95e4 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -77,7 +77,8 @@ public class Worker {
             log.info("starting configuration cache service");
             try {
                 configMetadataStore = PulsarResources.createMetadataStore(workerConfig.getConfigurationStoreServers(),
-                        (int) workerConfig.getZooKeeperSessionTimeoutMillis());
+                        (int) workerConfig.getZooKeeperSessionTimeoutMillis(),
+                        workerConfig.isZookeeperAllowReadOnlyOperations());
             } catch (IOException e) {
                 throw new PulsarServerException(e);
             }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index e75db662a75..12ead928f5e 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -104,6 +104,12 @@ public class ProxyConfiguration implements PulsarConfiguration {
         )
     private int zooKeeperCacheExpirySeconds = 300;
 
+    @FieldContext(
+            category = CATEGORY_BROKER_DISCOVERY,
+            doc = "Is zookeeper allow read-only operations"
+    )
+    private boolean zookeeperAllowReadOnlyOperations = false;
+
     @FieldContext(
         category = CATEGORY_BROKER_DISCOVERY,
         doc = "The service url points to the broker cluster"
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 10e122e794d..6b11a3e805a 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -378,12 +378,12 @@ public class ProxyService implements Closeable {
 
     public MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
         return PulsarResources.createMetadataStore(proxyConfig.getZookeeperServers(),
-                proxyConfig.getZookeeperSessionTimeoutMs());
+                proxyConfig.getZookeeperSessionTimeoutMs(), proxyConfig.isZookeeperAllowReadOnlyOperations());
     }
 
     public MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
         return PulsarResources.createMetadataStore(proxyConfig.getConfigurationStoreServers(),
-                proxyConfig.getZookeeperSessionTimeoutMs());
+                proxyConfig.getZookeeperSessionTimeoutMs(), proxyConfig.isZookeeperAllowReadOnlyOperations());
     }
 
     public Authentication getProxyClientAuthenticationPlugin() {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index 843e3a8ec82..929ee0826b0 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -103,7 +103,7 @@ public class WebSocketService implements Closeable {
         if (isNotBlank(config.getConfigurationStoreServers())) {
             try {
                 configMetadataStore = createMetadataStore(config.getConfigurationStoreServers(),
-                        (int) config.getZooKeeperSessionTimeoutMillis());
+                        (int) config.getZooKeeperSessionTimeoutMillis(), config.isZookeeperAllowReadOnlyOperations());
             } catch (MetadataStoreException e) {
                 throw new PulsarServerException(e);
             }
@@ -123,9 +123,10 @@ public class WebSocketService implements Closeable {
         log.info("Pulsar WebSocket Service started");
     }
 
-    public MetadataStoreExtended createMetadataStore(String serverUrls, int sessionTimeoutMs)
+    public MetadataStoreExtended createMetadataStore(String serverUrls, int sessionTimeoutMs,
+                                                     boolean isAllowReadOnlyOperations)
             throws MetadataStoreException {
-        return PulsarResources.createMetadataStore(serverUrls, sessionTimeoutMs);
+        return PulsarResources.createMetadataStore(serverUrls, sessionTimeoutMs, isAllowReadOnlyOperations);
     }
 
     @Override