You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2023/03/03 08:15:26 UTC
[pulsar] branch branch-2.9 updated: [fix][branch-2.9] Support zookeeper read-only config (#19693)
This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 0d2fa2af2fc [fix][branch-2.9] Support zookeeper read-only config (#19693)
0d2fa2af2fc is described below
commit 0d2fa2af2fce3a3253e4e569c9655c8d2d855db3
Author: Yan Zhao <ho...@apache.org>
AuthorDate: Fri Mar 3 16:15:18 2023 +0800
[fix][branch-2.9] Support zookeeper read-only config (#19693)
Cherry-pick from #19156
---
conf/broker.conf | 3 +++
conf/proxy.conf | 3 +++
.../main/java/org/apache/pulsar/broker/ServiceConfiguration.java | 5 +++++
.../java/org/apache/pulsar/broker/resources/PulsarResources.java | 5 +++--
.../src/main/java/org/apache/pulsar/broker/PulsarService.java | 4 ++--
.../org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java | 5 +++--
.../org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java | 4 +++-
.../org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java | 4 +++-
.../org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java | 4 +++-
.../test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java | 4 +++-
.../org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java | 4 +++-
.../apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java | 4 +++-
.../pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java | 4 +++-
.../pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java | 4 +++-
.../main/java/org/apache/pulsar/functions/worker/WorkerConfig.java | 5 +++++
.../src/main/java/org/apache/pulsar/functions/worker/Worker.java | 3 ++-
.../java/org/apache/pulsar/proxy/server/ProxyConfiguration.java | 6 ++++++
.../src/main/java/org/apache/pulsar/proxy/server/ProxyService.java | 4 ++--
.../main/java/org/apache/pulsar/websocket/WebSocketService.java | 7 ++++---
19 files changed, 62 insertions(+), 20 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index c6901ebebc7..4ecdba5bb02 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -116,6 +116,9 @@ zooKeeperOperationTimeoutSeconds=30
# ZooKeeper cache expiry time in seconds
zooKeeperCacheExpirySeconds=300
+# Whether ZooKeeper read-only operations are allowed.
+zookeeperAllowReadOnlyOperations=false
+
# Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
brokerShutdownTimeoutMs=60000
diff --git a/conf/proxy.conf b/conf/proxy.conf
index 77ab31b80cf..9186a35f164 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -44,6 +44,9 @@ zookeeperSessionTimeoutMs=30000
# ZooKeeper cache expiry time in seconds
zooKeeperCacheExpirySeconds=300
+# Whether ZooKeeper read-only operations are allowed.
+zookeeperAllowReadOnlyOperations=false
+
### --- Server --- ###
# Hostname or IP address the service binds on, default is 0.0.0.0.
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 458511a28fc..d4912ae9fb2 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -348,6 +348,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "ZooKeeper cache expiry time in seconds"
)
private int zooKeeperCacheExpirySeconds = 300;
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ doc = "Is zookeeper allow read-only operations"
+ )
+ private boolean zookeeperAllowReadOnlyOperations = false;
@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
index a01d57817ee..f8d6ba43045 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
@@ -91,9 +91,10 @@ public class PulsarResources {
this.configurationMetadataStore = Optional.ofNullable(configurationMetadataStore);
}
- public static MetadataStoreExtended createMetadataStore(String serverUrls, int sessionTimeoutMs)
+ public static MetadataStoreExtended createMetadataStore(String serverUrls, int sessionTimeoutMs,
+ boolean allowReadOnlyOperations)
throws MetadataStoreException {
return MetadataStoreExtended.create(serverUrls, MetadataStoreConfig.builder()
- .sessionTimeoutMillis(sessionTimeoutMs).allowReadOnlyOperations(false).build());
+ .sessionTimeoutMillis(sessionTimeoutMs).allowReadOnlyOperations(allowReadOnlyOperations).build());
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 052ee90b536..38e8f34ab46 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -352,7 +352,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
return MetadataStoreFactory.create(config.getConfigurationStoreServers(),
MetadataStoreConfig.builder()
.sessionTimeoutMillis((int) config.getZooKeeperSessionTimeoutMillis())
- .allowReadOnlyOperations(false)
+ .allowReadOnlyOperations(config.isZookeeperAllowReadOnlyOperations())
.build());
}
@@ -963,7 +963,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
return MetadataStoreExtended.create(config.getZookeeperServers(),
MetadataStoreConfig.builder()
.sessionTimeoutMillis((int) config.getZooKeeperSessionTimeoutMillis())
- .allowReadOnlyOperations(false)
+ .allowReadOnlyOperations(config.isZookeeperAllowReadOnlyOperations())
.build());
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
index b848fa76d54..d0fdfbdb019 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
@@ -20,8 +20,8 @@ package org.apache.pulsar.websocket.proxy;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.anyInt;
@@ -88,7 +88,8 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase {
}
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ .createMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
index a2758b72a4e..984cec472a0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.websocket.proxy;
import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -70,7 +71,8 @@ public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest {
config.setWebServicePort(Optional.of(0));
config.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ .createMetadataStore(anyString(), anyInt(), anyBoolean());
service.start();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
index 184f86340fa..a9687411bd2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.websocket.proxy;
import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -70,7 +71,8 @@ public class ProxyConfigurationTest extends ProducerConsumerBase {
config.setServiceUrl("http://localhost:8080");
config.getProperties().setProperty("brokerClient_lookupTimeoutMs", "100");
WebSocketService service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ .createMetadataStore(anyString(), anyInt(), anyBoolean());
service.start();
PulsarClientImpl client = (PulsarClientImpl) service.getPulsarClient();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java
index 71f8c0b6d86..738c9d898f9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyIdleTimeoutTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.websocket.proxy;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -64,7 +65,8 @@ public class ProxyIdleTimeoutTest extends ProducerConsumerBase {
config.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
config.setWebSocketSessionIdleTimeoutMillis(3 * 1000);
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ .createMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java
index 6e0dfa46cfd..831b09660f4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPingTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.websocket.proxy;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -66,7 +67,8 @@ public class ProxyPingTest extends ProducerConsumerBase {
config.setWebSocketSessionIdleTimeoutMillis(3 * 1000);
config.setWebSocketPingDurationSeconds(2);
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ .createMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 1e74cdae787..11a0384cf5a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.websocket.proxy;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -105,7 +106,8 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
config.setClusterName("test");
config.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ .createMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
index cdc2eb58d9a..5e7956599ff 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.websocket.proxy;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -78,7 +79,8 @@ public class ProxyPublishConsumeTlsTest extends TlsProducerConsumerBase {
config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
config.setConfigurationStoreServers(GLOBAL_DUMMY_VALUE);
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ .createMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
index 5baaacd52d9..c624e57ffb2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.websocket.proxy;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -65,7 +66,8 @@ public class ProxyPublishConsumeWithoutZKTest extends ProducerConsumerBase {
config.setServiceUrl(pulsar.getSafeWebServiceAddress());
config.setServiceUrlTls(pulsar.getWebServiceAddressTls());
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
- doReturn(new ZKMetadataStore(mockZooKeeper)).when(service).createMetadataStore(anyString(), anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeper)).when(service)
+ .createMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
index 03227e9587d..627f9db3423 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.websocket.proxy.v1;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
@@ -88,7 +89,8 @@ public class V1_ProxyAuthenticationTest extends V1_ProducerConsumerBase {
}
service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
- doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+ doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+ .createMetadataStore(anyString(), anyInt(), anyBoolean());
proxyServer = new ProxyServer(config);
WebSocketServiceStarter.start(proxyServer, service);
log.info("Proxy Server Started");
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index e51e5dfb742..25f6eb1d98f 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -163,6 +163,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
doc = "ZooKeeper cache expiry time in seconds"
)
private int zooKeeperCacheExpirySeconds = 300;
+ @FieldContext(
+ category = CATEGORY_WORKER,
+ doc = "Is zookeeper allow read-only operations"
+ )
+ private boolean zookeeperAllowReadOnlyOperations = false;
@FieldContext(
category = CATEGORY_CONNECTORS,
doc = "The path to the location to locate builtin connectors"
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index df3ce761fce..e9944ac95e4 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -77,7 +77,8 @@ public class Worker {
log.info("starting configuration cache service");
try {
configMetadataStore = PulsarResources.createMetadataStore(workerConfig.getConfigurationStoreServers(),
- (int) workerConfig.getZooKeeperSessionTimeoutMillis());
+ (int) workerConfig.getZooKeeperSessionTimeoutMillis(),
+ workerConfig.isZookeeperAllowReadOnlyOperations());
} catch (IOException e) {
throw new PulsarServerException(e);
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index e75db662a75..12ead928f5e 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -104,6 +104,12 @@ public class ProxyConfiguration implements PulsarConfiguration {
)
private int zooKeeperCacheExpirySeconds = 300;
+ @FieldContext(
+ category = CATEGORY_BROKER_DISCOVERY,
+ doc = "Is zookeeper allow read-only operations"
+ )
+ private boolean zookeeperAllowReadOnlyOperations = false;
+
@FieldContext(
category = CATEGORY_BROKER_DISCOVERY,
doc = "The service url points to the broker cluster"
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 10e122e794d..6b11a3e805a 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -378,12 +378,12 @@ public class ProxyService implements Closeable {
public MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
return PulsarResources.createMetadataStore(proxyConfig.getZookeeperServers(),
- proxyConfig.getZookeeperSessionTimeoutMs());
+ proxyConfig.getZookeeperSessionTimeoutMs(), proxyConfig.isZookeeperAllowReadOnlyOperations());
}
public MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
return PulsarResources.createMetadataStore(proxyConfig.getConfigurationStoreServers(),
- proxyConfig.getZookeeperSessionTimeoutMs());
+ proxyConfig.getZookeeperSessionTimeoutMs(), proxyConfig.isZookeeperAllowReadOnlyOperations());
}
public Authentication getProxyClientAuthenticationPlugin() {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index 843e3a8ec82..929ee0826b0 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -103,7 +103,7 @@ public class WebSocketService implements Closeable {
if (isNotBlank(config.getConfigurationStoreServers())) {
try {
configMetadataStore = createMetadataStore(config.getConfigurationStoreServers(),
- (int) config.getZooKeeperSessionTimeoutMillis());
+ (int) config.getZooKeeperSessionTimeoutMillis(), config.isZookeeperAllowReadOnlyOperations());
} catch (MetadataStoreException e) {
throw new PulsarServerException(e);
}
@@ -123,9 +123,10 @@ public class WebSocketService implements Closeable {
log.info("Pulsar WebSocket Service started");
}
- public MetadataStoreExtended createMetadataStore(String serverUrls, int sessionTimeoutMs)
+ public MetadataStoreExtended createMetadataStore(String serverUrls, int sessionTimeoutMs,
+ boolean isAllowReadOnlyOperations)
throws MetadataStoreException {
- return PulsarResources.createMetadataStore(serverUrls, sessionTimeoutMs);
+ return PulsarResources.createMetadataStore(serverUrls, sessionTimeoutMs, isAllowReadOnlyOperations);
}
@Override