You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/05/17 09:48:57 UTC

[pulsar] branch master updated: Setup pulsar cluster with MetadataStore (#10600)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 76ae7b1  Setup pulsar cluster with MetadataStore (#10600)
76ae7b1 is described below

commit 76ae7b1cb6ecb6d569adb6f256a100aa70e735b2
Author: Yang Yang <yy...@streamnative.io>
AuthorDate: Mon May 17 17:48:31 2021 +0800

    Setup pulsar cluster with MetadataStore (#10600)
    
    ### Motivation
    
    Refactor the Pulsar cluster metadata setup command to use the new `MetadataStore` API.
    
    ### Modifications
    
    - Refactored `PulsarClusterMetadataSetup` command to use the new `MetadataStore` API.
    - Added a new `MetadataStoreLifecycle` interface that lets `MetadataStore` implementations perform initialization as needed.
      - For ZooKeeper, it creates the chroot root node when the metadata URL contains a chroot path.
---
 .../apache/pulsar/PulsarClusterMetadataSetup.java  | 153 +++++++++++++++++----
 .../broker/zookeeper/ClusterMetadataSetupTest.java |  65 +++++++++
 .../metadata/api/MetadataStoreLifecycle.java       |  38 +++++
 .../pulsar/metadata/impl/ZKMetadataStore.java      |  41 +++++-
 4 files changed, 270 insertions(+), 27 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
index e090f3d..d5d3214 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
@@ -25,6 +25,8 @@ import com.beust.jcommander.Parameter;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
 import org.apache.bookkeeper.common.net.ServiceURI;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -41,6 +43,12 @@ import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.worker.WorkerUtils;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.MetadataStoreLifecycle;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
@@ -49,7 +57,6 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
@@ -143,6 +150,21 @@ public class PulsarClusterMetadataSetup {
         }
     }
 
+    /**
+     * Creates a node via {@code store.put} with expected version -1, ignoring the {@code BadVersionException} raised when the node already exists.
+     */
+    private static void createMetadataNode(MetadataStore store, String path, byte[] data)
+            throws InterruptedException, ExecutionException {
+        try {
+            store.put(path, data, Optional.of(-1L)).get();
+        } catch (ExecutionException e) {
+            if (!(e.getCause() instanceof MetadataStoreException.BadVersionException)) {
+                throw e;
+            }
+            // Ignore
+        }
+    }
+
     private static void initialDlogNamespaceMetadata(String configurationStore, String bkMetadataServiceUri)
             throws IOException {
         InternalConfigurationData internalConf = new InternalConfigurationData(
@@ -195,15 +217,16 @@ public class PulsarClusterMetadataSetup {
         log.info("Setting up cluster {} with zk={} configuration-store={}", arguments.cluster, arguments.zookeeper,
                 arguments.configurationStore);
 
-        ZooKeeper localZk = initZk(arguments.zookeeper, arguments.zkSessionTimeoutMillis);
-        ZooKeeper configStoreZk = initZk(arguments.configurationStore, arguments.zkSessionTimeoutMillis);
+        MetadataStoreExtended localStore = initMetadataStore(arguments.zookeeper, arguments.zkSessionTimeoutMillis);
+        MetadataStoreExtended configStore = initMetadataStore(arguments.configurationStore,
+                arguments.zkSessionTimeoutMillis);
 
         // Format BookKeeper ledger storage metadata
         ServerConfiguration bkConf = new ServerConfiguration();
         if (arguments.existingBkMetadataServiceUri == null && arguments.bookieMetadataServiceUri == null) {
             bkConf.setZkServers(arguments.zookeeper);
             bkConf.setZkTimeout(arguments.zkSessionTimeoutMillis);
-            if (localZk.exists("/ledgers", false) == null // only format if /ledgers doesn't exist
+            if (!localStore.exists("/ledgers").get() // only format if /ledgers doesn't exist
                 && !BookKeeperAdmin.format(bkConf, false /* interactive */, false /* force */)) {
                 throw new IOException("Failed to initialize BookKeeper metadata");
             }
@@ -227,56 +250,49 @@ public class PulsarClusterMetadataSetup {
             initializer.initializeCluster(bkMetadataServiceUri.getUri(), arguments.numStreamStorageContainers);
         }
 
-        if (localZk.exists(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, false) == null) {
-            createZkNode(localZk, ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
-                "{}".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        if (!localStore.exists(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).get()) {
+            createMetadataNode(localStore, ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, "{}".getBytes());
         }
 
-        createZkNode(localZk, "/managed-ledgers", new byte[0],
-            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        createMetadataNode(localStore, "/managed-ledgers", new byte[0]);
 
-        createZkNode(localZk, "/namespace", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        createMetadataNode(localStore, "/namespace", new byte[0]);
 
-        createZkNode(configStoreZk, POLICIES_ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                CreateMode.PERSISTENT);
+        createMetadataNode(configStore, POLICIES_ROOT, new byte[0]);
 
-        createZkNode(configStoreZk, "/admin/clusters", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                CreateMode.PERSISTENT);
+        createMetadataNode(configStore, "/admin/clusters", new byte[0]);
 
         ClusterData clusterData = new ClusterData(arguments.clusterWebServiceUrl, arguments.clusterWebServiceUrlTls,
                 arguments.clusterBrokerServiceUrl, arguments.clusterBrokerServiceUrlTls);
         byte[] clusterDataJson = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(clusterData);
 
-        createZkNode(configStoreZk, "/admin/clusters/" + arguments.cluster, clusterDataJson,
-                ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                CreateMode.PERSISTENT);
+        createMetadataNode(configStore, "/admin/clusters/" + arguments.cluster, clusterDataJson);
 
         // Create marker for "global" cluster
         ClusterData globalClusterData = new ClusterData(null, null);
         byte[] globalClusterDataJson = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(globalClusterData);
 
-        createZkNode(configStoreZk, "/admin/clusters/global", globalClusterDataJson, ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                CreateMode.PERSISTENT);
+        createMetadataNode(configStore, "/admin/clusters/global", globalClusterDataJson);
 
         // Create public tenant, whitelisted to use the this same cluster, along with other clusters
-        createTenantIfAbsent(configStoreZk, TopicName.PUBLIC_TENANT, arguments.cluster);
+        createTenantIfAbsent(configStore, TopicName.PUBLIC_TENANT, arguments.cluster);
 
         // Create system tenant
-        createTenantIfAbsent(configStoreZk, NamespaceName.SYSTEM_NAMESPACE.getTenant(), arguments.cluster);
+        createTenantIfAbsent(configStore, NamespaceName.SYSTEM_NAMESPACE.getTenant(), arguments.cluster);
 
         // Create default namespace
-        createNamespaceIfAbsent(configStoreZk, NamespaceName.get(TopicName.PUBLIC_TENANT, TopicName.DEFAULT_NAMESPACE),
+        createNamespaceIfAbsent(configStore, NamespaceName.get(TopicName.PUBLIC_TENANT, TopicName.DEFAULT_NAMESPACE),
                 arguments.cluster);
 
         // Create system namespace
-        createNamespaceIfAbsent(configStoreZk, NamespaceName.SYSTEM_NAMESPACE, arguments.cluster);
+        createNamespaceIfAbsent(configStore, NamespaceName.SYSTEM_NAMESPACE, arguments.cluster);
 
         // Create transaction coordinator assign partitioned topic
-        createPartitionedTopic(configStoreZk, TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+        createPartitionedTopic(configStore, TopicName.TRANSACTION_COORDINATOR_ASSIGN,
                 arguments.numTransactionCoordinators);
 
-        localZk.close();
-        configStoreZk.close();
+        localStore.close();
+        configStore.close();
 
         log.info("Cluster metadata for '{}' setup correctly", arguments.cluster);
     }
@@ -308,6 +324,32 @@ public class PulsarClusterMetadataSetup {
         }
     }
 
+    static void createTenantIfAbsent(MetadataStore configStore, String tenant, String cluster) throws IOException,
+            InterruptedException, ExecutionException {
+
+        String tenantPath = POLICIES_ROOT + "/" + tenant;
+
+        Optional<GetResult> getResult = configStore.get(tenantPath).get();
+        if (!getResult.isPresent()) {
+            TenantInfo publicTenant = new TenantInfo(Collections.emptySet(), Collections.singleton(cluster));
+
+            createMetadataNode(configStore, tenantPath,
+                    ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicTenant));
+        } else {
+            // Update existing public tenant with new cluster
+            byte[] content = getResult.get().getValue();
+            TenantInfo publicTenant = ObjectMapperFactory.getThreadLocal().readValue(content, TenantInfo.class);
+
+            // Only update z-node if the list of clusters should be modified
+            if (!publicTenant.getAllowedClusters().contains(cluster)) {
+                publicTenant.getAllowedClusters().add(cluster);
+
+                configStore.put(tenantPath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicTenant),
+                        Optional.of(getResult.get().getStat().getVersion()));
+            }
+        }
+    }
+
     static void createNamespaceIfAbsent(ZooKeeper configStoreZk, NamespaceName namespaceName, String cluster)
             throws KeeperException, InterruptedException, IOException {
         String namespacePath = POLICIES_ROOT + "/" + namespaceName.toString();
@@ -338,6 +380,32 @@ public class PulsarClusterMetadataSetup {
         }
     }
 
+    static void createNamespaceIfAbsent(MetadataStore configStore, NamespaceName namespaceName, String cluster)
+            throws InterruptedException, IOException, ExecutionException {
+        String namespacePath = POLICIES_ROOT + "/" + namespaceName.toString();
+        Policies policies;
+        Optional<GetResult> getResult = configStore.get(namespacePath).get();
+        if (!getResult.isPresent()) {
+            policies = new Policies();
+            policies.bundles = getBundles(16);
+            policies.replication_clusters = Collections.singleton(cluster);
+
+            createMetadataNode(configStore, namespacePath,
+                    ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies));
+        } else {
+            byte[] content = getResult.get().getValue();
+            policies = ObjectMapperFactory.getThreadLocal().readValue(content, Policies.class);
+
+            // Only update z-node if the list of clusters should be modified
+            if (!policies.replication_clusters.contains(cluster)) {
+                policies.replication_clusters.add(cluster);
+
+                configStore.put(namespacePath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies),
+                        Optional.of(getResult.get().getStat().getVersion()));
+            }
+        }
+    }
+
     static void createPartitionedTopic(ZooKeeper configStoreZk, TopicName topicName, int numPartitions)
             throws KeeperException, InterruptedException, IOException {
         String partitionedTopicPath = ZkAdminPaths.partitionedTopicPath(topicName);
@@ -367,6 +435,29 @@ public class PulsarClusterMetadataSetup {
         }
     }
 
+    static void createPartitionedTopic(MetadataStore configStore, TopicName topicName, int numPartitions)
+            throws InterruptedException, IOException, ExecutionException {
+        String partitionedTopicPath = ZkAdminPaths.partitionedTopicPath(topicName);
+        Optional<GetResult> getResult = configStore.get(partitionedTopicPath).get();
+        PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(numPartitions);
+        if (!getResult.isPresent()) {
+            createMetadataNode(configStore, partitionedTopicPath,
+                    ObjectMapperFactory.getThreadLocal().writeValueAsBytes(metadata));
+        } else {
+            byte[] content = getResult.get().getValue();
+            PartitionedTopicMetadata existsMeta =
+                    ObjectMapperFactory.getThreadLocal().readValue(content, PartitionedTopicMetadata.class);
+
+            // Only update z-node if the partitions should be modified
+            if (existsMeta.partitions < numPartitions) {
+                configStore.put(
+                        partitionedTopicPath,
+                        ObjectMapperFactory.getThreadLocal().writeValueAsBytes(metadata),
+                        Optional.of(getResult.get().getStat().getVersion()));
+            }
+        }
+    }
+
     public static ZooKeeper initZk(String connection, int sessionTimeout) throws Exception {
         ZooKeeperClientFactory zkfactory = new ZookeeperClientFactoryImpl();
         int chrootIndex = connection.indexOf("/");
@@ -386,5 +477,15 @@ public class PulsarClusterMetadataSetup {
         return zkConnect;
     }
 
+    public static MetadataStoreExtended initMetadataStore(String connection, int sessionTimeout) throws Exception {
+        MetadataStoreExtended store = MetadataStoreExtended.create(connection, MetadataStoreConfig.builder()
+                .sessionTimeoutMillis(sessionTimeout)
+                .build());
+        if (store instanceof MetadataStoreLifecycle) {
+            ((MetadataStoreLifecycle) store).initializeCluster().get();
+        }
+        return store;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PulsarClusterMetadataSetup.class);
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java
index e135d71..2f022ca 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java
@@ -18,9 +18,16 @@
  */
 package org.apache.pulsar.broker.zookeeper;
 
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.pulsar.PulsarClusterMetadataSetup;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.server.NIOServerCnxnFactory;
 import org.apache.zookeeper.server.ZooKeeperServer;
@@ -32,6 +39,8 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 
@@ -53,8 +62,38 @@ public class ClusterMetadataSetupTest {
             "--broker-service-url-tls","pulsar+ssl://127.0.0.1:6651"
         };
         PulsarClusterMetadataSetup.main(args);
+        SortedMap<String, String> data1 = localZkS.dumpData();
+        PulsarClusterMetadataSetup.main(args);
+        SortedMap<String, String> data2 = localZkS.dumpData();
+        assertEquals(data1, data2);
         PulsarClusterMetadataSetup.main(args);
+        SortedMap<String, String> data3 = localZkS.dumpData();
+        assertEquals(data1, data3);
+    }
+
+    @Test
+    public void testSetupClusterInChrootMode() throws Exception {
+        HashSet<String> firstLevelNodes = new HashSet<>(Arrays.asList(
+                "admin", "bookies", "ledgers", "managed-ledgers", "namespace", "pulsar", "stream"
+        ));
+        String rootPath = "/test-prefix";
+        String[] args = {
+                "--cluster", "testReSetupClusterMetadata-cluster",
+                "--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort() + rootPath,
+                "--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort() + rootPath,
+                "--web-service-url", "http://127.0.0.1:8080",
+                "--web-service-url-tls", "https://127.0.0.1:8443",
+                "--broker-service-url", "pulsar://127.0.0.1:6650",
+                "--broker-service-url-tls","pulsar+ssl://127.0.0.1:6651"
+        };
         PulsarClusterMetadataSetup.main(args);
+
+        try (ZooKeeper zk = ZooKeeperClient.newBuilder()
+                .connectString("127.0.0.1:" + localZkS.getZookeeperPort())
+                .build()) {
+            assertNotNull(zk.exists(rootPath, false));
+            assertEquals(new HashSet<>(zk.getChildren(rootPath, false)), firstLevelNodes);
+        }
     }
 
     @Test
@@ -168,5 +207,31 @@ public class ClusterMetadataSetupTest {
         public int getZookeeperPort() {
             return serverFactory.getLocalPort();
         }
+
+        public SortedMap<String, String> dumpData() throws IOException, InterruptedException, KeeperException {
+            SortedMap<String, String> data = new TreeMap<>();
+            try (ZooKeeper zk = ZooKeeperClient.newBuilder()
+                    .connectString("127.0.0.1:" + getZookeeperPort())
+                    .sessionTimeoutMs(20000)
+                    .build()) {
+                for (String child : zk.getChildren("/", false)) {
+                    if ("zookeeper".equals(child)) {
+                        continue;
+                    }
+                    dumpPath(zk, "/" + child, data);
+                }
+            }
+            return data;
+        }
+
+        private void dumpPath(ZooKeeper zk, String path, SortedMap<String, String> dataMap)
+                throws InterruptedException, KeeperException {
+            dataMap.put(path, new String(zk.getData(path, false, null), Charset.defaultCharset()));
+            for (String child : zk.getChildren(path, false)) {
+                dumpPath(zk, path + "/" + child, dataMap);
+            }
+        }
     }
+
+
 }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreLifecycle.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreLifecycle.java
new file mode 100644
index 0000000..e994584
--- /dev/null
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreLifecycle.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.api;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Extension of the {@link MetadataStore} interface that supports lifecycle operation methods which might not
+ * be supported by all implementations.
+ */
+public interface MetadataStoreLifecycle {
+    /**
+     * Initialize the metadata store cluster if needed.
+     *
+     * <p>For example, if the backend metadata store is a ZooKeeper cluster and the Pulsar cluster is configured to
+     * access the zookeeper cluster in the chroot mode, then this method could be used to initialize the root node
+     * during pulsar cluster metadata setup.
+     *
+     * @return a future to track the async request.
+     */
+    CompletableFuture<Void> initializeCluster();
+}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 4c85fe5..e1e44f8 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -31,12 +31,14 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+import org.apache.pulsar.metadata.api.MetadataStoreLifecycle;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.metadata.api.Stat;
@@ -51,14 +53,18 @@ import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
 
 @Slf4j
-public class ZKMetadataStore extends AbstractMetadataStore implements MetadataStoreExtended, Watcher {
+public class ZKMetadataStore extends AbstractMetadataStore implements MetadataStoreExtended, Watcher, MetadataStoreLifecycle {
 
+    private final String metadataURL;
+    private final MetadataStoreConfig metadataStoreConfig;
     private final boolean isZkManaged;
     private final ZooKeeper zkc;
     private ZKSessionWatcher sessionWatcher;
 
     public ZKMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig) throws MetadataStoreException {
         try {
+            this.metadataURL = metadataURL;
+            this.metadataStoreConfig = metadataStoreConfig;
             isZkManaged = true;
             zkc = ZooKeeperClient.newBuilder().connectString(metadataURL)
                     .connectRetryPolicy(new BoundExponentialBackoffRetryPolicy(100, 60_000, Integer.MAX_VALUE))
@@ -78,6 +84,8 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
 
     @VisibleForTesting
     public ZKMetadataStore(ZooKeeper zkc) {
+        this.metadataURL = null;
+        this.metadataStoreConfig = null;
         this.isZkManaged = false;
         this.zkc = zkc;
         this.sessionWatcher = new ZKSessionWatcher(zkc, this::receivedSessionEvent);
@@ -360,4 +368,35 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
     public long getZkSessionId() {
         return zkc.getSessionId();
     }
+
+    @Override
+    public CompletableFuture<Void> initializeCluster() {
+        if (this.metadataURL == null) {
+            return FutureUtil.failedFuture(new MetadataStoreException("metadataURL is not set"));
+        }
+        if (this.metadataStoreConfig == null) {
+            return FutureUtil.failedFuture(new MetadataStoreException("metadataStoreConfig is not set"));
+        }
+        int chrootIndex = metadataURL.indexOf("/");
+        if (chrootIndex > 0) {
+            String chrootPath = metadataURL.substring(chrootIndex);
+            String zkConnectForChrootCreation = metadataURL.substring(0, chrootIndex);
+            try (ZooKeeper chrootZk = ZooKeeperClient.newBuilder()
+                    .connectString(zkConnectForChrootCreation)
+                    .sessionTimeoutMs(metadataStoreConfig.getSessionTimeoutMillis())
+                    .connectRetryPolicy(
+                            new BoundExponentialBackoffRetryPolicy(metadataStoreConfig.getSessionTimeoutMillis(),
+                                    metadataStoreConfig.getSessionTimeoutMillis(), 0))
+                    .build()) {
+                if (chrootZk.exists(chrootPath, false) == null) {
+                    ZkUtils.createFullPathOptimistic(chrootZk, chrootPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+                    log.info("Created zookeeper chroot path {} successfully", chrootPath);
+                }
+            } catch (Exception e) {
+                return FutureUtil.failedFuture(e);
+            }
+        }
+        return CompletableFuture.completedFuture(null);
+    }
 }