You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2022/07/19 06:36:44 UTC

[pulsar] branch master updated: [improve][cleanup] Keep usage of deprecated configs inside ServiceConfiguration (#16492)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d4fa00b005 [improve][cleanup] Keep usage of deprecated configs inside ServiceConfiguration (#16492)
3d4fa00b005 is described below

commit 3d4fa00b0059744c3d945c77672b9769c335014c
Author: tison <wa...@gmail.com>
AuthorDate: Tue Jul 19 14:36:36 2022 +0800

    [improve][cleanup] Keep usage of deprecated configs inside ServiceConfiguration (#16492)
    
    ### Motivation
    Cleanup code when reading it.
    
    ### Modifications
    Encapsulate usage of deprecated configs:
    - subscriptionKeySharedEnable
    - zookeeperServers
---
 .../apache/pulsar/broker/ServiceConfiguration.java | 36 +++++++++++---------
 .../configuration/PulsarConfigurationLoader.java   |  2 +-
 .../java/org/apache/pulsar/PulsarStandalone.java   |  3 +-
 .../apache/pulsar/broker/SLAMonitoringTest.java    |  2 +-
 .../AntiAffinityNamespaceGroupTest.java            |  4 +--
 .../loadbalance/LeaderElectionServiceTest.java     |  2 +-
 .../broker/loadbalance/LoadBalancerTest.java       |  2 +-
 .../loadbalance/ModularLoadManagerImplTest.java    |  6 ++--
 .../broker/loadbalance/SimpleBrokerStartTest.java  |  4 +--
 .../loadbalance/SimpleLoadManagerImplTest.java     |  4 +--
 .../loadbalance/impl/BundleSplitterTaskTest.java   |  2 +-
 .../broker/service/AdvertisedAddressTest.java      |  2 +-
 .../broker/service/BacklogQuotaManagerTest.java    |  2 +-
 .../pulsar/broker/service/BkEnsemblesTestBase.java |  2 +-
 .../broker/service/BrokerBookieIsolationTest.java  | 10 +++---
 .../pulsar/broker/service/TopicOwnerTest.java      |  2 +-
 .../coordinator/TransactionMetaStoreTestBase.java  |  2 +-
 .../client/api/ClientDeduplicationFailureTest.java |  2 +-
 .../client/api/KeySharedSubscriptionTest.java      | 38 +++++++---------------
 .../common/naming/ServiceConfigurationTest.java    | 13 ++++++--
 .../worker/PulsarFunctionE2ESecurityTest.java      |  2 +-
 .../worker/PulsarFunctionLocalRunTest.java         |  2 +-
 .../worker/PulsarFunctionPublishTest.java          |  2 +-
 .../functions/worker/PulsarFunctionTlsTest.java    |  2 +-
 .../worker/PulsarWorkerAssignmentTest.java         |  2 +-
 .../apache/pulsar/io/AbstractPulsarE2ETest.java    |  2 +-
 .../apache/pulsar/io/PulsarFunctionAdminTest.java  |  2 +-
 .../apache/pulsar/io/PulsarFunctionTlsTest.java    |  2 +-
 28 files changed, 78 insertions(+), 78 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index c90c04e8eda..0e7eca59828 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -506,7 +506,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
     @Deprecated
     @FieldContext(
         category = CATEGORY_POLICIES,
-        doc = "@deprecated - Use backlogQuotaDefaultLimitByte instead.\""
+        doc = "@deprecated - Use backlogQuotaDefaultLimitByte instead."
     )
     private double backlogQuotaDefaultLimitGB = -1;
 
@@ -646,7 +646,8 @@ public class ServiceConfiguration implements PulsarConfiguration {
     @FieldContext(
         category = CATEGORY_POLICIES,
         dynamic = true,
-        doc = "Enable Key_Shared subscription (default is enabled)"
+        doc = "Enable Key_Shared subscription (default is enabled).\n"
+            + "@deprecated - use subscriptionTypesEnabled instead."
     )
     private boolean subscriptionKeySharedEnable = true;
 
@@ -1903,7 +1904,8 @@ public class ServiceConfiguration implements PulsarConfiguration {
         deprecated = true,
         doc = "Max number of `acknowledgment holes` that can be stored in Zookeeper.\n\n"
             + "If number of unack message range is higher than this limit then broker will persist"
-            + " unacked ranges into bookkeeper to avoid additional data overhead into zookeeper.")
+            + " unacked ranges into bookkeeper to avoid additional data overhead into zookeeper.\n"
+            + "@deprecated - use managedLedgerMaxUnackedRangesToPersistInMetadataStore.")
     private int managedLedgerMaxUnackedRangesToPersistInZooKeeper = -1;
     @FieldContext(
             category = CATEGORY_STORAGE_ML,
@@ -2312,7 +2314,8 @@ public class ServiceConfiguration implements PulsarConfiguration {
         category = CATEGORY_POLICIES,
         deprecated = true,
         doc = "How often broker checks for inactive topics to be deleted (topics with no subscriptions and no one "
-                + "connected) Deprecated in favor of using `brokerDeleteInactiveTopicsFrequencySeconds`"
+                + "connected) Deprecated in favor of using `brokerDeleteInactiveTopicsFrequencySeconds`\n"
+                + "@deprecated - unused."
     )
     private int brokerServicePurgeInactiveFrequencyInSeconds = 60;
     @FieldContext(
@@ -2864,12 +2867,22 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private Set<String> additionalServlets = new TreeSet<>();
 
+    /**
+     * @deprecated Use {@link #getSubscriptionTypesEnabled()} instead
+     */
+    @Deprecated
+    public boolean isSubscriptionKeySharedEnable() {
+        return subscriptionKeySharedEnable && subscriptionTypesEnabled.contains("Key_Shared");
+    }
+
     public String getMetadataStoreUrl() {
         if (StringUtils.isNotBlank(metadataStoreUrl)) {
             return metadataStoreUrl;
-        } else {
+        } else if (StringUtils.isNotBlank(zookeeperServers)) {
             // Fallback to old setting
-            return zookeeperServers;
+            return ZKMetadataStore.ZK_SCHEME_IDENTIFIER + zookeeperServers;
+        } else {
+            return "";
         }
     }
 
@@ -2907,15 +2920,8 @@ public class ServiceConfiguration implements PulsarConfiguration {
         } else {
             // Fallback to same metadata service used by broker, adding the "metadata-store" to specify the BK
             // metadata adapter
-            String suffix;
-            if (StringUtils.isNotBlank(metadataStoreUrl)) {
-                suffix = metadataStoreUrl;
-            } else {
-                // Fallback to old setting
-                // Note: chroot is not settable by using 'zookeeperServers' config.
-                suffix = ZKMetadataStore.ZK_SCHEME_IDENTIFIER + zookeeperServers;
-            }
-            return "metadata-store:" + suffix + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH;
+            // Note: chroot is not settable by using 'zookeeperServers' config.
+            return "metadata-store:" + getMetadataStoreUrl() + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH;
         }
     }
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
index 117681c87fe..12bba1a7fa2 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
@@ -91,7 +91,7 @@ public class PulsarConfigurationLoader {
     public static <T extends PulsarConfiguration> T create(Properties properties,
             Class<? extends PulsarConfiguration> clazz) throws IOException, IllegalArgumentException {
         requireNonNull(properties);
-        T configuration = null;
+        T configuration;
         try {
             configuration = (T) clazz.getDeclaredConstructor().newInstance();
             configuration.setProperties(properties);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
index 09acd1edc0e..b8c175c0a4d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
@@ -458,8 +458,7 @@ public class PulsarStandalone implements AutoCloseable {
                 this.getNumOfBk(), this.getZkPort(), this.getBkPort(), this.getStreamStoragePort(), this.getZkDir(),
                 this.getBkDir(), this.isWipeData(), "127.0.0.1");
         bkEnsemble.startStandalone(bkServerConf, !this.isNoStreamStorage());
-
-        config.setZookeeperServers("127.0.0.1:" + zkPort);
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + zkPort);
     }
 
     private static void processTerminator(int exitCode) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
index af9aeb7eabf..749eeb7da3c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
@@ -84,7 +84,7 @@ public class SLAMonitoringTest {
             config.setClusterName("my-cluster");
             config.setAdvertisedAddress("localhost");
             config.setWebServicePort(Optional.of(0));
-            config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+            config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
             config.setDefaultNumberOfNamespaceBundles(1);
             config.setLoadBalancerEnabled(false);
             configurations[i] = config;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
index 7595499e473..bafb523fb8f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -109,7 +109,7 @@ public class AntiAffinityNamespaceGroupTest {
         config1.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
         config1.setClusterName("use");
         config1.setWebServicePort(Optional.of(0));
-        config1.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config1.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
         config1.setBrokerShutdownTimeoutMs(0L);
         config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config1.setBrokerServicePort(Optional.of(0));
@@ -131,7 +131,7 @@ public class AntiAffinityNamespaceGroupTest {
         config2.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
         config2.setClusterName("use");
         config2.setWebServicePort(Optional.of(0));
-        config2.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config2.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
         config2.setBrokerShutdownTimeoutMs(0L);
         config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config2.setBrokerServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
index 925879d13b7..502cc56818a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
@@ -74,7 +74,7 @@ public class LeaderElectionServiceTest {
         config.setWebServicePort(Optional.of(0));
         config.setClusterName(clusterName);
         config.setAdvertisedAddress("localhost");
-        config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
         @Cleanup
         PulsarService pulsar = spyWithClassAndConstructorArgs(MockPulsarService.class, config);
         pulsar.start();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
index 2e97cc43f43..a241f754d74 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
@@ -123,7 +123,7 @@ public class LoadBalancerTest {
             config.setWebServicePort(Optional.of(0));
             config.setBrokerServicePortTls(Optional.of(0));
             config.setWebServicePortTls(Optional.of(0));
-            config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+            config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
             config.setBrokerShutdownTimeoutMs(0L);
             config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
             config.setBrokerServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
index 3cf7db8e674..12f95a2b796 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
@@ -147,7 +147,7 @@ public class ModularLoadManagerImplTest {
         config1.setLoadBalancerLoadSheddingStrategy("org.apache.pulsar.broker.loadbalance.impl.OverloadShedder");
         config1.setClusterName("use");
         config1.setWebServicePort(Optional.of(0));
-        config1.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config1.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
 
         config1.setAdvertisedAddress("localhost");
         config1.setBrokerShutdownTimeoutMs(0L);
@@ -168,7 +168,7 @@ public class ModularLoadManagerImplTest {
         config2.setLoadBalancerLoadSheddingStrategy("org.apache.pulsar.broker.loadbalance.impl.OverloadShedder");
         config2.setClusterName("use");
         config2.setWebServicePort(Optional.of(0));
-        config2.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config2.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
         config2.setAdvertisedAddress("localhost");
         config2.setBrokerShutdownTimeoutMs(0L);
         config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
@@ -591,7 +591,7 @@ public class ModularLoadManagerImplTest {
         config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
         config.setClusterName("use");
         config.setWebServicePort(Optional.of(0));
-        config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
index a4e742b2bb6..c01aaec666b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
@@ -47,7 +47,7 @@ public class SimpleBrokerStartTest {
         ServiceConfiguration config = spy(ServiceConfiguration.class);
         config.setClusterName("use");
         config.setWebServicePort(Optional.of(0));
-        config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
@@ -75,7 +75,7 @@ public class SimpleBrokerStartTest {
         ServiceConfiguration config = spy(ServiceConfiguration.class);
         config.setClusterName("use");
         config.setWebServicePort(Optional.of(0));
-        config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index 6b8711addd4..ba8035d9307 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -109,7 +109,7 @@ public class SimpleLoadManagerImplTest {
         ServiceConfiguration config1 = spy(ServiceConfiguration.class);
         config1.setClusterName("use");
         config1.setWebServicePort(Optional.of(0));
-        config1.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config1.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
         config1.setBrokerShutdownTimeoutMs(0L);
         config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config1.setBrokerServicePort(Optional.of(0));
@@ -129,7 +129,7 @@ public class SimpleLoadManagerImplTest {
         ServiceConfiguration config2 = new ServiceConfiguration();
         config2.setClusterName("use");
         config2.setWebServicePort(Optional.of(0));
-        config2.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config2.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
         config2.setBrokerShutdownTimeoutMs(0L);
         config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config2.setBrokerServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
index 21e4cfc666d..d4c3fced4ae 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
@@ -60,7 +60,7 @@ public class BundleSplitterTaskTest {
         config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
         config.setClusterName("use");
         config.setWebServicePort(Optional.of(0));
-        config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
 
         config.setAdvertisedAddress("localhost");
         config.setBrokerShutdownTimeoutMs(0L);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
index 21dd37097b4..0385574b109 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AdvertisedAddressTest.java
@@ -49,7 +49,7 @@ public class AdvertisedAddressTest {
         ServiceConfiguration config = new ServiceConfiguration();
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
-        config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
         config.setWebServicePort(Optional.ofNullable(0));
         config.setClusterName("usc");
         config.setAdvertisedAddress("localhost");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 57d5f0dfd50..a71eeea087d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -94,7 +94,7 @@ public class BacklogQuotaManagerTest {
 
             // start pulsar service
             config = new ServiceConfiguration();
-            config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+            config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
             config.setAdvertisedAddress("localhost");
             config.setWebServicePort(Optional.of(0));
             config.setClusterName("usc");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
index b1db3033a8d..66ea7d9522c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
@@ -76,7 +76,7 @@ public abstract class BkEnsemblesTestBase extends TestRetrySupport {
             if (config == null) {
                 config = new ServiceConfiguration();
             }
-            config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+            config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
             config.setAdvertisedAddress("localhost");
             config.setWebServicePort(Optional.of(0));
             config.setClusterName("usc");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
index a05248ac808..e575154a743 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
@@ -148,7 +148,7 @@ public class BrokerBookieIsolationTest {
         config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
         config.setClusterName(cluster);
         config.setWebServicePort(Optional.of(0));
-        config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
@@ -301,7 +301,7 @@ public class BrokerBookieIsolationTest {
         config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
         config.setClusterName(cluster);
         config.setWebServicePort(Optional.of(0));
-        config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
@@ -442,7 +442,7 @@ public class BrokerBookieIsolationTest {
         config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
         config.setClusterName(cluster);
         config.setWebServicePort(Optional.of(0));
-        config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
@@ -602,7 +602,7 @@ public class BrokerBookieIsolationTest {
         config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
         config.setClusterName(cluster);
         config.setWebServicePort(Optional.of(0));
-        config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
@@ -743,7 +743,7 @@ public class BrokerBookieIsolationTest {
         config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
         config.setClusterName(cluster);
         config.setWebServicePort(Optional.of(0));
-        config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
index e3f106893b9..d8dadaf8b5b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
@@ -97,7 +97,7 @@ public class TopicOwnerTest {
             config.setClusterName("my-cluster");
             config.setAdvertisedAddress("localhost");
             config.setWebServicePort(Optional.of(0));
-            config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+            config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
             config.setDefaultNumberOfNamespaceBundles(1);
             config.setLoadBalancerEnabled(false);
             configurations[i] = config;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
index 2f462d72e48..e69d6ba630b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
@@ -71,7 +71,7 @@ public abstract class TransactionMetaStoreTestBase extends TestRetrySupport {
             config.setClusterName("my-cluster");
             config.setAdvertisedAddress("localhost");
             config.setWebServicePort(Optional.of(0));
-            config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+            config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
             config.setDefaultNumberOfNamespaceBundles(1);
             config.setLoadBalancerEnabled(false);
             config.setAcknowledgmentAtBatchIndexLevelEnabled(true);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
index 7d9c1c77b74..0d8e87d5151 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
@@ -84,7 +84,7 @@ public class ClientDeduplicationFailureTest {
         config = spy(ServiceConfiguration.class);
         config.setClusterName("use");
         config.setWebServicePort(Optional.of(0));
-        config.setMetadataStoreUrl("zk:127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 4b899f2ee0c..c5d716de011 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -112,6 +112,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
     @BeforeMethod(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
+        super.resetConfig();
         super.internalSetup();
         super.producerBaseSetup();
         this.conf.setSubscriptionKeySharedUseConsistentHashing(true);
@@ -129,7 +130,6 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
     @Test(dataProvider = "data")
     public void testSendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector(String topicType, boolean enableBatch)
             throws PulsarClientException {
-        this.conf.setSubscriptionKeySharedEnable(true);
         String topic = topicType + "://public/default/key_shared-" + UUID.randomUUID();
 
         @Cleanup
@@ -155,9 +155,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
     }
 
     @Test(dataProvider = "data")
-    public void testSendAndReceiveWithBatching(String topicType, boolean enableBatch)
-            throws Exception {
-        this.conf.setSubscriptionKeySharedEnable(true);
+    public void testSendAndReceiveWithBatching(String topicType, boolean enableBatch) throws Exception {
         String topic = topicType + "://public/default/key_shared-" + UUID.randomUUID();
 
         @Cleanup
@@ -203,7 +201,6 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
 
     @Test(dataProvider = "batch")
     public void testSendAndReceiveWithHashRangeExclusiveStickyKeyConsumerSelector(boolean enableBatch) throws PulsarClientException {
-        this.conf.setSubscriptionKeySharedEnable(true);
         String topic = "persistent://public/default/key_shared_exclusive-" + UUID.randomUUID();
 
         @Cleanup
@@ -253,10 +250,10 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
     }
 
     @Test(dataProvider = "data")
-    public void testConsumerCrashSendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector(String topicType,
-            boolean enableBatch) throws PulsarClientException, InterruptedException {
-
-        this.conf.setSubscriptionKeySharedEnable(true);
+    public void testConsumerCrashSendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector(
+        String topicType,
+        boolean enableBatch
+    ) throws PulsarClientException, InterruptedException {
         String topic = topicType + "://public/default/key_shared_consumer_crash-" + UUID.randomUUID();
 
         @Cleanup
@@ -297,10 +294,10 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
     }
 
     @Test(dataProvider = "data")
-    public void testNonKeySendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector(String topicType,
-                                                                                        boolean enableBatch)
-            throws PulsarClientException {
-        this.conf.setSubscriptionKeySharedEnable(true);
+    public void testNonKeySendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector(
+        String topicType,
+        boolean enableBatch
+    ) throws PulsarClientException {
         String topic = topicType + "://public/default/key_shared_none_key-" + UUID.randomUUID();
 
         @Cleanup
@@ -327,7 +324,6 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
     @Test(dataProvider = "batch")
     public void testNonKeySendAndReceiveWithHashRangeExclusiveStickyKeyConsumerSelector(boolean enableBatch)
             throws PulsarClientException {
-        this.conf.setSubscriptionKeySharedEnable(true);
         String topic = "persistent://public/default/key_shared_none_key_exclusive-" + UUID.randomUUID();
 
         @Cleanup
@@ -366,7 +362,6 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
     @Test(dataProvider = "batch")
     public void testOrderingKeyWithHashRangeAutoSplitStickyKeyConsumerSelector(boolean enableBatch)
             throws PulsarClientException {
-        this.conf.setSubscriptionKeySharedEnable(true);
         String topic = "persistent://public/default/key_shared_ordering_key-" + UUID.randomUUID();
 
         @Cleanup
@@ -395,7 +390,6 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
     @Test(dataProvider = "batch")
     public void testOrderingKeyWithHashRangeExclusiveStickyKeyConsumerSelector(boolean enableBatch)
             throws PulsarClientException {
-        this.conf.setSubscriptionKeySharedEnable(true);
         String topic = "persistent://public/default/key_shared_exclusive_ordering_key-" + UUID.randomUUID();
 
         @Cleanup
@@ -446,7 +440,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
 
     @Test(expectedExceptions = PulsarClientException.NotAllowedException.class)
     public void testDisableKeySharedSubscription() throws PulsarClientException {
-        this.conf.setSubscriptionKeySharedEnable(false);
+        this.conf.getSubscriptionTypesEnabled().remove("Key_Shared");
         String topic = "persistent://public/default/key_shared_disabled";
         pulsarClient.newConsumer()
             .topic(topic)
@@ -458,7 +452,6 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
 
     @Test
     public void testCannotUseAcknowledgeCumulative() throws PulsarClientException {
-        this.conf.setSubscriptionKeySharedEnable(true);
         String topic = "persistent://public/default/key_shared_ack_cumulative-" + UUID.randomUUID();
 
         @Cleanup
@@ -484,7 +477,6 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
 
     @Test(dataProvider = "batch")
     public void testMakingProgressWithSlowerConsumer(boolean enableBatch) throws Exception {
-        this.conf.setSubscriptionKeySharedEnable(true);
         String topic = "testMakingProgressWithSlowerConsumer-" + UUID.randomUUID();
 
         String slowKey = "slowKey";
@@ -553,7 +545,6 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
 
     @Test
     public void testOrderingWhenAddingConsumers() throws Exception {
-        this.conf.setSubscriptionKeySharedEnable(true);
         String topic = "testOrderingWhenAddingConsumers-" + UUID.randomUUID();
 
         @Cleanup
@@ -595,7 +586,6 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
 
     @Test
     public void testReadAheadWhenAddingConsumers() throws Exception {
-        this.conf.setSubscriptionKeySharedEnable(true);
         String topic = "testReadAheadWhenAddingConsumers-" + UUID.randomUUID();
 
         @Cleanup
@@ -649,7 +639,6 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
 
     @Test
     public void testRemoveFirstConsumer() throws Exception {
-        this.conf.setSubscriptionKeySharedEnable(true);
         String topic = "testReadAheadWhenAddingConsumers-" + UUID.randomUUID();
 
         @Cleanup
@@ -707,7 +696,6 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
 
     @Test
     public void testHashRangeConflict() throws PulsarClientException {
-        this.conf.setSubscriptionKeySharedEnable(true);
         final String topic = "persistent://public/default/testHashRangeConflict-" + UUID.randomUUID().toString();
         final String sub = "test";
 
@@ -796,9 +784,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
     }
 
     @Test
-    public void testAttachKeyToMessageMetadata()
-            throws PulsarClientException {
-        this.conf.setSubscriptionKeySharedEnable(true);
+    public void testAttachKeyToMessageMetadata() throws PulsarClientException {
         String topic = "persistent://public/default/key_shared-" + UUID.randomUUID();
 
         @Cleanup
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
index 95e4ecc577b..1c7ce107468 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
@@ -135,8 +135,8 @@ public class ServiceConfigurationTest {
         InputStream stream = new ByteArrayInputStream(confFile.getBytes());
         final ServiceConfiguration conf = PulsarConfigurationLoader.create(stream, ServiceConfiguration.class);
 
-        assertEquals(conf.getMetadataStoreUrl(), "zk1:2181");
-        assertEquals(conf.getConfigurationMetadataStoreUrl(), "zk1:2181");
+        assertEquals(conf.getMetadataStoreUrl(), "zk:zk1:2181");
+        assertEquals(conf.getConfigurationMetadataStoreUrl(), "zk:zk1:2181");
         assertEquals(conf.getBookkeeperMetadataStoreUrl(), "metadata-store:zk:zk1:2181/ledgers");
         assertFalse(conf.isConfigurationStoreSeparated());
         assertFalse(conf.isBookkeeperMetadataStoreSeparated());
@@ -257,6 +257,15 @@ public class ServiceConfigurationTest {
         }
     }
 
+    @Test
+    public void testSubscriptionTypesEnableWins() throws Exception {
+        final Properties properties = new Properties();
+        properties.setProperty("subscriptionKeySharedEnable", "true");
+        properties.setProperty("subscriptionTypesEnabled", "Exclusive,Shared,Failover");
+        final ServiceConfiguration conf = PulsarConfigurationLoader.create(properties, ServiceConfiguration.class);
+        assertFalse(conf.isSubscriptionKeySharedEnable());
+    }
+
     /**
      * Verify transaction batch log configuration load correct, cover these cases:
      *   1. broker.conf. This is default value. If the property is not configured in the file, the default value
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
index 45502165180..6a4430e2669 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
@@ -128,7 +128,7 @@ public class PulsarFunctionE2ESecurityTest {
         Set<String> superUsers = Sets.newHashSet(ADMIN_SUBJECT);
         config.setSuperUserRoles(superUsers);
         config.setWebServicePort(Optional.of(0));
-        config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index e810a1d7d93..2296a6890bb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -209,7 +209,7 @@ public class PulsarFunctionLocalRunTest {
         config.setSuperUserRoles(superUsers);
         config.setWebServicePort(Optional.of(0));
         config.setWebServicePortTls(Optional.of(0));
-        config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
index 5d830a67778..33366fa888e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
@@ -125,7 +125,7 @@ public class PulsarFunctionPublishTest {
         config.setSuperUserRoles(superUsers);
         config.setWebServicePort(Optional.of(0));
         config.setWebServicePortTls(Optional.of(0));
-        config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
index 227088019f0..4b7d5a6cb07 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
@@ -97,7 +97,7 @@ public class PulsarFunctionTlsTest {
             config.setBrokerServicePortTls(Optional.of(brokerPort));
             config.setClusterName("my-cluster");
             config.setAdvertisedAddress("localhost");
-            config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+            config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
             config.setDefaultNumberOfNamespaceBundles(1);
             config.setLoadBalancerEnabled(false);
             Set<String> superUsers = Sets.newHashSet("superUser", "admin");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index 9ba2ccde471..66a0b31ee98 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -92,7 +92,7 @@ public class PulsarWorkerAssignmentTest {
         final Set<String> superUsers = Sets.newHashSet("superUser", "admin");
         config.setSuperUserRoles(superUsers);
         config.setWebServicePort(Optional.of(0));
-        config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
index da748bafea5..5009fca0195 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
@@ -117,7 +117,7 @@ public abstract class AbstractPulsarE2ETest {
         config.setSuperUserRoles(superUsers);
         config.setWebServicePort(Optional.of(0));
         config.setWebServicePortTls(Optional.of(0));
-        config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
index afe16c63ec2..054bd4bce22 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
@@ -99,7 +99,7 @@ public class PulsarFunctionAdminTest {
         config.setSuperUserRoles(superUsers);
         config.setWebServicePort(Optional.of(0));
         config.setWebServicePortTls(Optional.of(0));
-        config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
         config.setBrokerShutdownTimeoutMs(0L);
         config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         config.setBrokerServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index 2cb0e62f8e3..04467b5f204 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -113,7 +113,7 @@ public class PulsarFunctionTlsTest {
         config.setClusterName("use");
         Set<String> superUsers = Sets.newHashSet("superUser", "admin");
         config.setSuperUserRoles(superUsers);
-        config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
         Set<String> providers = new HashSet<>();
         providers.add(AuthenticationProviderTls.class.getName());
         config.setAuthenticationEnabled(true);