You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/05/01 16:35:59 UTC
[incubator-pulsar] branch master updated: Issue 1014: Rename
"global zookeeper" to "configuration-store"(change in code,
conf and cli) (#1059)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c62b5c0 Issue 1014: Rename "global zookeeper" to "configuration-store"(change in code, conf and cli) (#1059)
c62b5c0 is described below
commit c62b5c014b1f69c8c9aea78be22d3f9c95eefb99
Author: Jia Zhai <zh...@gmail.com>
AuthorDate: Wed May 2 00:35:57 2018 +0800
Issue 1014: Rename "global zookeeper" to "configuration-store"(change in code, conf and cli) (#1059)
* global-zookeeper => configuration-store: change in code, conf and cli
* change following @sijie's comments
* remove unused imports
* change following @ivan's comments
* change following comments
* fix rebase error
* change for PR #1572 and #1223, fix integration failure
* Fix cube definitions after global zookeeper is renamed to configuration-store
* Hide deprecated options and few more adjustments
* Fixed required status of global-zk argument in cluster init tool
* Limit the memory usage for processes
* Fix the aliases and limit the memory usage
* remove environment settings
* Fix time.sleep at watch-znode.py
* revert unstarted to use started yml
* Fix invalid `""`
---
bin/pulsar | 26 +++++++--
bin/pulsar-admin | 1 -
bin/pulsar-daemon | 21 ++++---
conf/broker.conf | 11 +++-
conf/discovery.conf | 10 +++-
conf/proxy.conf | 14 +++--
conf/standalone.conf | 10 +++-
conf/websocket.conf | 12 +++-
deployment/dcos/PulsarGroups.json | 4 +-
deployment/kubernetes/aws/broker.yaml | 2 +-
deployment/kubernetes/generic/broker.yaml | 2 +-
deployment/kubernetes/generic/proxy.yaml | 2 +-
.../google-kubernetes-engine/broker.yaml | 2 +-
.../google-kubernetes-engine/cluster-metadata.yaml | 2 +-
.../kubernetes/google-kubernetes-engine/proxy.yaml | 2 +-
deployment/terraform-ansible/deploy-pulsar.yaml | 6 +-
deployment/terraform-ansible/templates/broker.conf | 2 +-
.../apache/pulsar/broker/ServiceConfiguration.java | 24 ++++++++
.../PulsarConfigurationLoaderTest.java | 6 +-
.../apache/pulsar/PulsarClusterMetadataSetup.java | 51 ++++++++++------
.../org/apache/pulsar/PulsarStandaloneStarter.java | 2 +-
.../org/apache/pulsar/broker/PulsarService.java | 6 +-
.../pulsar/broker/admin/impl/BrokersBase.java | 8 +--
.../org/apache/pulsar/PulsarBrokerStarterTest.java | 8 +--
.../org/apache/pulsar/broker/admin/AdminTest.java | 2 +-
.../broker/auth/MockedPulsarServiceBaseTest.java | 2 +-
.../pulsar/broker/service/ReplicatorTestBase.java | 6 +-
.../broker/service/v1/V1_ReplicatorTestBase.java | 6 +-
.../pulsar/client/api/NonPersistentTopicTest.java | 8 +--
.../websocket/proxy/ProxyAuthenticationTest.java | 11 +---
.../websocket/proxy/ProxyAuthorizationTest.java | 2 +-
.../websocket/proxy/ProxyPublishConsumeTest.java | 2 +-
.../proxy/ProxyPublishConsumeTlsTest.java | 2 +-
.../proxy/v1/V1_ProxyAuthenticationTest.java | 6 +-
.../configurations/pulsar_broker_test.conf | 2 +-
pulsar-client-cpp/test-conf/standalone-ssl.conf | 5 +-
pulsar-client-cpp/tests/authentication.conf | 5 +-
pulsar-client-cpp/tests/standalone.conf | 31 +++++-----
.../common/conf/InternalConfigurationData.java | 16 +++---
.../discovery/service/BrokerDiscoveryProvider.java | 2 +-
.../service/server/DiscoveryServiceStarter.java | 14 +++--
.../discovery/service/server/ServiceConfig.java | 19 +++++-
.../service/server/DiscoveryServiceWebTest.java | 67 ++++++++++++++++++++--
.../proxy/server/BrokerDiscoveryProvider.java | 2 +-
.../pulsar/proxy/server/ProxyConfiguration.java | 16 +++++-
.../apache/pulsar/proxy/server/ProxyService.java | 4 +-
.../pulsar/proxy/server/ProxyServiceStarter.java | 13 ++++-
.../ProxyAuthenticatedProducerConsumerTest.java | 2 +-
.../server/ProxyConnectionThrottlingTest.java | 2 +-
.../proxy/server/ProxyLookupThrottlingTest.java | 2 +-
.../org/apache/pulsar/proxy/server/ProxyTest.java | 2 +-
.../apache/pulsar/proxy/server/ProxyTlsTest.java | 2 +-
.../apache/pulsar/websocket/WebSocketService.java | 9 +--
.../service/WebSocketProxyConfiguration.java | 18 +++++-
...Starter.java => ConfigurationStoreStarter.java} | 4 +-
site/docs/latest/deployment/cluster.md | 4 +-
.../latest-version-image/conf/bookie.conf | 2 +-
.../latest-version-image/conf/broker.conf | 1 +
.../latest-version-image/conf/global-zk.conf | 3 +-
.../latest-version-image/conf/local-zk.conf | 1 +
.../latest-version-image/conf/proxy.conf | 2 +
.../latest-version-image/scripts/init-cluster.sh | 2 +-
tests/integration-tests-base/pom.xml | 1 -
...single-cluster-3-bookie-2-broker-unstarted.yaml | 16 +++---
.../single-cluster-3-bookie-2-broker.yaml | 16 +++---
.../java/org/apache/pulsar/tests/DockerUtils.java | 1 +
.../apache/pulsar/tests/PulsarClusterUtils.java | 4 ++
.../pulsar/tests/integration/TestCompaction.java | 16 +++---
.../smoke/src/test/resources/arquillian.xml | 1 +
69 files changed, 400 insertions(+), 188 deletions(-)
diff --git a/bin/pulsar b/bin/pulsar
index 0bd2228..4a58d30 100755
--- a/bin/pulsar
+++ b/bin/pulsar
@@ -24,7 +24,7 @@ PULSAR_HOME=`cd $BINDIR/..;pwd`
DEFAULT_BROKER_CONF=$PULSAR_HOME/conf/broker.conf
DEFAULT_BOOKKEEPER_CONF=$PULSAR_HOME/conf/bookkeeper.conf
DEFAULT_ZK_CONF=$PULSAR_HOME/conf/zookeeper.conf
-DEFAULT_GLOBAL_ZK_CONF=$PULSAR_HOME/conf/global_zookeeper.conf
+DEFAULT_CONFIGURATION_STORE_CONF=$PULSAR_HOME/conf/global_zookeeper.conf
DEFAULT_DISCOVERY_CONF=$PULSAR_HOME/conf/discovery.conf
DEFAULT_PROXY_CONF=$PULSAR_HOME/conf/proxy.conf
DEFAULT_STANDALONE_CONF=$PULSAR_HOME/conf/standalone.conf
@@ -103,18 +103,19 @@ pulsar_help() {
cat <<EOF
Usage: pulsar <command>
where command is one of:
+
broker Run a broker server
bookie Run a bookie server
zookeeper Run a zookeeper server
- global-zookeeper Run a global-zookeeper server
+ configuration-store Run a configuration-store server
discovery Run a discovery server
proxy Run a pulsar proxy
websocket Run a web socket proxy server
functions-worker Run a functions worker server
standalone Run a broker server with local bookies and local zookeeper
- compact-topic Run compaction against a topic
initialize-cluster-metadata One-time metadata initialization
+ compact-topic Run compaction against a topic
zookeeper-shell Open a ZK shell client
help This help message
@@ -126,7 +127,7 @@ Environment variables:
PULSAR_BROKER_CONF Configuration file for broker (default: $DEFAULT_BROKER_CONF)
PULSAR_BOOKKEEPER_CONF Configuration file for bookie (default: $DEFAULT_BOOKKEEPER_CONF)
PULSAR_ZK_CONF Configuration file for zookeeper (default: $DEFAULT_ZK_CONF)
- PULSAR_GLOBAL_ZK_CONF Configuration file for global zookeeper (default: $DEFAULT_GLOBAL_ZK_CONF)
+ PULSAR_CONFIGURATION_STORE_CONF Configuration file for global configuration store (default: $DEFAULT_CONFIGURATION_STORE_CONF)
PULSAR_DISCOVERY_CONF Configuration file for discovery service (default: $DEFAULT_DISCOVERY_CONF)
PULSAR_WEBSOCKET_CONF Configuration file for websocket proxy (default: $DEFAULT_WEBSOCKET_CONF)
PULSAR_PROXY_CONF Configuration file for Pulsar proxy (default: $DEFAULT_PROXY_CONF)
@@ -192,6 +193,11 @@ fi
if [ -z "$PULSAR_GLOBAL_ZK_CONF" ]; then
PULSAR_GLOBAL_ZK_CONF=$DEFAULT_GLOBAL_ZK_CONF
+ PULSAR_CONFIGURATION_STORE_CONF=$DEFAULT_GLOBAL_ZK_CONF
+fi
+
+if [ -z "$PULSAR_CONFIGURATION_STORE_CONF" ]; then
+ PULSAR_CONFIGURATION_STORE_CONF=$DEFAULT_CONFIGURATION_STORE_CONF
fi
if [ -z "$PULSAR_DISCOVERY_CONF" ]; then
@@ -260,7 +266,12 @@ elif [ $COMMAND == "global-zookeeper" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"global-zookeeper.log"}
# Allow global ZK to turn into read-only mode when it cannot reach the quorum
OPTS="${OPTS} -Dreadonlymode.enabled=true"
- exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.GlobalZooKeeperStarter $PULSAR_GLOBAL_ZK_CONF $@
+ exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ConfigurationStoreStarter $PULSAR_GLOBAL_ZK_CONF $@
+elif [ $COMMAND == "configuration-store" ]; then
+ PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"configuration-store.log"}
+ # Allow the configuration store to turn into read-only mode when it cannot reach the quorum
+ OPTS="${OPTS} -Dreadonlymode.enabled=true"
+ exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.zookeeper.ConfigurationStoreStarter $PULSAR_CONFIGURATION_STORE_CONF $@
elif [ $COMMAND == "discovery" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"discovery.log"}
exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.discovery.service.server.DiscoveryServiceStarter $PULSAR_DISCOVERY_CONF $@
@@ -285,5 +296,8 @@ elif [ $COMMAND == "compact-topic" ]; then
elif [ $COMMAND == "help" ]; then
pulsar_help;
else
- exec $JAVA $OPTS $COMMAND $@
+ echo ""
+ echo "-- Invalid command '$COMMAND' -- Use '$0 help' to get a list of valid commands"
+ echo ""
+ exit 1
fi
diff --git a/bin/pulsar-admin b/bin/pulsar-admin
index c8b41da..837a605 100755
--- a/bin/pulsar-admin
+++ b/bin/pulsar-admin
@@ -146,5 +146,4 @@ OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}"
#Change to PULSAR_HOME to support relative paths
cd "$PULSAR_HOME"
-
exec $JAVA $OPTS org.apache.pulsar.admin.cli.PulsarAdminTool $PULSAR_CLIENT_CONF "$@"
diff --git a/bin/pulsar-daemon b/bin/pulsar-daemon
index 5853f11..0957413 100755
--- a/bin/pulsar-daemon
+++ b/bin/pulsar-daemon
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,13 +22,13 @@ usage() {
cat <<EOF
Usage: pulsar-daemon (start|stop) <command> <args...>
where command is one of:
- broker Run a broker server
- bookie Run a bookie server
- zookeeper Run a zookeeper server
- global-zookeeper Run a global-zookeeper server
- discovery Run a discovery server
- websocket Run a websocket proxy server
- standalone Run a standalone Pulsar service
+ broker Run a broker server
+ bookie Run a bookie server
+ zookeeper Run a zookeeper server
+ configuration-store Run a configuration-store server
+ discovery Run a discovery server
+ websocket Run a websocket proxy server
+ standalone Run a standalone Pulsar service
where argument is one of:
-force (accepted only with stop command): Decides whether to stop the server forcefully if not stopped by normal shutdown
@@ -73,6 +73,9 @@ case $command in
(global-zookeeper)
echo "doing $startStop $command ..."
;;
+ (configuration-store)
+ echo "doing $startStop $command ..."
+ ;;
(discovery)
echo "doing $startStop $command ..."
;;
diff --git a/conf/broker.conf b/conf/broker.conf
index 6f502a5..dc7ca04 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -22,8 +22,8 @@
# Zookeeper quorum connection string
zookeeperServers=
-# Global Zookeeper quorum connection string
-globalZookeeperServers=
+# Configuration Store connection string
+configurationStoreServers=
# Broker data port
brokerServicePort=6650
@@ -457,3 +457,10 @@ functionsWorkerEnabled=false
# Enable topic level metrics
exposePublisherStats=true
+
+
+### --- Deprecated config variables --- ###
+
+# Deprecated. Use configurationStoreServers
+globalZookeeperServers=
+
diff --git a/conf/discovery.conf b/conf/discovery.conf
index 87f887f..b1b6f41 100644
--- a/conf/discovery.conf
+++ b/conf/discovery.conf
@@ -20,8 +20,8 @@
# Zookeeper quorum connection string (comma-separated)
zookeeperServers=
-# Global zookeeper quorum connection string (comma-separated)
-globalZookeeperServers=
+# Configuration Store connection string (comma-separated)
+configurationStoreServers=
# ZooKeeper session timeout
zookeeperSessionTimeoutMs=30000
@@ -77,3 +77,9 @@ tlsKeyFilePath=
# Specify whether Client certificates are required for TLS
# Reject the Connection if the Client Certificate is not trusted.
tlsRequireTrustedClientCertOnConnect=false
+
+
+### --- Deprecated config variables --- ###
+
+# Deprecated. Use configurationStoreServers
+globalZookeeperServers=
diff --git a/conf/proxy.conf b/conf/proxy.conf
index f731240..a904f47 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -20,8 +20,8 @@
# Zookeeper quorum connection string (comma-separated)
zookeeperServers=
-# Global zookeeper quorum connection string (comma-separated)
-globalZookeeperServers=
+# Configuration Store connection string (comma-separated)
+configurationStoreServers=
# ZooKeeper session timeout
zookeeperSessionTimeoutMs=30000
@@ -70,10 +70,10 @@ superUserRoles=
forwardAuthorizationCredentials=false
# --- RateLimiting ----
-# Max concurrent inbound Connections, proxy will reject requests beyond that. Default value is 10,000
+# Max concurrent inbound Connections, proxy will reject requests beyond that. Default value is 10,000
maxConcurrentInboundConnections=10000
-# Max concurrent outbound Connections, proxy will error out requests beyond that. Default value is 10,000
+# Max concurrent outbound Connections, proxy will error out requests beyond that. Default value is 10,000
maxConcurrentLookupRequests=10000
##### --- TLS --- #####
@@ -96,3 +96,9 @@ tlsHostnameVerificationEnabled=false
# Specify whether Client certificates are required for TLS
# Reject the Connection if the Client Certificate is not trusted.
tlsRequireTrustedClientCertOnConnect=false
+
+
+### --- Deprecated config variables --- ###
+
+# Deprecated. Use configurationStoreServers
+globalZookeeperServers=
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 2beddf5..f5c9546 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -22,8 +22,8 @@
# Zookeeper quorum connection string
zookeeperServers=
-# Global Zookeeper quorum connection string
-globalZookeeperServers=
+# Configuration Store connection string
+configurationStoreServers=
brokerServicePort=6650
@@ -396,3 +396,9 @@ exposeTopicLevelMetricsInPrometheus=true
# Enable topic level metrics
exposePublisherStats=true
+
+### --- Deprecated config variables --- ###
+
+# Deprecated. Use configurationStoreServers
+globalZookeeperServers=
+
diff --git a/conf/websocket.conf b/conf/websocket.conf
index 87accac..f9f2436 100644
--- a/conf/websocket.conf
+++ b/conf/websocket.conf
@@ -19,13 +19,13 @@
### --- Web Socket proxy settings --- ###
-# Global Zookeeper quorum connection string
-globalZookeeperServers=
+# Configuration Store connection string
+configurationStoreServers=
# Zookeeper session timeout in milliseconds
zooKeeperSessionTimeoutMillis=30000
-# Pulsar cluster url to connect to broker (optional if globalZookeeperServers present)
+# Pulsar cluster url to connect to broker (optional if configurationStoreServers present)
serviceUrl=
serviceUrlTls=
brokerServiceUrl=
@@ -103,3 +103,9 @@ tlsTrustCertsFilePath=
# Specify whether Client certificates are required for TLS
# Reject the Connection if the Client Certificate is not trusted.
tlsRequireTrustedClientCertOnConnect=false
+
+
+### --- Deprecated config variables --- ###
+
+# Deprecated. Use configurationStoreServers
+globalZookeeperServers=
diff --git a/deployment/dcos/PulsarGroups.json b/deployment/dcos/PulsarGroups.json
index 6585976..990ec57 100644
--- a/deployment/dcos/PulsarGroups.json
+++ b/deployment/dcos/PulsarGroups.json
@@ -182,7 +182,7 @@
"// Notice": "add PULSAR_MEM, PULSAR_GC, according to your environment.",
"webServicePort": "8082",
"zookeeperServers": "master.mesos:2181",
- "globalZookeeperServers": "master.mesos:2181",
+ "configurationStoreServers": "master.mesos:2181",
"clusterName": "pulsar-dcos"
},
@@ -223,7 +223,7 @@
"env": {
"webServicePort": "8082",
"zookeeperServers": "master.mesos:2181",
- "globalZookeeperServers": "master.mesos:2181",
+ "configurationStoreServers": "master.mesos:2181",
"clusterName": "pulsar-dcos",
"webServiceUrl": "http://broker.brokers.pulsar.marathon.mesos:8082",
"brokerServiceUrl": "pulsar://broker.brokers.pulsar.marathon.mesos:6650"
diff --git a/deployment/kubernetes/aws/broker.yaml b/deployment/kubernetes/aws/broker.yaml
index c01ff20..70940ef 100644
--- a/deployment/kubernetes/aws/broker.yaml
+++ b/deployment/kubernetes/aws/broker.yaml
@@ -28,7 +28,7 @@ data:
PULSAR_MEM: "\" -Dio.netty.leakDetectionLevel=disabled -Dio.netty.recycler.linkCapacity=1024 -XX:+ParallelRefProcEnabled -XX:+UnlockExperimentalVMOptions -XX:+AggressiveOpts -XX:+DoEscapeAnalysis -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32 -XX:G1NewSizePercent=50 -XX:+DisableExplicitGC -XX:-ResizePLAB -XX:+ExitOnOutOfMemoryError -XX:+PerfDisableSharedMem -Xms12g -Xmx12g -XX:MaxDirectMemorySize=14g -Dpulsar.root.logger=DEBUG,FILE \""
PULSAR_GC: "\" -XX:+UseG1GC -XX:MaxGCPauseMillis=10\""
zookeeperServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper
- globalZookeeperServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper
+ configurationStoreServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper
clusterName: us-east
managedLedgerDefaultEnsembleSize: "2"
managedLedgerDefaultWriteQuorum: "2"
diff --git a/deployment/kubernetes/generic/broker.yaml b/deployment/kubernetes/generic/broker.yaml
index 452cdf1..cf760c1 100644
--- a/deployment/kubernetes/generic/broker.yaml
+++ b/deployment/kubernetes/generic/broker.yaml
@@ -27,7 +27,7 @@ data:
# better GC behavior at high throughput
PULSAR_MEM: "\" -Xms1g -Xmx1g -XX:MaxDirectMemorySize=1g\""
zookeeperServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper
- globalZookeeperServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper
+ configurationStoreServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper
clusterName: us-central
---
##
diff --git a/deployment/kubernetes/generic/proxy.yaml b/deployment/kubernetes/generic/proxy.yaml
index 453b91e..6715cf1 100644
--- a/deployment/kubernetes/generic/proxy.yaml
+++ b/deployment/kubernetes/generic/proxy.yaml
@@ -25,7 +25,7 @@ metadata:
data:
PULSAR_MEM: "\" -Xms4g -Xmx4g -XX:MaxDirectMemorySize=4g\""
zookeeperServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper
- globalZookeeperServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper
+ configurationStoreServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper
clusterName: us-central
---
##
diff --git a/deployment/kubernetes/google-kubernetes-engine/broker.yaml b/deployment/kubernetes/google-kubernetes-engine/broker.yaml
index a07baa1..6e0d78e 100644
--- a/deployment/kubernetes/google-kubernetes-engine/broker.yaml
+++ b/deployment/kubernetes/google-kubernetes-engine/broker.yaml
@@ -26,7 +26,7 @@ data:
PULSAR_MEM: "\" -Xms8g -Xmx8g -XX:MaxDirectMemorySize=4g\""
PULSAR_GC: "\" -XX:+UseG1GC \""
zookeeperServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper
- globalZookeeperServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper
+ configurationStoreServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper
clusterName: pulsar-gke
---
##
diff --git a/deployment/kubernetes/google-kubernetes-engine/cluster-metadata.yaml b/deployment/kubernetes/google-kubernetes-engine/cluster-metadata.yaml
index e4afffb..0027490 100644
--- a/deployment/kubernetes/google-kubernetes-engine/cluster-metadata.yaml
+++ b/deployment/kubernetes/google-kubernetes-engine/cluster-metadata.yaml
@@ -36,7 +36,7 @@ spec:
bin/pulsar initialize-cluster-metadata \
--cluster us-central \
--zookeeper zookeeper \
- --global-zookeeper zookeeper \
+ --configuration-store zookeeper \
--web-service-url http://broker.default.svc.cluster.local:8080/ \
--broker-service-url pulsar://broker.default.svc.cluster.local:6650/ || true;
restartPolicy: Never
diff --git a/deployment/kubernetes/google-kubernetes-engine/proxy.yaml b/deployment/kubernetes/google-kubernetes-engine/proxy.yaml
index e62aa2a..138f12f 100644
--- a/deployment/kubernetes/google-kubernetes-engine/proxy.yaml
+++ b/deployment/kubernetes/google-kubernetes-engine/proxy.yaml
@@ -24,7 +24,7 @@ metadata:
data:
PULSAR_MEM: "\" -Xms4g -Xmx4g -XX:MaxDirectMemorySize=4g\""
zookeeperServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper
- globalZookeeperServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper
+ configurationStoreServers: zk-0.zookeeper,zk-1.zookeeper,zk-2.zookeeper
clusterName: pulsar-gke
---
diff --git a/deployment/terraform-ansible/deploy-pulsar.yaml b/deployment/terraform-ansible/deploy-pulsar.yaml
index c0e8eb8..bec552a 100644
--- a/deployment/terraform-ansible/deploy-pulsar.yaml
+++ b/deployment/terraform-ansible/deploy-pulsar.yaml
@@ -88,7 +88,7 @@
state: directory
with_items:
- data/zookeeper
- - name: Add pulsar_env.sh configuration file
+ - name: Add pulsar_env.sh configuration file
template:
src: "../templates/pulsar_env.sh"
dest: "/opt/pulsar/conf/pulsar_env.sh"
@@ -114,7 +114,7 @@
bin/pulsar initialize-cluster-metadata \
--cluster {{ cluster_name }} \
--zookeeper localhost:2181 \
- --global-zookeeper localhost:2181 \
+ --configuration-store localhost:2181 \
--web-service-url {{ http_url }} \
--broker-service-url {{ service_url }}
args:
@@ -161,7 +161,7 @@
args:
chdir: /opt/pulsar
when: groups['zookeeper'][0] == inventory_hostname
-
+
- name: Hosts addresses
hosts: localhost
become: false
diff --git a/deployment/terraform-ansible/templates/broker.conf b/deployment/terraform-ansible/templates/broker.conf
index de78366..d168100 100644
--- a/deployment/terraform-ansible/templates/broker.conf
+++ b/deployment/terraform-ansible/templates/broker.conf
@@ -25,7 +25,7 @@
zookeeperServers={{ zookeeper_servers }}
# Global Zookeeper quorum connection string
-globalZookeeperServers={{ zookeeper_servers }}
+configurationStoreServers={{ zookeeper_servers }}
# Hostname or IP address the service advertises to the outside world. If not set, the value of InetAddress.getLocalHost().getHostName() is used.
advertisedAddress={{ hostvars[inventory_hostname].public_ip }}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index e6c37ba..5cf31c1 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -41,8 +41,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
@FieldContext(required = true)
private String zookeeperServers;
// Global Zookeeper quorum connection string
+ @Deprecated
@FieldContext(required = false)
private String globalZookeeperServers;
+ // Configuration Store connection string
+ @FieldContext(required = false)
+ private String configurationStoreServers;
private int brokerServicePort = 6650;
private int brokerServicePortTls = 6651;
// Port to use to server HTTP request
@@ -467,6 +471,10 @@ public class ServiceConfiguration implements PulsarConfiguration {
this.zookeeperServers = zookeeperServers;
}
+ /**
+ * @deprecated See {@link #getConfigurationStoreServers}
+ */
+ @Deprecated
public String getGlobalZookeeperServers() {
if (this.globalZookeeperServers == null || this.globalZookeeperServers.isEmpty()) {
// If the configuration is not set, assuming that the globalZK is not enabled and all data is in the same
@@ -476,10 +484,26 @@ public class ServiceConfiguration implements PulsarConfiguration {
return globalZookeeperServers;
}
+ /**
+ * @deprecated See {@link #setConfigurationStoreServers(String)}
+ */
+ @Deprecated
public void setGlobalZookeeperServers(String globalZookeeperServers) {
this.globalZookeeperServers = globalZookeeperServers;
}
+ public String getConfigurationStoreServers() {
+ if (this.configurationStoreServers == null || this.configurationStoreServers.isEmpty()) {
+ // If the configuration store is not set, fall back to the deprecated global ZooKeeper servers setting
+ return this.getGlobalZookeeperServers();
+ }
+ return configurationStoreServers;
+ }
+
+ public void setConfigurationStoreServers(String configurationStoreServers) {
+ this.configurationStoreServers = configurationStoreServers;
+ }
+
public int getBrokerServicePort() {
return brokerServicePort;
}
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java
index d830c4f..baa0e75 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoaderTest.java
@@ -41,7 +41,7 @@ public class PulsarConfigurationLoaderTest {
private Properties properties = new Properties();
private String zookeeperServers = "localhost:2181";
- private String globalZookeeperServers = "localhost:2184";
+ private String configurationStoreServers = "localhost:2184";
private int brokerServicePort = 7650;
private int brokerServicePortTls = 7651;
private int webServicePort = 9080;
@@ -66,7 +66,7 @@ public class PulsarConfigurationLoaderTest {
// check whether converting correctly
assertEquals(serviceConfiguration.getZookeeperServers(), "localhost:2181");
- assertEquals(serviceConfiguration.getGlobalZookeeperServers(), "localhost:2184");
+ assertEquals(serviceConfiguration.getConfigurationStoreServers(), "localhost:2184");
assertEquals(serviceConfiguration.getBrokerServicePort(), 7650);
assertEquals(serviceConfiguration.getBrokerServicePortTls(), 7651);
assertEquals(serviceConfiguration.getWebServicePort(), 9080);
@@ -90,7 +90,7 @@ public class PulsarConfigurationLoaderTest {
final String zkServer = "z1.example.com,z2.example.com,z3.example.com";
PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(testConfigFile)));
printWriter.println("zookeeperServers=" + zkServer);
- printWriter.println("globalZookeeperServers=gz1.example.com,gz2.example.com,gz3.example.com/foo");
+ printWriter.println("configurationStoreServers=gz1.example.com,gz2.example.com,gz3.example.com/foo");
printWriter.println("brokerDeleteInactiveTopicsEnabled=true");
printWriter.println("statusFilePath=/tmp/status.html");
printWriter.println("managedLedgerDefaultEnsembleSize=1");
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
index 5dfa80d..3e62c9a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
@@ -79,9 +79,13 @@ public class PulsarClusterMetadataSetup {
private String zookeeper;
@Parameter(names = { "-gzk",
- "--global-zookeeper" }, description = "Global ZooKeeper quorum connection string", required = true)
+ "--global-zookeeper" }, description = "Global ZooKeeper quorum connection string", required = false, hidden = true)
private String globalZookeeper;
+ @Parameter(names = { "-cs",
+ "--configuration-store" }, description = "Configuration Store connection string", required = false)
+ private String configurationStore;
+
@Parameter(names = { "-h", "--help" }, description = "Show this help message")
private boolean help = false;
}
@@ -101,12 +105,27 @@ public class PulsarClusterMetadataSetup {
throw e;
}
- log.info("Setting up cluster {} with zk={} global-zk={}", arguments.cluster, arguments.zookeeper,
- arguments.globalZookeeper);
+ if (arguments.configurationStore == null && arguments.globalZookeeper == null) {
+ System.err.println("Configuration store address argument is required (--configuration-store)");
+ jcommander.usage();
+ System.exit(1);
+ }
+
+ if (arguments.configurationStore != null && arguments.globalZookeeper != null) {
+ System.err.println("Configuration store argument (--configuration-store) supercedes the deprecated (--global-zookeeper) argument");
+ jcommander.usage();
+ System.exit(1);
+ }
+
+ if (arguments.configurationStore == null) {
+ arguments.configurationStore = arguments.globalZookeeper;
+ }
+ log.info("Setting up cluster {} with zk={} configuration-store ={}", arguments.cluster, arguments.zookeeper,
+ arguments.configurationStore);
ZooKeeperClientFactory zkfactory = new ZookeeperClientFactoryImpl();
ZooKeeper localZk = zkfactory.create(arguments.zookeeper, SessionType.ReadWrite, 30000).get();
- ZooKeeper globalZk = zkfactory.create(arguments.globalZookeeper, SessionType.ReadWrite, 30000).get();
+ ZooKeeper configStoreZk = zkfactory.create(arguments.configurationStore, SessionType.ReadWrite, 30000).get();
// Format BookKeeper metadata
ServerConfiguration bkConf = new ServerConfiguration();
@@ -121,14 +140,14 @@ public class PulsarClusterMetadataSetup {
localZk.create("/namespace", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
try {
- ZkUtils.createFullPathOptimistic(globalZk, POLICIES_ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ ZkUtils.createFullPathOptimistic(configStoreZk, POLICIES_ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} catch (NodeExistsException e) {
// Ignore
}
try {
- ZkUtils.createFullPathOptimistic(globalZk, "/admin/clusters", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ ZkUtils.createFullPathOptimistic(configStoreZk, "/admin/clusters", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} catch (NodeExistsException e) {
// Ignore
@@ -138,7 +157,7 @@ public class PulsarClusterMetadataSetup {
arguments.clusterBrokerServiceUrl, arguments.clusterBrokerServiceUrlTls);
byte[] clusterDataJson = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(clusterData);
- globalZk.create("/admin/clusters/" + arguments.cluster, clusterDataJson, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ configStoreZk.create("/admin/clusters/" + arguments.cluster, clusterDataJson, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
// Create marker for "global" cluster
@@ -146,7 +165,7 @@ public class PulsarClusterMetadataSetup {
byte[] globalClusterDataJson = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(globalClusterData);
try {
- globalZk.create("/admin/clusters/global", globalClusterDataJson, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ configStoreZk.create("/admin/clusters/global", globalClusterDataJson, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} catch (NodeExistsException e) {
// Ignore
@@ -155,12 +174,12 @@ public class PulsarClusterMetadataSetup {
// Create public tenant, whitelisted to use the this same cluster, along with other clusters
String publicTenantPath = POLICIES_ROOT + "/" + TopicName.PUBLIC_TENANT;
- Stat stat = globalZk.exists(publicTenantPath, false);
+ Stat stat = configStoreZk.exists(publicTenantPath, false);
if (stat == null) {
TenantInfo publicTenant = new TenantInfo(Collections.emptySet(), Collections.singleton(arguments.cluster));
try {
- ZkUtils.createFullPathOptimistic(globalZk, publicTenantPath,
+ ZkUtils.createFullPathOptimistic(configStoreZk, publicTenantPath,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicTenant),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (NodeExistsException e) {
@@ -168,14 +187,14 @@ public class PulsarClusterMetadataSetup {
}
} else {
// Update existing public tenant with new cluster
- byte[] content = globalZk.getData(publicTenantPath, false, null);
+ byte[] content = configStoreZk.getData(publicTenantPath, false, null);
TenantInfo publicTenant = ObjectMapperFactory.getThreadLocal().readValue(content, TenantInfo.class);
// Only update z-node if the list of clusters should be modified
if (!publicTenant.getAllowedClusters().contains(arguments.cluster)) {
publicTenant.getAllowedClusters().add(arguments.cluster);
- globalZk.setData(publicTenantPath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicTenant),
+ configStoreZk.setData(publicTenantPath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicTenant),
stat.getVersion());
}
}
@@ -184,7 +203,7 @@ public class PulsarClusterMetadataSetup {
String defaultNamespacePath = POLICIES_ROOT + "/" + TopicName.PUBLIC_TENANT + "/" + TopicName.DEFAULT_NAMESPACE;
Policies policies;
- stat = globalZk.exists(defaultNamespacePath, false);
+ stat = configStoreZk.exists(defaultNamespacePath, false);
if (stat == null) {
policies = new Policies();
policies.bundles = getBundles(16);
@@ -192,7 +211,7 @@ public class PulsarClusterMetadataSetup {
try {
ZkUtils.createFullPathOptimistic(
- globalZk,
+ configStoreZk,
defaultNamespacePath,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
@@ -201,14 +220,14 @@ public class PulsarClusterMetadataSetup {
// Ignore
}
} else {
- byte[] content = globalZk.getData(defaultNamespacePath, false, null);
+ byte[] content = configStoreZk.getData(defaultNamespacePath, false, null);
policies = ObjectMapperFactory.getThreadLocal().readValue(content, Policies.class);
// Only update z-node if the list of clusters should be modified
if (!policies.replication_clusters.contains(arguments.cluster)) {
policies.replication_clusters.add(arguments.cluster);
- globalZk.setData(defaultNamespacePath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies),
+ configStoreZk.setData(defaultNamespacePath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies),
stat.getVersion());
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
index e5415b4..641636a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
@@ -133,7 +133,7 @@ public class PulsarStandaloneStarter {
// Set ZK server's host to localhost
config.setZookeeperServers(zkServers + ":" + zkPort);
- config.setGlobalZookeeperServers(zkServers + ":" + zkPort);
+ config.setConfigurationStoreServers(zkServers + ":" + zkPort);
config.setRunningStandalone(true);
Runtime.getRuntime().addShutdownHook(new Thread() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 3d14b8b..c74ecd7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -421,7 +421,7 @@ public class PulsarService implements AutoCloseable {
state = State.Started;
acquireSLANamespace();
-
+
// start function worker service if necessary
this.startWorkerService();
@@ -491,7 +491,7 @@ public class PulsarService implements AutoCloseable {
this.localZkCache = new LocalZooKeeperCache(getZkClient(), getOrderedExecutor());
this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(),
- (int) config.getZooKeeperSessionTimeoutMillis(), config.getGlobalZookeeperServers(),
+ (int) config.getZooKeeperSessionTimeoutMillis(), config.getConfigurationStoreServers(),
getOrderedExecutor(), this.cacheExecutor);
try {
this.globalZkCache.start();
@@ -889,7 +889,7 @@ public class PulsarService implements AutoCloseable {
InternalConfigurationData internalConf = new InternalConfigurationData(
this.getConfiguration().getZookeeperServers(),
- this.getConfiguration().getGlobalZookeeperServers(),
+ this.getConfiguration().getConfigurationStoreServers(),
new ClientConfiguration().getZkLedgersRootPath());
URI dlogURI;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 59027c2..cab707e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -56,7 +56,7 @@ import io.swagger.annotations.ApiResponses;
public class BrokersBase extends AdminResource {
private static final Logger LOG = LoggerFactory.getLogger(BrokersBase.class);
private int serviceConfigZkVersion = -1;
-
+
@GET
@Path("/{cluster}")
@ApiOperation(value = "Get the list of active brokers (web service addresses) in the cluster.", response = String.class, responseContainer = "Set")
@@ -135,11 +135,11 @@ public class BrokersBase extends AdminResource {
public List<String> getDynamicConfigurationName() {
return BrokerService.getDynamicConfiguration();
}
-
+
/**
* if {@link ServiceConfiguration}-field is allowed to be modified dynamically, update configuration-map into zk, so
* all other brokers get the watch and can see the change and take appropriate action on the change.
- *
+ *
* @param configName
* : configuration key
* @param configValue
@@ -192,7 +192,7 @@ public class BrokersBase extends AdminResource {
ClientConfiguration conf = new ClientConfiguration();
return new InternalConfigurationData(
pulsar().getConfiguration().getZookeeperServers(),
- pulsar().getConfiguration().getGlobalZookeeperServers(),
+ pulsar().getConfiguration().getConfigurationStoreServers(),
conf.getZkLedgersRootPath());
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/PulsarBrokerStarterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/PulsarBrokerStarterTest.java
index f9373c0..9cce6db 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/PulsarBrokerStarterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/PulsarBrokerStarterTest.java
@@ -47,7 +47,7 @@ public class PulsarBrokerStarterTest {
}
PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(testConfigFile)));
printWriter.println("zookeeperServers=z1.example.com,z2.example.com,z3.example.com");
- printWriter.println("globalZookeeperServers=gz1.example.com,gz2.example.com,gz3.example.com/foo");
+ printWriter.println("configurationStoreServers=gz1.example.com,gz2.example.com,gz3.example.com/foo");
printWriter.println("brokerDeleteInactiveTopicsEnabled=false");
printWriter.println("statusFilePath=/tmp/status.html");
printWriter.println("managedLedgerDefaultEnsembleSize=1");
@@ -99,7 +99,7 @@ public class PulsarBrokerStarterTest {
Assert.assertTrue(ServiceConfiguration.class.isInstance(returnValue));
ServiceConfiguration serviceConfig = (ServiceConfiguration) returnValue;
Assert.assertEquals(serviceConfig.getZookeeperServers(), "z1.example.com,z2.example.com,z3.example.com");
- Assert.assertEquals(serviceConfig.getGlobalZookeeperServers(),
+ Assert.assertEquals(serviceConfig.getConfigurationStoreServers(),
"gz1.example.com,gz2.example.com,gz3.example.com/foo");
Assert.assertFalse(serviceConfig.isBrokerDeleteInactiveTopicsEnabled());
Assert.assertEquals(serviceConfig.getStatusFilePath(), "/tmp/status.html");
@@ -217,7 +217,7 @@ public class PulsarBrokerStarterTest {
}
PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(testConfigFile)));
printWriter.println("zookeeperServers=z1.example.com,z2.example.com,z3.example.com");
- printWriter.println("globalZookeeperServers=");
+ printWriter.println("configurationStoreServers=");
printWriter.println("brokerDeleteInactiveTopicsEnabled=false");
printWriter.println("statusFilePath=/tmp/status.html");
printWriter.println("managedLedgerDefaultEnsembleSize=1");
@@ -248,7 +248,7 @@ public class PulsarBrokerStarterTest {
Assert.assertTrue(ServiceConfiguration.class.isInstance(returnValue));
ServiceConfiguration serviceConfig = (ServiceConfiguration) returnValue;
Assert.assertEquals(serviceConfig.getZookeeperServers(), "z1.example.com,z2.example.com,z3.example.com");
- Assert.assertEquals(serviceConfig.getGlobalZookeeperServers(), "z1.example.com,z2.example.com,z3.example.com");
+ Assert.assertEquals(serviceConfig.getConfigurationStoreServers(), "z1.example.com,z2.example.com,z3.example.com");
Assert.assertFalse(serviceConfig.isBrokerDeleteInactiveTopicsEnabled());
Assert.assertEquals(serviceConfig.getStatusFilePath(), "/tmp/status.html");
Assert.assertEquals(serviceConfig.getBacklogQuotaDefaultLimitGB(), 18);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index c14ecfc..1f0058c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -208,7 +208,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
void internalConfiguration() throws Exception {
InternalConfigurationData expectedData = new InternalConfigurationData(
pulsar.getConfiguration().getZookeeperServers(),
- pulsar.getConfiguration().getGlobalZookeeperServers(),
+ pulsar.getConfiguration().getConfigurationStoreServers(),
new ClientConfiguration().getZkLedgersRootPath());
assertEquals(brokers.getInternalConfigurationData(), expectedData);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 72b8b74..23592b5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -100,7 +100,7 @@ public abstract class MockedPulsarServiceBaseTest {
this.conf.setActiveConsumerFailoverDelayTimeMillis(0);
this.conf.setDefaultNumberOfNamespaceBundles(1);
this.conf.setZookeeperServers("localhost:2181");
- this.conf.setGlobalZookeeperServers("localhost:3181");
+ this.conf.setConfigurationStoreServers("localhost:3181");
}
protected final void internalSetup() throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 7d411f5..2f599ee 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -119,7 +119,7 @@ public class ReplicatorTestBase {
config1.setWebServicePort(webServicePort1);
config1.setWebServicePortTls(webServicePortTls1);
config1.setZookeeperServers("127.0.0.1:" + zkPort1);
- config1.setGlobalZookeeperServers("127.0.0.1:" + globalZKPort + "/foo");
+ config1.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo");
config1.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
config1.setBrokerServicePurgeInactiveFrequencyInSeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
@@ -153,7 +153,7 @@ public class ReplicatorTestBase {
config2.setWebServicePort(webServicePort2);
config2.setWebServicePortTls(webServicePortTls2);
config2.setZookeeperServers("127.0.0.1:" + zkPort2);
- config2.setGlobalZookeeperServers("127.0.0.1:" + globalZKPort + "/foo");
+ config2.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo");
config2.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
config2.setBrokerServicePurgeInactiveFrequencyInSeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
@@ -187,7 +187,7 @@ public class ReplicatorTestBase {
config3.setWebServicePort(webServicePort3);
config3.setWebServicePortTls(webServicePortTls3);
config3.setZookeeperServers("127.0.0.1:" + zkPort3);
- config3.setGlobalZookeeperServers("127.0.0.1:" + globalZKPort + "/foo");
+ config3.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo");
config3.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
config3.setBrokerServicePurgeInactiveFrequencyInSeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java
index 298acde..fc5001b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java
@@ -118,7 +118,7 @@ public class V1_ReplicatorTestBase {
config1.setWebServicePort(webServicePort1);
config1.setWebServicePortTls(webServicePortTls1);
config1.setZookeeperServers("127.0.0.1:" + zkPort1);
- config1.setGlobalZookeeperServers("127.0.0.1:" + globalZKPort + "/foo");
+ config1.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo");
config1.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
config1.setBrokerServicePurgeInactiveFrequencyInSeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
@@ -152,7 +152,7 @@ public class V1_ReplicatorTestBase {
config2.setWebServicePort(webServicePort2);
config2.setWebServicePortTls(webServicePortTls2);
config2.setZookeeperServers("127.0.0.1:" + zkPort2);
- config2.setGlobalZookeeperServers("127.0.0.1:" + globalZKPort + "/foo");
+ config2.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo");
config2.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
config2.setBrokerServicePurgeInactiveFrequencyInSeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
@@ -186,7 +186,7 @@ public class V1_ReplicatorTestBase {
config3.setWebServicePort(webServicePort3);
config3.setWebServicePortTls(webServicePortTls3);
config3.setZookeeperServers("127.0.0.1:" + zkPort3);
- config3.setGlobalZookeeperServers("127.0.0.1:" + globalZKPort + "/foo");
+ config3.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo");
config3.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
config3.setBrokerServicePurgeInactiveFrequencyInSeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index e96dff4..20bb407 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -886,7 +886,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
config1.setAdvertisedAddress("localhost");
config1.setWebServicePort(webServicePort1);
config1.setZookeeperServers("127.0.0.1:" + zkPort1);
- config1.setGlobalZookeeperServers("127.0.0.1:" + globalZKPort + "/foo");
+ config1.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo");
config1.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
config1.setBrokerServicePurgeInactiveFrequencyInSeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
@@ -912,7 +912,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
config2.setWebServicePort(webServicePort2);
config2.setAdvertisedAddress("localhost");
config2.setZookeeperServers("127.0.0.1:" + zkPort2);
- config2.setGlobalZookeeperServers("127.0.0.1:" + globalZKPort + "/foo");
+ config2.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo");
config2.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
config2.setBrokerServicePurgeInactiveFrequencyInSeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
@@ -938,7 +938,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
config3.setWebServicePort(webServicePort3);
config3.setAdvertisedAddress("localhost");
config3.setZookeeperServers("127.0.0.1:" + zkPort3);
- config3.setGlobalZookeeperServers("127.0.0.1:" + globalZKPort + "/foo");
+ config3.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo");
config3.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
config3.setBrokerServicePurgeInactiveFrequencyInSeconds(
inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
@@ -1012,4 +1012,4 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
log.error("Stats executor error", e);
}
}
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
index eab6428..c59bf47 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
@@ -22,18 +22,17 @@ import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
+import com.google.common.collect.Sets;
import java.net.URI;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.websocket.WebSocketService;
@@ -50,8 +49,6 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import com.google.common.collect.Sets;
-
public class ProxyAuthenticationTest extends ProducerConsumerBase {
private int port;
@@ -70,11 +67,9 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase {
config.setWebServicePort(port);
config.setClusterName("test");
config.setAuthenticationEnabled(true);
- config.setGlobalZookeeperServers("dummy-zk-servers");
- config.setSuperUserRoles(Sets.newHashSet("pulsar.super_user"));
-
// If this is not set, 500 error occurs.
- config.setGlobalZookeeperServers("dummy");
+ config.setConfigurationStoreServers("dummy");
+ config.setSuperUserRoles(Sets.newHashSet("pulsar.super_user"));
if (methodName.equals("authenticatedSocketTest") || methodName.equals("statsTest")) {
config.setAuthenticationProviders(Sets.newHashSet("org.apache.pulsar.websocket.proxy.MockAuthenticationProvider"));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
index f9ac9eb..b79862f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
@@ -59,7 +59,7 @@ public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest {
WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
Set<String> superUser = Sets.newHashSet("");
config.setAuthorizationEnabled(true);
- config.setGlobalZookeeperServers("dummy-zk-servers");
+ config.setConfigurationStoreServers("dummy-zk-servers");
config.setSuperUserRoles(superUser);
config.setClusterName("c1");
config.setWebServicePort(TEST_PORT);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index c05c356..e084b04 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -86,7 +86,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
config.setWebServicePort(port);
config.setClusterName("test");
- config.setGlobalZookeeperServers("dummy-zk-servers");
+ config.setConfigurationStoreServers("dummy-zk-servers");
service = spy(new WebSocketService(config));
doReturn(mockZooKeeperClientFactory).when(service).getZooKeeperClientFactory();
proxyServer = new ProxyServer(config);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
index 6d486c2..650e05e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
@@ -73,7 +73,7 @@ public class ProxyPublishConsumeTlsTest extends TlsProducerConsumerBase {
config.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
config.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
config.setClusterName("use");
- config.setGlobalZookeeperServers("dummy-zk-servers");
+ config.setConfigurationStoreServers("dummy-zk-servers");
config.setBrokerClientAuthenticationParameters("tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + ",tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT_TLS).toString();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
index fbe6e07..b13a6c6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
@@ -72,11 +72,9 @@ public class V1_ProxyAuthenticationTest extends V1_ProducerConsumerBase {
config.setWebServicePort(port);
config.setClusterName("use");
config.setAuthenticationEnabled(true);
- config.setGlobalZookeeperServers("dummy-zk-servers");
- config.setSuperUserRoles(Sets.newHashSet("pulsar.super_user"));
-
// If this is not set, 500 error occurs.
- config.setGlobalZookeeperServers("dummy");
+ config.setConfigurationStoreServers("dummy");
+ config.setSuperUserRoles(Sets.newHashSet("pulsar.super_user"));
if (methodName.equals("authenticatedSocketTest") || methodName.equals("statsTest")) {
config.setAuthenticationProviders(Sets.newHashSet("org.apache.pulsar.websocket.proxy.MockAuthenticationProvider"));
diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
index 2918e45..8034185 100644
--- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
+++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
@@ -19,7 +19,7 @@
applicationName="pulsar_broker"
zookeeperServers="localhost"
-globalZookeeperServers="localhost"
+configurationStoreServers="localhost"
brokerServicePort=6650
brokerServicePortTls=6651
webServicePort=8080
diff --git a/pulsar-client-cpp/test-conf/standalone-ssl.conf b/pulsar-client-cpp/test-conf/standalone-ssl.conf
index 44b730c..bb097d6 100644
--- a/pulsar-client-cpp/test-conf/standalone-ssl.conf
+++ b/pulsar-client-cpp/test-conf/standalone-ssl.conf
@@ -22,9 +22,12 @@
# Zookeeper quorum connection string
zookeeperServers=
-# Global Zookeeper quorum connection string
+# Deprecated. Global zookeeper quorum connection string
globalZookeeperServers=
+# Configuration Store connection string
+configurationStoreServers=
+
brokerServicePort=9885
brokerServicePortTls=9886
diff --git a/pulsar-client-cpp/tests/authentication.conf b/pulsar-client-cpp/tests/authentication.conf
index f7a53d6..6db26da 100644
--- a/pulsar-client-cpp/tests/authentication.conf
+++ b/pulsar-client-cpp/tests/authentication.conf
@@ -22,9 +22,12 @@
# Zookeeper quorum connection string
zookeeperServers=
-# Global Zookeeper quorum connection string
+# Deprecated. Global Zookeeper quorum connection string
globalZookeeperServers=
+# Configuration Store connection string
+configurationStoreServers=
+
brokerServicePort=9885
brokerServicePortTls=9886
diff --git a/pulsar-client-cpp/tests/standalone.conf b/pulsar-client-cpp/tests/standalone.conf
index 630de70..2fb8c0b 100644
--- a/pulsar-client-cpp/tests/standalone.conf
+++ b/pulsar-client-cpp/tests/standalone.conf
@@ -22,9 +22,12 @@
# Zookeeper quorum connection string
zookeeperServers=
-# Global Zookeeper quorum connection string
+# Deprecated. Global Zookeeper quorum connection string
globalZookeeperServers=
+# Configuration Store connection string
+configurationStoreServers=
+
brokerServicePort=8885
# Port to use to server HTTP request
@@ -60,13 +63,13 @@ brokerDeleteInactiveTopicsEnabled=true
# How often to check for inactive topics
brokerDeleteInactiveTopicsFrequencySeconds=60
-# How frequently to proactively check and purge expired messages
+# How frequently to proactively check and purge expired messages
messageExpiryCheckIntervalInMinutes=5
# Enable check for minimum allowed client library version
clientLibraryVersionCheckEnabled=false
-# Allow client libraries with no version information
+# Allow client libraries with no version information
clientLibraryVersionCheckAllowUnversioned=true
# Path for the file used to determine the rotation status for the broker when responding
@@ -108,7 +111,7 @@ bookkeeperClientAuthenticationPlugin=
bookkeeperClientAuthenticationParametersName=
bookkeeperClientAuthenticationParameters=
-# Timeout for BK add / read operations
+# Timeout for BK add / read operations
bookkeeperClientTimeoutInSeconds=30
# Speculative reads are initiated if a read request doesn't complete within a certain time
@@ -124,11 +127,11 @@ bookkeeperClientHealthCheckErrorThresholdPerInterval=5
bookkeeperClientHealthCheckQuarantineTimeInSeconds=1800
# Enable rack-aware bookie selection policy. BK will choose bookies from different racks when
-# forming a new bookie ensemble
+# forming a new bookie ensemble
bookkeeperClientRackawarePolicyEnabled=true
-# Enable bookie isolation by specifying a list of bookie groups to choose from. Any bookie
-# outside the specified groups will not be used by the broker
+# Enable bookie isolation by specifying a list of bookie groups to choose from. Any bookie
+# outside the specified groups will not be used by the broker
bookkeeperClientIsolationGroups=
### --- Managed Ledger --- ###
@@ -144,7 +147,7 @@ managedLedgerDefaultAckQuorum=1
# Amount of memory to use for caching data payload in managed ledger. This memory
# is allocated from JVM direct memory and it's shared across all the topics
-# running in the same broker
+# running in the same broker
managedLedgerCacheSizeMB=1024
# Threshold to which bring down the cache level when eviction is triggered
@@ -156,7 +159,7 @@ managedLedgerDefaultMarkDeleteRateLimit=0.1
# Max number of entries to append to a ledger before triggering a rollover
# A ledger rollover is triggered on these conditions
# * Either the max rollover time has been reached
-# * or max entries have been written to the ledged and at least min-time
+# * or max entries have been written to the ledged and at least min-time
# has passed
managedLedgerMaxEntriesPerLedger=50000
@@ -174,7 +177,7 @@ managedLedgerCursorRolloverTimeInSeconds=14400
-### --- Load balancer --- ###
+### --- Load balancer --- ###
# Enable load balancer
loadBalancerEnabled=false
@@ -192,13 +195,13 @@ loadBalancerReportUpdateMaxIntervalMinutes=15
loadBalancerHostUsageCheckIntervalMinutes=1
# Load shedding interval. Broker periodically checks whether some traffic should be offloaded from
-# some over-loaded broker to other under-loaded brokers
+# some over-loaded broker to other under-loaded brokers
loadBalancerSheddingIntervalMinutes=30
-# Prevent the same topics to be shed and moved to other broker more that once within this timeframe
+# Prevent the same topics to be shed and moved to other broker more that once within this timeframe
loadBalancerSheddingGracePeriodMinutes=30
-# Usage threshold to determine a broker as under-loaded
+# Usage threshold to determine a broker as under-loaded
loadBalancerBrokerUnderloadedThresholdPercentage=1
# Usage threshold to determine a broker as over-loaded
@@ -238,7 +241,7 @@ replicationMetricsEnabled=true
# Max number of connections to open for each broker in a remote cluster
# More connections host-to-host lead to better throughput over high-latency
-# links.
+# links.
replicationConnectionsPerBroker=16
# Replicator producer queue size
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java
index aa1068a..4a038a3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/conf/InternalConfigurationData.java
@@ -24,17 +24,17 @@ import java.util.Objects;
public class InternalConfigurationData {
private String zookeeperServers;
- private String globalZooKeeperServers;
+ private String configurationStoreServers;
private String ledgersRootPath;
public InternalConfigurationData() {
}
public InternalConfigurationData(String zookeeperServers,
- String globalZooKeeperServers,
+ String configurationStoreServers,
String ledgersRootPath) {
this.zookeeperServers = zookeeperServers;
- this.globalZooKeeperServers = globalZooKeeperServers;
+ this.configurationStoreServers = configurationStoreServers;
this.ledgersRootPath = ledgersRootPath;
}
@@ -42,8 +42,8 @@ public class InternalConfigurationData {
return zookeeperServers;
}
- public String getGlobalZooKeeperServers() {
- return globalZooKeeperServers;
+ public String getConfigurationStoreServers() {
+ return configurationStoreServers;
}
public String getLedgersRootPath() {
@@ -57,20 +57,20 @@ public class InternalConfigurationData {
}
InternalConfigurationData other = (InternalConfigurationData) obj;
return Objects.equals(zookeeperServers, other.zookeeperServers)
- && Objects.equals(globalZooKeeperServers, other.globalZooKeeperServers)
+ && Objects.equals(configurationStoreServers, other.configurationStoreServers)
&& Objects.equals(ledgersRootPath, other.ledgersRootPath);
}
@Override
public int hashCode() {
- return Objects.hash(zookeeperServers, globalZooKeeperServers, ledgersRootPath);
+ return Objects.hash(zookeeperServers, configurationStoreServers, ledgersRootPath);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("zookeeperServers", zookeeperServers)
- .add("globalZooKeeperServers", globalZooKeeperServers)
+ .add("configurationStoreServers", configurationStoreServers)
.add("ledgersRootPath", ledgersRootPath)
.toString();
}
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
index 60a72c1..54463aa 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
@@ -72,7 +72,7 @@ public class BrokerDiscoveryProvider implements Closeable {
localZkCache = new ZookeeperCacheLoader(zkClientFactory, config.getZookeeperServers(),
config.getZookeeperSessionTimeoutMs());
globalZkCache = new GlobalZooKeeperCache(zkClientFactory, config.getZookeeperSessionTimeoutMs(),
- config.getGlobalZookeeperServers(), orderedExecutor, scheduledExecutorScheduler);
+ config.getConfigurationStoreServers(), orderedExecutor, scheduledExecutorScheduler);
globalZkCache.start();
} catch (Exception e) {
LOG.error("Failed to start Zookkeeper {}", e.getMessage(), e);
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/DiscoveryServiceStarter.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/DiscoveryServiceStarter.java
index f36ccee..35af859 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/DiscoveryServiceStarter.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/DiscoveryServiceStarter.java
@@ -40,6 +40,11 @@ import org.slf4j.LoggerFactory;
*/
public class DiscoveryServiceStarter {
+ /**
+ * Validates the settings the discovery service cannot start without.
+ *
+ * <p>The local ZooKeeper connection string is checked first, then the configuration store
+ * connection string. The latter is read through {@code getConfigurationStoreServers()}, so a
+ * config that only sets the deprecated {@code globalZookeeperServers} key is still accepted.
+ *
+ * @param config the loaded discovery service configuration
+ */
+ public static void checkConfig(ServiceConfig config) {
+ final String localZkServers = config.getZookeeperServers();
+ checkArgument(!isEmpty(localZkServers), "zookeeperServers must be provided");
+ final String configStoreServers = config.getConfigurationStoreServers();
+ checkArgument(!isEmpty(configStoreServers), "configuration-store Servers must be provided");
+ }
+
public static void init(String configFile) throws Exception {
// setup handlers
removeHandlersForRootLogger();
@@ -50,14 +55,13 @@ public class DiscoveryServiceStarter {
// load config file
final ServiceConfig config = PulsarConfigurationLoader.create(configFile, ServiceConfig.class);
- checkArgument(!isEmpty(config.getZookeeperServers()), "zookeeperServers must be provided");
- checkArgument(!isEmpty(config.getGlobalZookeeperServers()), "global-zookeeperServers must be provided");
-
+ checkConfig(config);
+
// create Discovery service
DiscoveryService discoveryService = new DiscoveryService(config);
// create a web-service
final ServerManager server = new ServerManager(config);
-
+
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
@@ -69,7 +73,7 @@ public class DiscoveryServiceStarter {
}
}
});
-
+
discoveryService.start();
startWebService(server, config);
}
diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
index c1d59ee..c024c19 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/server/ServiceConfig.java
@@ -36,7 +36,10 @@ public class ServiceConfig implements PulsarConfiguration {
// Local-Zookeeper quorum connection string
private String zookeeperServers;
// Global-Zookeeper quorum connection string
+ @Deprecated
private String globalZookeeperServers;
+ // Configuration Store connection string
+ private String configurationStoreServers;
// ZooKeeper session timeout
private int zookeeperSessionTimeoutMs = 30_000;
@@ -91,7 +94,7 @@ public class ServiceConfig implements PulsarConfiguration {
// Specify whether Client certificates are required for TLS
// Reject the Connection if the Client Certificate is not trusted.
private boolean tlsRequireTrustedClientCertOnConnect = false;
-
+
private Properties properties = new Properties();
public String getZookeeperServers() {
@@ -102,14 +105,24 @@ public class ServiceConfig implements PulsarConfiguration {
this.zookeeperServers = zookeeperServers;
}
+ @Deprecated
public String getGlobalZookeeperServers() {
return globalZookeeperServers;
}
+ @Deprecated
public void setGlobalZookeeperServers(String globalZookeeperServers) {
this.globalZookeeperServers = globalZookeeperServers;
}
+ /**
+ * Returns the configuration store connection string.
+ *
+ * <p>Falls back to the deprecated {@code globalZookeeperServers} value when
+ * {@code configurationStoreServers} is null, so older config files keep working.
+ */
+ public String getConfigurationStoreServers() {
+ if (configurationStoreServers != null) {
+ return configurationStoreServers;
+ }
+ return getGlobalZookeeperServers();
+ }
+
+ /** Sets the configuration store connection string. */
+ public void setConfigurationStoreServers(String configurationStoreServers) {
+ this.configurationStoreServers = configurationStoreServers;
+ }
+
public int getZookeeperSessionTimeoutMs() {
return zookeeperSessionTimeoutMs;
}
@@ -253,7 +266,7 @@ public class ServiceConfig implements PulsarConfiguration {
public void setProperties(Properties properties) {
this.properties = properties;
}
-
+
public Set<String> getTlsProtocols() {
return tlsProtocols;
}
@@ -269,7 +282,7 @@ public class ServiceConfig implements PulsarConfiguration {
public void setTlsCiphers(Set<String> tlsCiphers) {
this.tlsCiphers = tlsCiphers;
}
-
+
public boolean getTlsRequireTrustedClientCertOnConnect() {
return tlsRequireTrustedClientCertOnConnect;
}
diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/server/DiscoveryServiceWebTest.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/server/DiscoveryServiceWebTest.java
index 48b69a1..3902431 100644
--- a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/server/DiscoveryServiceWebTest.java
+++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/server/DiscoveryServiceWebTest.java
@@ -27,9 +27,8 @@ import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
-import org.apache.pulsar.discovery.service.server.DiscoveryServiceStarter;
-import org.apache.pulsar.discovery.service.server.ServerManager;
-import org.apache.pulsar.discovery.service.server.ServiceConfig;
+import org.apache.pulsar.discovery.service.DiscoveryService;
+import org.mockito.Mockito;
import org.testng.annotations.Test;
/**
@@ -40,7 +39,7 @@ import org.testng.annotations.Test;
*/
public class DiscoveryServiceWebTest {
-
+
@Test
public void testWebDiscoveryServiceStarter() throws Exception {
@@ -51,7 +50,7 @@ public class DiscoveryServiceWebTest {
}
PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(testConfigFile)));
printWriter.println("zookeeperServers=z1.pulsar.com,z2.pulsar.com,z3.pulsar.com");
- printWriter.println("globalZookeeperServers=z1.pulsar.com,z2.pulsar.com,z3.pulsar.com");
+ printWriter.println("configurationStoreServers=z1.pulsar.com,z2.pulsar.com,z3.pulsar.com");
printWriter.println("webServicePort=" + port);
printWriter.close();
testConfigFile.deleteOnExit();
@@ -63,4 +62,62 @@ public class DiscoveryServiceWebTest {
testConfigFile.delete();
}
+ /**
+ * Test Configuration BackwardCompat for the change from globalzookeeper to configurationStore.
+ *
+ * <p>Three config files are written and validated in turn with
+ * {@code DiscoveryServiceStarter.checkConfig}:
+ * <ol>
+ * <li>only the deprecated {@code globalZookeeperServers} key - must be accepted;</li>
+ * <li>only the new {@code configurationStoreServers} key - must be accepted;</li>
+ * <li>neither key - must be rejected with an {@link IllegalArgumentException}.</li>
+ * </ol>
+ */
+ @Test
+ public void testConfigurationBackwardCompat() throws Exception {
+ DiscoveryService service = Mockito.mock(DiscoveryService.class);
+
+ int port = nextFreePort();
+ File testConfigFile = new File("tmp." + System.currentTimeMillis() + ".properties");
+ if (testConfigFile.exists()) {
+ testConfigFile.delete();
+ }
+ PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(new FileOutputStream(testConfigFile)));
+ printWriter.println("zookeeperServers=z1.pulsar.com,z2.pulsar.com,z3.pulsar.com");
+ printWriter.println("globalZookeeperServers=z1.pulsar.com,z2.pulsar.com,z3.pulsar.com");
+ printWriter.println("webServicePort=" + port);
+ printWriter.close();
+ testConfigFile.deleteOnExit();
+
+ ServiceConfig config = PulsarConfigurationLoader.create(testConfigFile.getAbsolutePath(), ServiceConfig.class);
+ // have zookeeperServers and globalZookeeperServers, config is valid
+ // should not throw IllegalArgumentException.
+ DiscoveryServiceStarter.checkConfig(config);
+
+
+ if (testConfigFile.exists()) {
+ testConfigFile.delete();
+ }
+ PrintWriter printWriter2 = new PrintWriter(new OutputStreamWriter(new FileOutputStream(testConfigFile)));
+ printWriter2.println("zookeeperServers=z1.pulsar.com,z2.pulsar.com,z3.pulsar.com");
+ printWriter2.println("configurationStoreServers=z1.pulsar.com,z2.pulsar.com,z3.pulsar.com");
+ printWriter2.println("webServicePort=" + port);
+ printWriter2.close();
+ config = PulsarConfigurationLoader.create(testConfigFile.getAbsolutePath(), ServiceConfig.class);
+ // have zookeeperServers and configurationStoreServers, config is valid
+ // should not throw IllegalArgumentException.
+ DiscoveryServiceStarter.checkConfig(config);
+
+
+ if (testConfigFile.exists()) {
+ testConfigFile.delete();
+ }
+ PrintWriter printWriter3 = new PrintWriter(new OutputStreamWriter(new FileOutputStream(testConfigFile)));
+ printWriter3.println("zookeeperServers=z1.pulsar.com,z2.pulsar.com,z3.pulsar.com");
+ printWriter3.println("webServicePort=" + port);
+ printWriter3.close();
+ config = PulsarConfigurationLoader.create(testConfigFile.getAbsolutePath(), ServiceConfig.class);
+ // only have zookeeperServers
+ // should throw IllegalArgumentException.
+ boolean rejected = false;
+ try {
+ DiscoveryServiceStarter.checkConfig(config);
+ } catch (IllegalArgumentException e) {
+ // expected: configure error
+ rejected = true;
+ }
+
+ // clean up before asserting so a failure does not leave the temp file behind
+ testConfigFile.delete();
+
+ // without this check the test would pass even if checkConfig accepted the invalid config
+ if (!rejected) {
+ throw new AssertionError(
+ "checkConfig should reject a config with neither configurationStoreServers nor globalZookeeperServers");
+ }
+ }
+
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
index d038381..56e6cc5 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
@@ -71,7 +71,7 @@ public class BrokerDiscoveryProvider implements Closeable {
localZkCache = new ZookeeperCacheLoader(zkClientFactory, config.getZookeeperServers(),
config.getZookeeperSessionTimeoutMs());
globalZkCache = new GlobalZooKeeperCache(zkClientFactory, config.getZookeeperSessionTimeoutMs(),
- config.getGlobalZookeeperServers(), orderedExecutor, scheduledExecutorScheduler);
+ config.getConfigurationStoreServers(), orderedExecutor, scheduledExecutorScheduler);
globalZkCache.start();
} catch (Exception e) {
LOG.error("Failed to start Zookkeeper {}", e.getMessage(), e);
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index 43b8d56..dcd9580 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -30,9 +30,13 @@ public class ProxyConfiguration implements PulsarConfiguration {
// Local-Zookeeper quorum connection string
private String zookeeperServers;
// Global-Zookeeper quorum connection string
+ @Deprecated
private String globalZookeeperServers;
+ // Configuration Store connection string
+ private String configurationStoreServers;
+
// ZooKeeper session timeout
private int zookeeperSessionTimeoutMs = 30_000;
@@ -49,7 +53,7 @@ public class ProxyConfiguration implements PulsarConfiguration {
private int webServicePort = 8080;
// Port to use to server HTTPS request
private int webServicePortTls = 8443;
-
+
// Path for the file used to determine the rotation status for the broker
// when responding to service discovery health checks
private String statusFilePath;
@@ -142,14 +146,24 @@ public class ProxyConfiguration implements PulsarConfiguration {
this.zookeeperServers = zookeeperServers;
}
+ @Deprecated
public String getGlobalZookeeperServers() {
return globalZookeeperServers;
}
+ @Deprecated
public void setGlobalZookeeperServers(String globalZookeeperServers) {
this.globalZookeeperServers = globalZookeeperServers;
}
+ /**
+ * Returns the configuration store connection string.
+ *
+ * <p>Falls back to the deprecated {@code globalZookeeperServers} value when
+ * {@code configurationStoreServers} is null, so older proxy config files keep working.
+ */
+ public String getConfigurationStoreServers() {
+ if (configurationStoreServers != null) {
+ return configurationStoreServers;
+ }
+ return getGlobalZookeeperServers();
+ }
+
+ /** Sets the configuration store connection string. */
+ public void setConfigurationStoreServers(String configurationStoreServers) {
+ this.configurationStoreServers = configurationStoreServers;
+ }
+
public int getZookeeperSessionTimeoutMs() {
return zookeeperSessionTimeoutMs;
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index e798077..48293be 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -119,7 +119,7 @@ public class ProxyService implements Closeable {
ServiceConfiguration serviceConfiguration = PulsarConfigurationLoader.convertFrom(proxyConfig);
authenticationService = new AuthenticationService(serviceConfiguration);
- if (!isBlank(proxyConfig.getZookeeperServers()) && !isBlank(proxyConfig.getGlobalZookeeperServers())) {
+ if (!isBlank(proxyConfig.getZookeeperServers()) && !isBlank(proxyConfig.getConfigurationStoreServers())) {
discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig, getZooKeeperClientFactory());
this.configurationCacheService = new ConfigurationCacheService(discoveryProvider.globalZkCache);
authorizationService = new AuthorizationService(serviceConfiguration, configurationCacheService);
@@ -214,4 +214,4 @@ public class ProxyService implements Closeable {
}
private static final Logger LOG = LoggerFactory.getLogger(ProxyService.class);
-}
\ No newline at end of file
+}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 0738893..77fcd77 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -53,9 +53,14 @@ public class ProxyServiceStarter {
@Parameter(names = { "-zk", "--zookeeper-servers" }, description = "Local zookeeper connection string")
private String zookeeperServers = "";
+ @Deprecated
@Parameter(names = { "-gzk", "--global-zookeeper-servers" }, description = "Global zookeeper connection string")
private String globalZookeeperServers = "";
+ @Parameter(names = { "-cs", "--configuration-store-servers" },
+ description = "Configuration store connection string")
+ private String configurationStoreServers = "";
+
@Parameter(names = { "-h", "--help" }, description = "Show this help message")
private boolean help = false;
@@ -90,13 +95,17 @@ public class ProxyServiceStarter {
if (!isBlank(globalZookeeperServers)) {
// Use globalZookeeperServers from command line
- config.setGlobalZookeeperServers(globalZookeeperServers);
+ config.setConfigurationStoreServers(globalZookeeperServers);
+ }
+ if (!isBlank(configurationStoreServers)) {
+ // Use configurationStoreServers from command line
+ config.setConfigurationStoreServers(configurationStoreServers);
}
if ((isBlank(config.getBrokerServiceURL()) && isBlank(config.getBrokerServiceURLTLS()))
|| config.isAuthorizationEnabled()) {
checkArgument(!isEmpty(config.getZookeeperServers()), "zookeeperServers must be provided");
- checkArgument(!isEmpty(config.getGlobalZookeeperServers()), "globalZookeeperServers must be provided");
+ checkArgument(!isEmpty(config.getConfigurationStoreServers()), "configurationStoreServers must be provided");
}
java.security.Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider());
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
index ca72616..fba6010 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticatedProducerConsumerTest.java
@@ -117,7 +117,7 @@ public class ProxyAuthenticatedProducerConsumerTest extends ProducerConsumerBase
proxyConfig.setAuthenticationProviders(providers);
proxyConfig.setZookeeperServers(DUMMY_VALUE);
- proxyConfig.setGlobalZookeeperServers(DUMMY_VALUE);
+ proxyConfig.setConfigurationStoreServers(DUMMY_VALUE);
proxyService = Mockito.spy(new ProxyService(proxyConfig));
doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
index 1658e21..91e3523 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -47,7 +47,7 @@ public class ProxyConnectionThrottlingTest extends MockedPulsarServiceBaseTest {
proxyConfig.setServicePort(PortManager.nextFreePort());
proxyConfig.setZookeeperServers(DUMMY_VALUE);
- proxyConfig.setGlobalZookeeperServers(DUMMY_VALUE);
+ proxyConfig.setConfigurationStoreServers(DUMMY_VALUE);
proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP);
proxyConfig.setMaxConcurrentInboundConnections(NUM_CONCURRENT_INBOUND_CONNECTION);
proxyService = Mockito.spy(new ProxyService(proxyConfig));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
index 4411f80..07eb137 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -46,7 +46,7 @@ public class ProxyLookupThrottlingTest extends MockedPulsarServiceBaseTest {
proxyConfig.setServicePort(PortManager.nextFreePort());
proxyConfig.setZookeeperServers(DUMMY_VALUE);
- proxyConfig.setGlobalZookeeperServers(DUMMY_VALUE);
+ proxyConfig.setConfigurationStoreServers(DUMMY_VALUE);
proxyConfig.setMaxConcurrentLookupRequests(NUM_CONCURRENT_LOOKUP);
proxyConfig.setMaxConcurrentInboundConnections(NUM_CONCURRENT_INBOUND_CONNECTION);
proxyService = Mockito.spy(new ProxyService(proxyConfig));
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index 5c994b0..8647994 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -51,7 +51,7 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
proxyConfig.setServicePort(PortManager.nextFreePort());
proxyConfig.setZookeeperServers(DUMMY_VALUE);
- proxyConfig.setGlobalZookeeperServers(DUMMY_VALUE);
+ proxyConfig.setConfigurationStoreServers(DUMMY_VALUE);
proxyService = Mockito.spy(new ProxyService(proxyConfig));
doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
index 5243b9e..a2332a8 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTlsTest.java
@@ -60,7 +60,7 @@ public class ProxyTlsTest extends MockedPulsarServiceBaseTest {
proxyConfig.setTlsCertificateFilePath(TLS_PROXY_CERT_FILE_PATH);
proxyConfig.setTlsKeyFilePath(TLS_PROXY_KEY_FILE_PATH);
proxyConfig.setZookeeperServers(DUMMY_VALUE);
- proxyConfig.setGlobalZookeeperServers(DUMMY_VALUE);
+ proxyConfig.setConfigurationStoreServers(DUMMY_VALUE);
proxyService = Mockito.spy(new ProxyService(proxyConfig));
doReturn(mockZooKeeperClientFactory).when(proxyService).getZooKeeperClientFactory();
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index b917dc2..6660cd8 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -99,9 +99,9 @@ public class WebSocketService implements Closeable {
public void start() throws PulsarServerException, PulsarClientException, MalformedURLException, ServletException,
DeploymentException {
- if (isNotBlank(config.getGlobalZookeeperServers())) {
+ if (isNotBlank(config.getConfigurationStoreServers())) {
this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(),
- (int) config.getZooKeeperSessionTimeoutMillis(), config.getGlobalZookeeperServers(),
+ (int) config.getZooKeeperSessionTimeoutMillis(), config.getConfigurationStoreServers(),
this.orderedExecutor, this.executor);
try {
this.globalZkCache.start();
@@ -116,7 +116,7 @@ public class WebSocketService implements Closeable {
if (config.isAuthorizationEnabled()) {
if (configurationCacheService == null) {
throw new PulsarServerException(
- "Failed to initialize authorization manager due to empty GlobalZookeeperServers");
+ "Failed to initialize authorization manager due to empty ConfigurationStoreServers");
}
authorizationService = new AuthorizationService(this.config, configurationCacheService);
}
@@ -218,7 +218,8 @@ public class WebSocketService implements Closeable {
private ClusterData retrieveClusterData() throws PulsarServerException {
if (configurationCacheService == null) {
- throw new PulsarServerException("Failed to retrieve Cluster data due to empty GlobalZookeeperServers");
+ throw new PulsarServerException(
+ "Failed to retrieve Cluster data due to empty ConfigurationStoreServers");
}
try {
String path = "/admin/clusters/" + config.getClusterName();
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
index 0126d49..b790723 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java
@@ -40,18 +40,20 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration {
@FieldContext(required = true)
private String clusterName;
- // Pulsar cluster url to connect to broker (optional if globalZookeeperServers present)
+ // Pulsar cluster url to connect to broker (optional if configurationStoreServers present)
private String serviceUrl;
private String serviceUrlTls;
private String brokerServiceUrl;
private String brokerServiceUrlTls;
-
+
// Path for the file used to determine the rotation status for the broker
// when responding to service discovery health checks
private String statusFilePath;
// Global Zookeeper quorum connection string
+ @Deprecated
private String globalZookeeperServers;
+ // Configuration Store connection string
+ private String configurationStoreServers;
// Zookeeper session timeout in milliseconds
private long zooKeeperSessionTimeoutMillis = 30000;
@@ -159,14 +161,24 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration {
this.statusFilePath = statusFilePath;
}
+ @Deprecated
public String getGlobalZookeeperServers() {
return globalZookeeperServers;
}
+ @Deprecated
public void setGlobalZookeeperServers(String globalZookeeperServers) {
this.globalZookeeperServers = globalZookeeperServers;
}
+ /**
+ * Returns the configuration store connection string.
+ *
+ * <p>Falls back to the deprecated {@code globalZookeeperServers} value when
+ * {@code configurationStoreServers} is null, so older websocket config files keep working.
+ */
+ public String getConfigurationStoreServers() {
+ if (configurationStoreServers != null) {
+ return configurationStoreServers;
+ }
+ return getGlobalZookeeperServers();
+ }
+
+ /** Sets the configuration store connection string. */
+ public void setConfigurationStoreServers(String configurationStoreServers) {
+ this.configurationStoreServers = configurationStoreServers;
+ }
+
public long getZooKeeperSessionTimeoutMillis() {
return zooKeeperSessionTimeoutMillis;
}
diff --git a/pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperStarter.java b/pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/ConfigurationStoreStarter.java
similarity index 86%
rename from pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperStarter.java
rename to pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/ConfigurationStoreStarter.java
index d70885b..6f488bb 100644
--- a/pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperStarter.java
+++ b/pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/ConfigurationStoreStarter.java
@@ -21,10 +21,10 @@ package org.apache.pulsar.zookeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class GlobalZooKeeperStarter extends ZooKeeperStarter {
+public class ConfigurationStoreStarter extends ZooKeeperStarter {
public static void main(String[] args) throws Exception {
start(args, "8001");
}
- private static final Logger log = LoggerFactory.getLogger(GlobalZooKeeperStarter.class);
+ private static final Logger log = LoggerFactory.getLogger(ConfigurationStoreStarter.class);
}
diff --git a/site/docs/latest/deployment/cluster.md b/site/docs/latest/deployment/cluster.md
index 0a23b0e..773cfa2 100644
--- a/site/docs/latest/deployment/cluster.md
+++ b/site/docs/latest/deployment/cluster.md
@@ -142,7 +142,7 @@ You can initialize this metadata using the [`initialize-cluster-metadata`](../..
$ bin/pulsar initialize-cluster-metadata \
--cluster pulsar-cluster-1 \
--zookeeper zk1.us-west.example.com:2181 \
- --global-zookeeper zk1.us-west.example.com:2181 \
+ --configuration-store zk1.us-west.example.com:2181 \
--web-service-url http://pulsar.us-west.example.com:8080 \
--web-service-url-tls https://pulsar.us-west.example.com:8443 \
--broker-service-url pulsar://pulsar.us-west.example.com:6650 \
@@ -155,7 +155,7 @@ Flag | Description
:----|:-----------
`--cluster` | A name for the cluster
`--zookeeper` | A "local" ZooKeeper connection string for the cluster. This connection string only needs to include *one* machine in the ZooKeeper cluster.
-`--global-zookeeper` | The "global" ZooKeeper connection string for the entire instance. As with the `--zookeeper` flag, this connection string only needs to include *one* machine in the ZooKeeper cluster.
+`--configuration-store` | The connection string for the configuration store (ZooKeeper), where the configuration policies for all tenants and namespaces across all clusters are stored. As with the `--zookeeper` flag, this connection string only needs to include *one* machine in the ZooKeeper cluster.
`--web-service-url` | The web service URL for the cluster, plus a port. This URL should be a standard DNS name. The default port is 8080 (we don't recommend using a different port).
`--web-service-url-tls` | If you're using [TLS](../../../admin/Authz#tls-client-auth), you'll also need to specify a TLS web service URL for the cluster. The default port is 8443 (we don't recommend using a different port).
`--broker-service-url` | A broker service URL enabling interaction with the {% popover brokers %} in the cluster. This URL should use the same DNS name as the web service URL but should use the `pulsar` scheme instead. The default port is 6650 (we don't recommend using a different port).
diff --git a/tests/docker-images/latest-version-image/conf/bookie.conf b/tests/docker-images/latest-version-image/conf/bookie.conf
index 53fd06e..030d6ad 100644
--- a/tests/docker-images/latest-version-image/conf/bookie.conf
+++ b/tests/docker-images/latest-version-image/conf/bookie.conf
@@ -22,5 +22,5 @@ autostart=false
redirect_stderr=true
stdout_logfile=/var/log/pulsar/bookie.log
directory=/pulsar
+environment=PULSAR_MEM=-Xms128M
command=/pulsar/bin/pulsar bookie
-
diff --git a/tests/docker-images/latest-version-image/conf/broker.conf b/tests/docker-images/latest-version-image/conf/broker.conf
index ca8843f..5492abf 100644
--- a/tests/docker-images/latest-version-image/conf/broker.conf
+++ b/tests/docker-images/latest-version-image/conf/broker.conf
@@ -22,5 +22,6 @@ autostart=false
redirect_stderr=true
stdout_logfile=/var/log/pulsar/broker.log
directory=/pulsar
+environment=PULSAR_MEM=-Xms128M
command=/pulsar/bin/pulsar broker
diff --git a/tests/docker-images/latest-version-image/conf/global-zk.conf b/tests/docker-images/latest-version-image/conf/global-zk.conf
index f589ade..5c6edaa 100644
--- a/tests/docker-images/latest-version-image/conf/global-zk.conf
+++ b/tests/docker-images/latest-version-image/conf/global-zk.conf
@@ -22,5 +22,6 @@ autostart=false
redirect_stderr=true
stdout_logfile=/var/log/pulsar/global-zk.log
directory=/pulsar
-command=/pulsar/bin/pulsar global-zookeeper
+environment=PULSAR_MEM=-Xms128M
+command=/pulsar/bin/pulsar configuration-store
diff --git a/tests/docker-images/latest-version-image/conf/local-zk.conf b/tests/docker-images/latest-version-image/conf/local-zk.conf
index f5daba0..2822cb1 100644
--- a/tests/docker-images/latest-version-image/conf/local-zk.conf
+++ b/tests/docker-images/latest-version-image/conf/local-zk.conf
@@ -22,5 +22,6 @@ autostart=false
redirect_stderr=true
stdout_logfile=/var/log/pulsar/local-zk.log
directory=/pulsar
+environment=PULSAR_MEM=-Xms128M
command=/pulsar/bin/pulsar zookeeper
diff --git a/tests/docker-images/latest-version-image/conf/proxy.conf b/tests/docker-images/latest-version-image/conf/proxy.conf
index 7ab7e8f..359e657 100644
--- a/tests/docker-images/latest-version-image/conf/proxy.conf
+++ b/tests/docker-images/latest-version-image/conf/proxy.conf
@@ -21,6 +21,8 @@
autostart=false
redirect_stderr=true
stdout_logfile=/var/log/pulsar/proxy.log
+stderr_logfile=/var/log/pulsar/proxy-stderr.log
directory=/pulsar
+environment=PULSAR_MEM=-Xms128M
command=/pulsar/bin/pulsar proxy
diff --git a/tests/docker-images/latest-version-image/scripts/init-cluster.sh b/tests/docker-images/latest-version-image/scripts/init-cluster.sh
index e4403b3..d70e257 100755
--- a/tests/docker-images/latest-version-image/scripts/init-cluster.sh
+++ b/tests/docker-images/latest-version-image/scripts/init-cluster.sh
@@ -27,7 +27,7 @@ if [ $? != 0 ]; then
echo Initializing cluster
bin/apply-config-from-env.py conf/bookkeeper.conf &&
bin/pulsar initialize-cluster-metadata --cluster $cluster --zookeeper $zkServers \
- --global-zookeeper $globalZkServers --web-service-url http://$pulsarNode:8080/ \
+ --configuration-store $configurationStore --web-service-url http://$pulsarNode:8080/ \
--broker-service-url http://$pulsarNode:6650/ &&
bin/watch-znode.py -z $zkServers -p $ZNODE -c
echo Initialized
diff --git a/tests/integration-tests-base/pom.xml b/tests/integration-tests-base/pom.xml
index 6017744..37b8f70 100644
--- a/tests/integration-tests-base/pom.xml
+++ b/tests/integration-tests-base/pom.xml
@@ -31,7 +31,6 @@
<version>2.0.0-incubating-SNAPSHOT</version>
</parent>
- <groupId>org.apache.pulsar.tests</groupId>
<artifactId>integration-tests-base</artifactId>
<packaging>pom</packaging>
diff --git a/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-3-bookie-2-broker-unstarted.yaml b/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-3-bookie-2-broker-unstarted.yaml
index a77d34d..d2c34cc 100644
--- a/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-3-bookie-2-broker-unstarted.yaml
+++ b/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-3-bookie-2-broker-unstarted.yaml
@@ -37,17 +37,17 @@ zookeeper*:
strategy: org.apache.pulsar.tests.PulsarLogsToTargetDirStopAction
networkMode: pulsarnet*
-global-zookeeper*:
+configuration-store*:
image: apachepulsar/pulsar-test-latest-version:latest
await:
strategy: org.apache.pulsar.tests.NoopAwaitStrategy
- env: [ZOOKEEPER_SERVERS=zookeeper]
+ env: [ZOOKEEPER_SERVERS=configuration-store]
labels:
cluster: test
- service: global-zookeeper
+ service: configuration-store
entryPoint: [bin/run-global-zk.sh]
aliases:
- - global-zookeeper
+ - configuration-store
beforeStop:
- customBeforeStopAction:
strategy: org.apache.pulsar.tests.PulsarLogsToTargetDirStopAction
@@ -60,7 +60,7 @@ init*:
env:
- cluster=test
- zkServers=zookeeper
- - globalZkServers=global-zookeeper:2184
+ - configurationStore=configuration-store:2184
- pulsarNode=pulsar-broker1
labels:
cluster: test
@@ -127,7 +127,7 @@ pulsar-broker1*:
- pulsar-broker1
env:
- zookeeperServers=zookeeper
- - globalZookeeperServers=global-zookeeper:2184
+ - configurationStoreServers=configuration-store:2184
- clusterName=test
- NO_AUTOSTART=true
labels:
@@ -147,7 +147,7 @@ pulsar-broker2*:
- pulsar-broker2
env:
- zookeeperServers=zookeeper
- - globalZookeeperServers=global-zookeeper:2184
+ - configurationStoreServers=configuration-store:2184
- clusterName=test
- NO_AUTOSTART=true
labels:
@@ -167,7 +167,7 @@ pulsar-proxy*:
- pulsar-broker2
env:
- zookeeperServers=zookeeper
- - globalZookeeperServers=global-zookeeper:2184
+ - configurationStoreServers=configuration-store:2184
- clusterName=test
- NO_AUTOSTART=true
labels:
diff --git a/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-3-bookie-2-broker.yaml b/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-3-bookie-2-broker.yaml
index dd12abd..3695f2a 100644
--- a/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-3-bookie-2-broker.yaml
+++ b/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-3-bookie-2-broker.yaml
@@ -37,17 +37,17 @@ zookeeper*:
strategy: org.apache.pulsar.tests.PulsarLogsToTargetDirStopAction
networkMode: pulsarnet*
-global-zookeeper*:
+configuration-store*:
image: apachepulsar/pulsar-test-latest-version:latest
await:
strategy: org.apache.pulsar.tests.NoopAwaitStrategy
- env: [ZOOKEEPER_SERVERS=zookeeper]
+ env: [ZOOKEEPER_SERVERS=configuration-store]
labels:
cluster: test
- service: global-zookeeper
+ service: configuration-store
entryPoint: [bin/run-global-zk.sh]
aliases:
- - global-zookeeper
+ - configuration-store
beforeStop:
- customBeforeStopAction:
strategy: org.apache.pulsar.tests.PulsarLogsToTargetDirStopAction
@@ -60,7 +60,7 @@ init*:
env:
- cluster=test
- zkServers=zookeeper
- - globalZkServers=global-zookeeper:2184
+ - configurationStore=configuration-store:2184
- pulsarNode=pulsar-broker1
labels:
cluster: test
@@ -127,7 +127,7 @@ pulsar-broker1*:
- pulsar-broker1
env:
- zookeeperServers=zookeeper
- - globalZookeeperServers=global-zookeeper:2184
+ - configurationStoreServers=configuration-store:2184
- clusterName=test
labels:
cluster: test
@@ -146,7 +146,7 @@ pulsar-broker2*:
- pulsar-broker2
env:
- zookeeperServers=zookeeper
- - globalZookeeperServers=global-zookeeper:2184
+ - configurationStoreServers=configuration-store:2184
- clusterName=test
labels:
cluster: test
@@ -165,7 +165,7 @@ pulsar-proxy*:
- pulsar-broker2
env:
- zookeeperServers=zookeeper
- - globalZookeeperServers=global-zookeeper:2184
+ - configurationStoreServers=configuration-store:2184
- clusterName=test
labels:
cluster: test
diff --git a/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/DockerUtils.java b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/DockerUtils.java
index fb69dc7..b815b51 100644
--- a/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/DockerUtils.java
+++ b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/DockerUtils.java
@@ -183,6 +183,7 @@ public class DockerUtils {
}
int retCode = resp.getExitCode();
if (retCode != 0) {
+ LOG.error("DOCKER.exec({}:{}): failed with {} : {}", containerId, cmdString, retCode, output);
throw new RuntimeException(
String.format("cmd(%s) failed on %s with exitcode %d",
cmdString, containerId, retCode));
diff --git a/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarClusterUtils.java b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarClusterUtils.java
index ff82b8d..984dadc 100644
--- a/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarClusterUtils.java
+++ b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarClusterUtils.java
@@ -41,6 +41,9 @@ public class PulsarClusterUtils {
private static final Logger LOG = LoggerFactory.getLogger(PulsarClusterUtils.class);
static final short BROKER_PORT = 8080;
+ public static final String PULSAR_ADMIN = "/pulsar/bin/pulsar-admin";
+ public static final String PULSAR = "/pulsar/bin/pulsar";
+
public static String zookeeperConnectString(DockerClient docker, String cluster) {
return DockerUtils.cubeIdsWithLabels(docker, ImmutableMap.of("service", "zookeeper", "cluster", cluster))
.stream().map((id) -> DockerUtils.getContainerIP(docker, id)).collect(Collectors.joining(":"));
@@ -166,6 +169,7 @@ public class PulsarClusterUtils {
return true;
} catch (Exception e) {
// couldn't connect, try again after sleep
+ LOG.info("Failed to connect {} @ {}", ip, BROKER_PORT, e);
}
try {
Thread.sleep(pollMillis);
diff --git a/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java b/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java
index bd84913..5570f05 100644
--- a/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java
+++ b/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java
@@ -51,14 +51,14 @@ public class TestCompaction extends Arquillian {
@Test
public void testPublishCompactAndConsumeCLI() throws Exception {
PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
- "/pulsar/bin/pulsar-admin", "tenants",
+ PulsarClusterUtils.PULSAR_ADMIN, "tenants",
"create", "compaction-test-cli", "--allowed-clusters", clusterName,
"--admin-roles", "admin");
PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
- "/pulsar/bin/pulsar-admin", "namespaces",
+ PulsarClusterUtils.PULSAR_ADMIN, "namespaces",
"create", "compaction-test-cli/ns1");
PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
- "/pulsar/bin/pulsar-admin", "namespaces",
+ PulsarClusterUtils.PULSAR_ADMIN, "namespaces",
"set-clusters", "--clusters", "test", "compaction-test-cli/ns1");
String brokerIp = DockerUtils.getContainerIP(
@@ -86,7 +86,7 @@ public class TestCompaction extends Arquillian {
}
PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
- "/pulsar/bin/pulsar", "compact-topic",
+ PulsarClusterUtils.PULSAR, "compact-topic",
"-t", topic);
try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
@@ -101,14 +101,14 @@ public class TestCompaction extends Arquillian {
@Test
public void testPublishCompactAndConsumeRest() throws Exception {
PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
- "/pulsar/bin/pulsar-admin", "tenants",
+ PulsarClusterUtils.PULSAR_ADMIN, "tenants",
"create", "compaction-test-rest", "--allowed-clusters", clusterName,
"--admin-roles", "admin");
PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
- "/pulsar/bin/pulsar-admin", "namespaces",
+ PulsarClusterUtils.PULSAR_ADMIN, "namespaces",
"create", "compaction-test-rest/ns1");
PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
- "/pulsar/bin/pulsar-admin", "namespaces",
+ PulsarClusterUtils.PULSAR_ADMIN, "namespaces",
"set-clusters", "--clusters", "test", "compaction-test-rest/ns1");
String brokerIp = DockerUtils.getContainerIP(
@@ -135,7 +135,7 @@ public class TestCompaction extends Arquillian {
Assert.assertEquals(m.getData(), "content1".getBytes());
}
PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
- "/pulsar/bin/pulsar-admin", "persistent", "compact", topic);
+ PulsarClusterUtils.PULSAR_ADMIN, "persistent", "compact", topic);
PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
"/pulsar/bin/pulsar-admin", "persistent", "compaction-status",
diff --git a/tests/integration/smoke/src/test/resources/arquillian.xml b/tests/integration/smoke/src/test/resources/arquillian.xml
index ced9de1..73a5908 100644
--- a/tests/integration/smoke/src/test/resources/arquillian.xml
+++ b/tests/integration/smoke/src/test/resources/arquillian.xml
@@ -25,6 +25,7 @@
http://jboss.org/schema/arquillian/arquillian_1_0.xsd">
<extension qualifier="docker">
+ <property name="serverUri">unix:///var/run/docker.sock</property>
<property name="definitionFormat">CUBE</property>
<property name="dockerContainersResource">cube-definitions/single-cluster-3-bookie-2-broker-unstarted.yaml</property>
</extension>
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.