You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2022/07/19 06:36:44 UTC
[pulsar] branch master updated: [improve][cleanup] Keep usage of deprecated configs inside ServiceConfiguration (#16492)
This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3d4fa00b005 [improve][cleanup] Keep usage of deprecated configs inside ServiceConfiguration (#16492)
3d4fa00b005 is described below
commit 3d4fa00b0059744c3d945c77672b9769c335014c
Author: tison <wa...@gmail.com>
AuthorDate: Tue Jul 19 14:36:36 2022 +0800
[improve][cleanup] Keep usage of deprecated configs inside ServiceConfiguration (#16492)
### Motivation
Cleanup code when reading it.
### Modifications
Encapsulate usage of deprecated configs:
- subscriptionKeySharedEnable
- zookeeperServers
---
.../apache/pulsar/broker/ServiceConfiguration.java | 36 +++++++++++---------
.../configuration/PulsarConfigurationLoader.java | 2 +-
.../java/org/apache/pulsar/PulsarStandalone.java | 3 +-
.../apache/pulsar/broker/SLAMonitoringTest.java | 2 +-
.../AntiAffinityNamespaceGroupTest.java | 4 +--
.../loadbalance/LeaderElectionServiceTest.java | 2 +-
.../broker/loadbalance/LoadBalancerTest.java | 2 +-
.../loadbalance/ModularLoadManagerImplTest.java | 6 ++--
.../broker/loadbalance/SimpleBrokerStartTest.java | 4 +--
.../loadbalance/SimpleLoadManagerImplTest.java | 4 +--
.../loadbalance/impl/BundleSplitterTaskTest.java | 2 +-
.../broker/service/AdvertisedAddressTest.java | 2 +-
.../broker/service/BacklogQuotaManagerTest.java | 2 +-
.../pulsar/broker/service/BkEnsemblesTestBase.java | 2 +-
.../broker/service/BrokerBookieIsolationTest.java | 10 +++---
.../pulsar/broker/service/TopicOwnerTest.java | 2 +-
.../coordinator/TransactionMetaStoreTestBase.java | 2 +-
.../client/api/ClientDeduplicationFailureTest.java | 2 +-
.../client/api/KeySharedSubscriptionTest.java | 38 +++++++---------------
.../common/naming/ServiceConfigurationTest.java | 13 ++++++--
.../worker/PulsarFunctionE2ESecurityTest.java | 2 +-
.../worker/PulsarFunctionLocalRunTest.java | 2 +-
.../worker/PulsarFunctionPublishTest.java | 2 +-
.../functions/worker/PulsarFunctionTlsTest.java | 2 +-
.../worker/PulsarWorkerAssignmentTest.java | 2 +-
.../apache/pulsar/io/AbstractPulsarE2ETest.java | 2 +-
.../apache/pulsar/io/PulsarFunctionAdminTest.java | 2 +-
.../apache/pulsar/io/PulsarFunctionTlsTest.java | 2 +-
28 files changed, 78 insertions(+), 78 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index c90c04e8eda..0e7eca59828 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -506,7 +506,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
@Deprecated
@FieldContext(
category = CATEGORY_POLICIES,
- doc = "@deprecated - Use backlogQuotaDefaultLimitByte instead.\""
+ doc = "@deprecated - Use backlogQuotaDefaultLimitByte instead."
)
private double backlogQuotaDefaultLimitGB = -1;
@@ -646,7 +646,8 @@ public class ServiceConfiguration implements PulsarConfiguration {
@FieldContext(
category = CATEGORY_POLICIES,
dynamic = true,
- doc = "Enable Key_Shared subscription (default is enabled)"
+ doc = "Enable Key_Shared subscription (default is enabled).\n"
+ + "@deprecated - use subscriptionTypesEnabled instead."
)
private boolean subscriptionKeySharedEnable = true;
@@ -1903,7 +1904,8 @@ public class ServiceConfiguration implements PulsarConfiguration {
deprecated = true,
doc = "Max number of `acknowledgment holes` that can be stored in Zookeeper.\n\n"
+ "If number of unack message range is higher than this limit then broker will persist"
- + " unacked ranges into bookkeeper to avoid additional data overhead into zookeeper.")
+ + " unacked ranges into bookkeeper to avoid additional data overhead into zookeeper.\n"
+ + "@deprecated - use managedLedgerMaxUnackedRangesToPersistInMetadataStore.")
private int managedLedgerMaxUnackedRangesToPersistInZooKeeper = -1;
@FieldContext(
category = CATEGORY_STORAGE_ML,
@@ -2312,7 +2314,8 @@ public class ServiceConfiguration implements PulsarConfiguration {
category = CATEGORY_POLICIES,
deprecated = true,
doc = "How often broker checks for inactive topics to be deleted (topics with no subscriptions and no one "
- + "connected) Deprecated in favor of using `brokerDeleteInactiveTopicsFrequencySeconds`"
+ + "connected) Deprecated in favor of using `brokerDeleteInactiveTopicsFrequencySeconds`\n"
+ + "@deprecated - unused."
)
private int brokerServicePurgeInactiveFrequencyInSeconds = 60;
@FieldContext(
@@ -2864,12 +2867,22 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private Set<String> additionalServlets = new TreeSet<>();
+ /**
+ * @deprecated Use {@link #getSubscriptionTypesEnabled()} instead
+ */
+ @Deprecated
+ public boolean isSubscriptionKeySharedEnable() {
+ return subscriptionKeySharedEnable && subscriptionTypesEnabled.contains("Key_Shared");
+ }
+
public String getMetadataStoreUrl() {
if (StringUtils.isNotBlank(metadataStoreUrl)) {
return metadataStoreUrl;
- } else {
+ } else if (StringUtils.isNotBlank(zookeeperServers)) {
// Fallback to old setting
- return zookeeperServers;
+ return ZKMetadataStore.ZK_SCHEME_IDENTIFIER + zookeeperServers;
+ } else {
+ return "";
}
}
@@ -2907,15 +2920,8 @@ public class ServiceConfiguration implements PulsarConfiguration {
} else {
// Fallback to same metadata service used by broker, adding the "metadata-store" to specify the BK
// metadata adapter
- String suffix;
- if (StringUtils.isNotBlank(metadataStoreUrl)) {
- suffix = metadataStoreUrl;
- } else {
- // Fallback to old setting
- // Note: chroot is not settable by using 'zookeeperServers' config.
- suffix = ZKMetadataStore.ZK_SCHEME_IDENTIFIER + zookeeperServers;
- }
- return "metadata-store:" + suffix + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH;
+ // Note: chroot is not settable by using 'zookeeperServers' config.
+ return "metadata-store:" + getMetadataStoreUrl() + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH;
}
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
index 117681c87fe..12bba1a7fa2 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
@@ -91,7 +91,7 @@ public class PulsarConfigurationLoader {
public static <T extends PulsarConfiguration> T create(Properties properties,
Class<? extends PulsarConfiguration> clazz) throws IOException, IllegalArgumentException {
requireNonNull(properties);
- T configuration = null;
+ T configuration;
try {
configuration = (T) clazz.getDeclaredConstructor().newInstance();
configuration.setProperties(properties);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
index 09acd1edc0e..b8c175c0a4d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
@@ -458,8 +458,7 @@ public class PulsarStandalone implements AutoCloseable {
this.getNumOfBk(), this.getZkPort(), this.getBkPort(), this.getStreamStoragePort(), this.getZkDir(),
this.getBkDir(), this.isWipeData(), "127.0.0.1");
bkEnsemble.startStandalone(bkServerConf, !this.isNoStreamStorage());
-
- config.setZookeeperServers("127.0.0.1:" + zkPort);
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + zkPort);
}
private static void processTerminator(int exitCode) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
index af9aeb7eabf..749eeb7da3c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
@@ -84,7 +84,7 @@ public class SLAMonitoringTest {
config.setClusterName("my-cluster");
config.setAdvertisedAddress("localhost");
config.setWebServicePort(Optional.of(0));
- config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setDefaultNumberOfNamespaceBundles(1);
config.setLoadBalancerEnabled(false);
configurations[i] = config;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
index 7595499e473..bafb523fb8f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -109,7 +109,7 @@ public class AntiAffinityNamespaceGroupTest {
config1.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
config1.setClusterName("use");
config1.setWebServicePort(Optional.of(0));
- config1.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config1.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config1.setBrokerShutdownTimeoutMs(0L);
config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config1.setBrokerServicePort(Optional.of(0));
@@ -131,7 +131,7 @@ public class AntiAffinityNamespaceGroupTest {
config2.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
config2.setClusterName("use");
config2.setWebServicePort(Optional.of(0));
- config2.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config2.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config2.setBrokerShutdownTimeoutMs(0L);
config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config2.setBrokerServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
index 925879d13b7..502cc56818a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
@@ -74,7 +74,7 @@ public class LeaderElectionServiceTest {
config.setWebServicePort(Optional.of(0));
config.setClusterName(clusterName);
config.setAdvertisedAddress("localhost");
- config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
@Cleanup
PulsarService pulsar = spyWithClassAndConstructorArgs(MockPulsarService.class, config);
pulsar.start();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
index 2e97cc43f43..a241f754d74 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
@@ -123,7 +123,7 @@ public class LoadBalancerTest {
config.setWebServicePort(Optional.of(0));
config.setBrokerServicePortTls(Optional.of(0));
config.setWebServicePortTls(Optional.of(0));
- config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
index 3cf7db8e674..12f95a2b796 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
@@ -147,7 +147,7 @@ public class ModularLoadManagerImplTest {
config1.setLoadBalancerLoadSheddingStrategy("org.apache.pulsar.broker.loadbalance.impl.OverloadShedder");
config1.setClusterName("use");
config1.setWebServicePort(Optional.of(0));
- config1.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config1.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config1.setAdvertisedAddress("localhost");
config1.setBrokerShutdownTimeoutMs(0L);
@@ -168,7 +168,7 @@ public class ModularLoadManagerImplTest {
config2.setLoadBalancerLoadSheddingStrategy("org.apache.pulsar.broker.loadbalance.impl.OverloadShedder");
config2.setClusterName("use");
config2.setWebServicePort(Optional.of(0));
- config2.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config2.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config2.setAdvertisedAddress("localhost");
config2.setBrokerShutdownTimeoutMs(0L);
config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
@@ -591,7 +591,7 @@ public class ModularLoadManagerImplTest {
config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
config.setClusterName("use");
config.setWebServicePort(Optional.of(0));
- config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
index a4e742b2bb6..c01aaec666b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
@@ -47,7 +47,7 @@ public class SimpleBrokerStartTest {
ServiceConfiguration config = spy(ServiceConfiguration.class);
config.setClusterName("use");
config.setWebServicePort(Optional.of(0));
- config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
@@ -75,7 +75,7 @@ public class SimpleBrokerStartTest {
ServiceConfiguration config = spy(ServiceConfiguration.class);
config.setClusterName("use");
config.setWebServicePort(Optional.of(0));
- config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index 6b8711addd4..ba8035d9307 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -109,7 +109,7 @@ public class SimpleLoadManagerImplTest {
ServiceConfiguration config1 = spy(ServiceConfiguration.class);
config1.setClusterName("use");
config1.setWebServicePort(Optional.of(0));
- config1.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config1.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config1.setBrokerShutdownTimeoutMs(0L);
config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config1.setBrokerServicePort(Optional.of(0));
@@ -129,7 +129,7 @@ public class SimpleLoadManagerImplTest {
ServiceConfiguration config2 = new ServiceConfiguration();
config2.setClusterName("use");
config2.setWebServicePort(Optional.of(0));
- config2.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config2.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config2.setBrokerShutdownTimeoutMs(0L);
config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config2.setBrokerServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
index 21e4cfc666d..d4c3fced4ae 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
@@ -60,7 +60,7 @@ public class BundleSplitterTaskTest {
config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
config.setClusterName("use");
config.setWebServicePort(Optional.of(0));
- config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setAdvertisedAddress("localhost");
config.setBrokerShutdownTimeoutMs(0L);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
index 21dd37097b4..0385574b109 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
@@ -49,7 +49,7 @@ public class AdvertisedAddressTest {
ServiceConfiguration config = new ServiceConfiguration();
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
- config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setWebServicePort(Optional.ofNullable(0));
config.setClusterName("usc");
config.setAdvertisedAddress("localhost");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 57d5f0dfd50..a71eeea087d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -94,7 +94,7 @@ public class BacklogQuotaManagerTest {
// start pulsar service
config = new ServiceConfiguration();
- config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setAdvertisedAddress("localhost");
config.setWebServicePort(Optional.of(0));
config.setClusterName("usc");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
index b1db3033a8d..66ea7d9522c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
@@ -76,7 +76,7 @@ public abstract class BkEnsemblesTestBase extends TestRetrySupport {
if (config == null) {
config = new ServiceConfiguration();
}
- config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setAdvertisedAddress("localhost");
config.setWebServicePort(Optional.of(0));
config.setClusterName("usc");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
index a05248ac808..e575154a743 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
@@ -148,7 +148,7 @@ public class BrokerBookieIsolationTest {
config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
config.setClusterName(cluster);
config.setWebServicePort(Optional.of(0));
- config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
@@ -301,7 +301,7 @@ public class BrokerBookieIsolationTest {
config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
config.setClusterName(cluster);
config.setWebServicePort(Optional.of(0));
- config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
@@ -442,7 +442,7 @@ public class BrokerBookieIsolationTest {
config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
config.setClusterName(cluster);
config.setWebServicePort(Optional.of(0));
- config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
@@ -602,7 +602,7 @@ public class BrokerBookieIsolationTest {
config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
config.setClusterName(cluster);
config.setWebServicePort(Optional.of(0));
- config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
@@ -743,7 +743,7 @@ public class BrokerBookieIsolationTest {
config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
config.setClusterName(cluster);
config.setWebServicePort(Optional.of(0));
- config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
index e3f106893b9..d8dadaf8b5b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
@@ -97,7 +97,7 @@ public class TopicOwnerTest {
config.setClusterName("my-cluster");
config.setAdvertisedAddress("localhost");
config.setWebServicePort(Optional.of(0));
- config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setDefaultNumberOfNamespaceBundles(1);
config.setLoadBalancerEnabled(false);
configurations[i] = config;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
index 2f462d72e48..e69d6ba630b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
@@ -71,7 +71,7 @@ public abstract class TransactionMetaStoreTestBase extends TestRetrySupport {
config.setClusterName("my-cluster");
config.setAdvertisedAddress("localhost");
config.setWebServicePort(Optional.of(0));
- config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setDefaultNumberOfNamespaceBundles(1);
config.setLoadBalancerEnabled(false);
config.setAcknowledgmentAtBatchIndexLevelEnabled(true);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
index 7d9c1c77b74..0d8e87d5151 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
@@ -84,7 +84,7 @@ public class ClientDeduplicationFailureTest {
config = spy(ServiceConfiguration.class);
config.setClusterName("use");
config.setWebServicePort(Optional.of(0));
- config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 4b899f2ee0c..c5d716de011 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -112,6 +112,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
+ super.resetConfig();
super.internalSetup();
super.producerBaseSetup();
this.conf.setSubscriptionKeySharedUseConsistentHashing(true);
@@ -129,7 +130,6 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
@Test(dataProvider = "data")
public void testSendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector(String topicType, boolean enableBatch)
throws PulsarClientException {
- this.conf.setSubscriptionKeySharedEnable(true);
String topic = topicType + "://public/default/key_shared-" + UUID.randomUUID();
@Cleanup
@@ -155,9 +155,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
}
@Test(dataProvider = "data")
- public void testSendAndReceiveWithBatching(String topicType, boolean enableBatch)
- throws Exception {
- this.conf.setSubscriptionKeySharedEnable(true);
+ public void testSendAndReceiveWithBatching(String topicType, boolean enableBatch) throws Exception {
String topic = topicType + "://public/default/key_shared-" + UUID.randomUUID();
@Cleanup
@@ -203,7 +201,6 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
@Test(dataProvider = "batch")
public void testSendAndReceiveWithHashRangeExclusiveStickyKeyConsumerSelector(boolean enableBatch) throws PulsarClientException {
- this.conf.setSubscriptionKeySharedEnable(true);
String topic = "persistent://public/default/key_shared_exclusive-" + UUID.randomUUID();
@Cleanup
@@ -253,10 +250,10 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
}
@Test(dataProvider = "data")
- public void testConsumerCrashSendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector(String topicType,
- boolean enableBatch) throws PulsarClientException, InterruptedException {
-
- this.conf.setSubscriptionKeySharedEnable(true);
+ public void testConsumerCrashSendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector(
+ String topicType,
+ boolean enableBatch
+ ) throws PulsarClientException, InterruptedException {
String topic = topicType + "://public/default/key_shared_consumer_crash-" + UUID.randomUUID();
@Cleanup
@@ -297,10 +294,10 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
}
@Test(dataProvider = "data")
- public void testNonKeySendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector(String topicType,
- boolean enableBatch)
- throws PulsarClientException {
- this.conf.setSubscriptionKeySharedEnable(true);
+ public void testNonKeySendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector(
+ String topicType,
+ boolean enableBatch
+ ) throws PulsarClientException {
String topic = topicType + "://public/default/key_shared_none_key-" + UUID.randomUUID();
@Cleanup
@@ -327,7 +324,6 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
@Test(dataProvider = "batch")
public void testNonKeySendAndReceiveWithHashRangeExclusiveStickyKeyConsumerSelector(boolean enableBatch)
throws PulsarClientException {
- this.conf.setSubscriptionKeySharedEnable(true);
String topic = "persistent://public/default/key_shared_none_key_exclusive-" + UUID.randomUUID();
@Cleanup
@@ -366,7 +362,6 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
@Test(dataProvider = "batch")
public void testOrderingKeyWithHashRangeAutoSplitStickyKeyConsumerSelector(boolean enableBatch)
throws PulsarClientException {
- this.conf.setSubscriptionKeySharedEnable(true);
String topic = "persistent://public/default/key_shared_ordering_key-" + UUID.randomUUID();
@Cleanup
@@ -395,7 +390,6 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
@Test(dataProvider = "batch")
public void testOrderingKeyWithHashRangeExclusiveStickyKeyConsumerSelector(boolean enableBatch)
throws PulsarClientException {
- this.conf.setSubscriptionKeySharedEnable(true);
String topic = "persistent://public/default/key_shared_exclusive_ordering_key-" + UUID.randomUUID();
@Cleanup
@@ -446,7 +440,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
@Test(expectedExceptions = PulsarClientException.NotAllowedException.class)
public void testDisableKeySharedSubscription() throws PulsarClientException {
- this.conf.setSubscriptionKeySharedEnable(false);
+ this.conf.getSubscriptionTypesEnabled().remove("Key_Shared");
String topic = "persistent://public/default/key_shared_disabled";
pulsarClient.newConsumer()
.topic(topic)
@@ -458,7 +452,6 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
@Test
public void testCannotUseAcknowledgeCumulative() throws PulsarClientException {
- this.conf.setSubscriptionKeySharedEnable(true);
String topic = "persistent://public/default/key_shared_ack_cumulative-" + UUID.randomUUID();
@Cleanup
@@ -484,7 +477,6 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
@Test(dataProvider = "batch")
public void testMakingProgressWithSlowerConsumer(boolean enableBatch) throws Exception {
- this.conf.setSubscriptionKeySharedEnable(true);
String topic = "testMakingProgressWithSlowerConsumer-" + UUID.randomUUID();
String slowKey = "slowKey";
@@ -553,7 +545,6 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
@Test
public void testOrderingWhenAddingConsumers() throws Exception {
- this.conf.setSubscriptionKeySharedEnable(true);
String topic = "testOrderingWhenAddingConsumers-" + UUID.randomUUID();
@Cleanup
@@ -595,7 +586,6 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
@Test
public void testReadAheadWhenAddingConsumers() throws Exception {
- this.conf.setSubscriptionKeySharedEnable(true);
String topic = "testReadAheadWhenAddingConsumers-" + UUID.randomUUID();
@Cleanup
@@ -649,7 +639,6 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
@Test
public void testRemoveFirstConsumer() throws Exception {
- this.conf.setSubscriptionKeySharedEnable(true);
String topic = "testReadAheadWhenAddingConsumers-" + UUID.randomUUID();
@Cleanup
@@ -707,7 +696,6 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
@Test
public void testHashRangeConflict() throws PulsarClientException {
- this.conf.setSubscriptionKeySharedEnable(true);
final String topic = "persistent://public/default/testHashRangeConflict-" + UUID.randomUUID().toString();
final String sub = "test";
@@ -796,9 +784,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
}
@Test
- public void testAttachKeyToMessageMetadata()
- throws PulsarClientException {
- this.conf.setSubscriptionKeySharedEnable(true);
+ public void testAttachKeyToMessageMetadata() throws PulsarClientException {
String topic = "persistent://public/default/key_shared-" + UUID.randomUUID();
@Cleanup
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
index 95e4ecc577b..1c7ce107468 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
@@ -135,8 +135,8 @@ public class ServiceConfigurationTest {
InputStream stream = new ByteArrayInputStream(confFile.getBytes());
final ServiceConfiguration conf = PulsarConfigurationLoader.create(stream, ServiceConfiguration.class);
- assertEquals(conf.getMetadataStoreUrl(), "zk1:2181");
- assertEquals(conf.getConfigurationMetadataStoreUrl(), "zk1:2181");
+ assertEquals(conf.getMetadataStoreUrl(), "zk:zk1:2181");
+ assertEquals(conf.getConfigurationMetadataStoreUrl(), "zk:zk1:2181");
assertEquals(conf.getBookkeeperMetadataStoreUrl(), "metadata-store:zk:zk1:2181/ledgers");
assertFalse(conf.isConfigurationStoreSeparated());
assertFalse(conf.isBookkeeperMetadataStoreSeparated());
@@ -257,6 +257,15 @@ public class ServiceConfigurationTest {
}
}
+ @Test
+ public void testSubscriptionTypesEnableWins() throws Exception {
+ final Properties properties = new Properties();
+ properties.setProperty("subscriptionKeySharedEnable", "true");
+ properties.setProperty("subscriptionTypesEnabled", "Exclusive,Shared,Failover");
+ final ServiceConfiguration conf = PulsarConfigurationLoader.create(properties, ServiceConfiguration.class);
+ assertFalse(conf.isSubscriptionKeySharedEnable());
+ }
+
/**
* Verify transaction batch log configuration load correct, cover these cases:
* 1. broker.conf. This is default value. If the property is not configured in the file, the default value
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
index 45502165180..6a4430e2669 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
@@ -128,7 +128,7 @@ public class PulsarFunctionE2ESecurityTest {
Set<String> superUsers = Sets.newHashSet(ADMIN_SUBJECT);
config.setSuperUserRoles(superUsers);
config.setWebServicePort(Optional.of(0));
- config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index e810a1d7d93..2296a6890bb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -209,7 +209,7 @@ public class PulsarFunctionLocalRunTest {
config.setSuperUserRoles(superUsers);
config.setWebServicePort(Optional.of(0));
config.setWebServicePortTls(Optional.of(0));
- config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
index 5d830a67778..33366fa888e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
@@ -125,7 +125,7 @@ public class PulsarFunctionPublishTest {
config.setSuperUserRoles(superUsers);
config.setWebServicePort(Optional.of(0));
config.setWebServicePortTls(Optional.of(0));
- config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
index 227088019f0..4b7d5a6cb07 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
@@ -97,7 +97,7 @@ public class PulsarFunctionTlsTest {
config.setBrokerServicePortTls(Optional.of(brokerPort));
config.setClusterName("my-cluster");
config.setAdvertisedAddress("localhost");
- config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setDefaultNumberOfNamespaceBundles(1);
config.setLoadBalancerEnabled(false);
Set<String> superUsers = Sets.newHashSet("superUser", "admin");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index 9ba2ccde471..66a0b31ee98 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -92,7 +92,7 @@ public class PulsarWorkerAssignmentTest {
final Set<String> superUsers = Sets.newHashSet("superUser", "admin");
config.setSuperUserRoles(superUsers);
config.setWebServicePort(Optional.of(0));
- config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
index da748bafea5..5009fca0195 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
@@ -117,7 +117,7 @@ public abstract class AbstractPulsarE2ETest {
config.setSuperUserRoles(superUsers);
config.setWebServicePort(Optional.of(0));
config.setWebServicePortTls(Optional.of(0));
- config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
index afe16c63ec2..054bd4bce22 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
@@ -99,7 +99,7 @@ public class PulsarFunctionAdminTest {
config.setSuperUserRoles(superUsers);
config.setWebServicePort(Optional.of(0));
config.setWebServicePortTls(Optional.of(0));
- config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index 2cb0e62f8e3..04467b5f204 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -113,7 +113,7 @@ public class PulsarFunctionTlsTest {
config.setClusterName("use");
Set<String> superUsers = Sets.newHashSet("superUser", "admin");
config.setSuperUserRoles(superUsers);
- config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+ config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderTls.class.getName());
config.setAuthenticationEnabled(true);