You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/05/17 09:48:57 UTC
[pulsar] branch master updated: Setup pulsar cluster with
MetadataStore (#10600)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 76ae7b1 Setup pulsar cluster with MetadataStore (#10600)
76ae7b1 is described below
commit 76ae7b1cb6ecb6d569adb6f256a100aa70e735b2
Author: Yang Yang <yy...@streamnative.io>
AuthorDate: Mon May 17 17:48:31 2021 +0800
Setup pulsar cluster with MetadataStore (#10600)
### Motivation
Refactor the pulsar cluster setup command to use the new `MetadataStore` API.
### Modifications
- Refactored `PulsarClusterMetadataSetup` command to use the new `MetadataStore` API.
- Added a new `MetadataStoreLifecycle` interface for the `MetadataStore` implementations to do initialization as needed.
- For ZooKeeper, the implementation creates the chroot root node when the connection string uses chroot mode
---
.../apache/pulsar/PulsarClusterMetadataSetup.java | 153 +++++++++++++++++----
.../broker/zookeeper/ClusterMetadataSetupTest.java | 65 +++++++++
.../metadata/api/MetadataStoreLifecycle.java | 38 +++++
.../pulsar/metadata/impl/ZKMetadataStore.java | 41 +++++-
4 files changed, 270 insertions(+), 27 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
index e090f3d..d5d3214 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
@@ -25,6 +25,8 @@ import com.beust.jcommander.Parameter;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.common.net.ServiceURI;
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -41,6 +43,12 @@ import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.worker.WorkerUtils;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.MetadataStoreLifecycle;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
@@ -49,7 +57,6 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
@@ -143,6 +150,21 @@ public class PulsarClusterMetadataSetup {
}
}
+    /**
+     * Creates a persistent node at {@code path} through {@code store.put}, treating a node that
+     * already exists as success.
+     *
+     * <p>The put is issued with an expected version of {@code -1}. When the store answers with a
+     * {@link MetadataStoreException.BadVersionException} the failure is ignored; any other failure
+     * is rethrown to the caller.
+     */
+    private static void createMetadataNode(MetadataStore store, String path, byte[] data)
+            throws InterruptedException, ExecutionException {
+        try {
+            store.put(path, data, Optional.of(-1L)).get();
+        } catch (ExecutionException putFailure) {
+            boolean nodeAlreadyExists =
+                    putFailure.getCause() instanceof MetadataStoreException.BadVersionException;
+            if (nodeAlreadyExists) {
+                // Nothing left to create, the node is already there.
+                return;
+            }
+            throw putFailure;
+        }
+    }
+
private static void initialDlogNamespaceMetadata(String configurationStore, String bkMetadataServiceUri)
throws IOException {
InternalConfigurationData internalConf = new InternalConfigurationData(
@@ -195,15 +217,16 @@ public class PulsarClusterMetadataSetup {
log.info("Setting up cluster {} with zk={} configuration-store={}", arguments.cluster, arguments.zookeeper,
arguments.configurationStore);
- ZooKeeper localZk = initZk(arguments.zookeeper, arguments.zkSessionTimeoutMillis);
- ZooKeeper configStoreZk = initZk(arguments.configurationStore, arguments.zkSessionTimeoutMillis);
+ MetadataStoreExtended localStore = initMetadataStore(arguments.zookeeper, arguments.zkSessionTimeoutMillis);
+ MetadataStoreExtended configStore = initMetadataStore(arguments.configurationStore,
+ arguments.zkSessionTimeoutMillis);
// Format BookKeeper ledger storage metadata
ServerConfiguration bkConf = new ServerConfiguration();
if (arguments.existingBkMetadataServiceUri == null && arguments.bookieMetadataServiceUri == null) {
bkConf.setZkServers(arguments.zookeeper);
bkConf.setZkTimeout(arguments.zkSessionTimeoutMillis);
- if (localZk.exists("/ledgers", false) == null // only format if /ledgers doesn't exist
+ if (!localStore.exists("/ledgers").get() // only format if /ledgers doesn't exist
&& !BookKeeperAdmin.format(bkConf, false /* interactive */, false /* force */)) {
throw new IOException("Failed to initialize BookKeeper metadata");
}
@@ -227,56 +250,49 @@ public class PulsarClusterMetadataSetup {
initializer.initializeCluster(bkMetadataServiceUri.getUri(), arguments.numStreamStorageContainers);
}
- if (localZk.exists(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, false) == null) {
- createZkNode(localZk, ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
- "{}".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ if (!localStore.exists(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).get()) {
+ createMetadataNode(localStore, ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, "{}".getBytes());
}
- createZkNode(localZk, "/managed-ledgers", new byte[0],
- ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ createMetadataNode(localStore, "/managed-ledgers", new byte[0]);
- createZkNode(localZk, "/namespace", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ createMetadataNode(localStore, "/namespace", new byte[0]);
- createZkNode(configStoreZk, POLICIES_ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ createMetadataNode(configStore, POLICIES_ROOT, new byte[0]);
- createZkNode(configStoreZk, "/admin/clusters", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ createMetadataNode(configStore, "/admin/clusters", new byte[0]);
ClusterData clusterData = new ClusterData(arguments.clusterWebServiceUrl, arguments.clusterWebServiceUrlTls,
arguments.clusterBrokerServiceUrl, arguments.clusterBrokerServiceUrlTls);
byte[] clusterDataJson = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(clusterData);
- createZkNode(configStoreZk, "/admin/clusters/" + arguments.cluster, clusterDataJson,
- ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ createMetadataNode(configStore, "/admin/clusters/" + arguments.cluster, clusterDataJson);
// Create marker for "global" cluster
ClusterData globalClusterData = new ClusterData(null, null);
byte[] globalClusterDataJson = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(globalClusterData);
- createZkNode(configStoreZk, "/admin/clusters/global", globalClusterDataJson, ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ createMetadataNode(configStore, "/admin/clusters/global", globalClusterDataJson);
// Create public tenant, whitelisted to use the this same cluster, along with other clusters
- createTenantIfAbsent(configStoreZk, TopicName.PUBLIC_TENANT, arguments.cluster);
+ createTenantIfAbsent(configStore, TopicName.PUBLIC_TENANT, arguments.cluster);
// Create system tenant
- createTenantIfAbsent(configStoreZk, NamespaceName.SYSTEM_NAMESPACE.getTenant(), arguments.cluster);
+ createTenantIfAbsent(configStore, NamespaceName.SYSTEM_NAMESPACE.getTenant(), arguments.cluster);
// Create default namespace
- createNamespaceIfAbsent(configStoreZk, NamespaceName.get(TopicName.PUBLIC_TENANT, TopicName.DEFAULT_NAMESPACE),
+ createNamespaceIfAbsent(configStore, NamespaceName.get(TopicName.PUBLIC_TENANT, TopicName.DEFAULT_NAMESPACE),
arguments.cluster);
// Create system namespace
- createNamespaceIfAbsent(configStoreZk, NamespaceName.SYSTEM_NAMESPACE, arguments.cluster);
+ createNamespaceIfAbsent(configStore, NamespaceName.SYSTEM_NAMESPACE, arguments.cluster);
// Create transaction coordinator assign partitioned topic
- createPartitionedTopic(configStoreZk, TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+ createPartitionedTopic(configStore, TopicName.TRANSACTION_COORDINATOR_ASSIGN,
arguments.numTransactionCoordinators);
- localZk.close();
- configStoreZk.close();
+ localStore.close();
+ configStore.close();
log.info("Cluster metadata for '{}' setup correctly", arguments.cluster);
}
@@ -308,6 +324,32 @@ public class PulsarClusterMetadataSetup {
}
}
+    /**
+     * Creates the tenant under {@code POLICIES_ROOT} when it is missing; otherwise makes sure
+     * {@code cluster} is listed in the tenant's allowed clusters.
+     *
+     * @param configStore metadata store that holds the tenant policies
+     * @param tenant name of the tenant to create or update
+     * @param cluster cluster that has to be allowed for the tenant
+     * @throws IOException if the tenant data cannot be serialized or deserialized
+     * @throws InterruptedException if the thread is interrupted while waiting for the store
+     * @throws ExecutionException if a metadata store operation fails
+     */
+    static void createTenantIfAbsent(MetadataStore configStore, String tenant, String cluster) throws IOException,
+            InterruptedException, ExecutionException {
+
+        String tenantPath = POLICIES_ROOT + "/" + tenant;
+
+        Optional<GetResult> getResult = configStore.get(tenantPath).get();
+        if (!getResult.isPresent()) {
+            TenantInfo publicTenant = new TenantInfo(Collections.emptySet(), Collections.singleton(cluster));
+
+            createMetadataNode(configStore, tenantPath,
+                    ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicTenant));
+        } else {
+            // Update existing public tenant with new cluster
+            byte[] content = getResult.get().getValue();
+            TenantInfo publicTenant = ObjectMapperFactory.getThreadLocal().readValue(content, TenantInfo.class);
+
+            // Only update z-node if the list of clusters should be modified
+            if (!publicTenant.getAllowedClusters().contains(cluster)) {
+                publicTenant.getAllowedClusters().add(cluster);
+
+                // Wait for the conditional update: a failed write must reach the caller, and the
+                // write has to be finished before the caller closes the store.
+                configStore.put(tenantPath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicTenant),
+                        Optional.of(getResult.get().getStat().getVersion())).get();
+            }
+        }
+    }
+
static void createNamespaceIfAbsent(ZooKeeper configStoreZk, NamespaceName namespaceName, String cluster)
throws KeeperException, InterruptedException, IOException {
String namespacePath = POLICIES_ROOT + "/" + namespaceName.toString();
@@ -338,6 +380,32 @@ public class PulsarClusterMetadataSetup {
}
}
+    /**
+     * Creates the namespace policies under {@code POLICIES_ROOT} when they are missing; otherwise
+     * makes sure {@code cluster} is part of the namespace's replication clusters.
+     *
+     * @param configStore metadata store that holds the namespace policies
+     * @param namespaceName namespace to create or update
+     * @param cluster cluster that has to be present in the replication clusters
+     * @throws InterruptedException if the thread is interrupted while waiting for the store
+     * @throws IOException if the policies cannot be serialized or deserialized
+     * @throws ExecutionException if a metadata store operation fails
+     */
+    static void createNamespaceIfAbsent(MetadataStore configStore, NamespaceName namespaceName, String cluster)
+            throws InterruptedException, IOException, ExecutionException {
+        String namespacePath = POLICIES_ROOT + "/" + namespaceName.toString();
+        Policies policies;
+        Optional<GetResult> getResult = configStore.get(namespacePath).get();
+        if (!getResult.isPresent()) {
+            policies = new Policies();
+            policies.bundles = getBundles(16);
+            policies.replication_clusters = Collections.singleton(cluster);
+
+            createMetadataNode(configStore, namespacePath,
+                    ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies));
+        } else {
+            byte[] content = getResult.get().getValue();
+            policies = ObjectMapperFactory.getThreadLocal().readValue(content, Policies.class);
+
+            // Only update z-node if the list of clusters should be modified
+            if (!policies.replication_clusters.contains(cluster)) {
+                policies.replication_clusters.add(cluster);
+
+                // Wait for the conditional update: a failed write must reach the caller, and the
+                // write has to be finished before the caller closes the store.
+                configStore.put(namespacePath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies),
+                        Optional.of(getResult.get().getStat().getVersion())).get();
+            }
+        }
+    }
+
static void createPartitionedTopic(ZooKeeper configStoreZk, TopicName topicName, int numPartitions)
throws KeeperException, InterruptedException, IOException {
String partitionedTopicPath = ZkAdminPaths.partitionedTopicPath(topicName);
@@ -367,6 +435,29 @@ public class PulsarClusterMetadataSetup {
}
}
+    /**
+     * Creates the partitioned topic metadata when it is missing; otherwise raises the stored
+     * partition count to {@code numPartitions} if the stored count is smaller.
+     *
+     * @param configStore metadata store that holds the partitioned topic metadata
+     * @param topicName partitioned topic to create or update
+     * @param numPartitions requested number of partitions
+     * @throws InterruptedException if the thread is interrupted while waiting for the store
+     * @throws IOException if the metadata cannot be serialized or deserialized
+     * @throws ExecutionException if a metadata store operation fails
+     */
+    static void createPartitionedTopic(MetadataStore configStore, TopicName topicName, int numPartitions)
+            throws InterruptedException, IOException, ExecutionException {
+        String partitionedTopicPath = ZkAdminPaths.partitionedTopicPath(topicName);
+        Optional<GetResult> getResult = configStore.get(partitionedTopicPath).get();
+        PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(numPartitions);
+        if (!getResult.isPresent()) {
+            createMetadataNode(configStore, partitionedTopicPath,
+                    ObjectMapperFactory.getThreadLocal().writeValueAsBytes(metadata));
+        } else {
+            byte[] content = getResult.get().getValue();
+            PartitionedTopicMetadata existsMeta =
+                    ObjectMapperFactory.getThreadLocal().readValue(content, PartitionedTopicMetadata.class);
+
+            // Only update z-node if the partitions should be modified
+            if (existsMeta.partitions < numPartitions) {
+                // Wait for the conditional update: a failed write must reach the caller, and the
+                // write has to be finished before the caller closes the store.
+                configStore.put(
+                        partitionedTopicPath,
+                        ObjectMapperFactory.getThreadLocal().writeValueAsBytes(metadata),
+                        Optional.of(getResult.get().getStat().getVersion())).get();
+            }
+        }
+    }
+
public static ZooKeeper initZk(String connection, int sessionTimeout) throws Exception {
ZooKeeperClientFactory zkfactory = new ZookeeperClientFactoryImpl();
int chrootIndex = connection.indexOf("/");
@@ -386,5 +477,15 @@ public class PulsarClusterMetadataSetup {
return zkConnect;
}
+    /**
+     * Creates a metadata store for {@code connection} and, when the implementation is a
+     * {@link MetadataStoreLifecycle}, waits for its cluster initialization to finish.
+     *
+     * @param connection connection string handed to {@code MetadataStoreExtended.create}
+     * @param sessionTimeout session timeout in milliseconds
+     * @return the created store; the caller is responsible for closing it
+     * @throws Exception if the store cannot be created or its initialization fails
+     */
+    public static MetadataStoreExtended initMetadataStore(String connection, int sessionTimeout) throws Exception {
+        MetadataStoreExtended store = MetadataStoreExtended.create(connection, MetadataStoreConfig.builder()
+                .sessionTimeoutMillis(sessionTimeout)
+                .build());
+        if (store instanceof MetadataStoreLifecycle) {
+            try {
+                ((MetadataStoreLifecycle) store).initializeCluster().get();
+            } catch (Exception e) {
+                // Do not leak the freshly opened store when the initialization fails.
+                try {
+                    store.close();
+                } catch (Exception closeFailure) {
+                    e.addSuppressed(closeFailure);
+                }
+                throw e;
+            }
+        }
+        return store;
+    }
+
private static final Logger log = LoggerFactory.getLogger(PulsarClusterMetadataSetup.class);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java
index e135d71..2f022ca 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java
@@ -18,9 +18,16 @@
*/
package org.apache.pulsar.broker.zookeeper;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.SortedMap;
+import java.util.TreeMap;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.pulsar.PulsarClusterMetadataSetup;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
@@ -32,6 +39,8 @@ import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
+
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
@@ -53,8 +62,38 @@ public class ClusterMetadataSetupTest {
"--broker-service-url-tls","pulsar+ssl://127.0.0.1:6651"
};
PulsarClusterMetadataSetup.main(args);
+ SortedMap<String, String> data1 = localZkS.dumpData();
+ PulsarClusterMetadataSetup.main(args);
+ SortedMap<String, String> data2 = localZkS.dumpData();
+ assertEquals(data1, data2);
PulsarClusterMetadataSetup.main(args);
+ SortedMap<String, String> data3 = localZkS.dumpData();
+ assertEquals(data1, data3);
+ }
+
+ /**
+ * Runs the cluster metadata setup with connection strings that carry a chroot suffix, then checks
+ * on the server that the chroot path exists and has the expected first-level children.
+ */
+ @Test
+ public void testSetupClusterInChrootMode() throws Exception {
+ // Names of the nodes expected directly below the chroot path once the setup has finished.
+ HashSet<String> firstLevelNodes = new HashSet<>(Arrays.asList(
+ "admin", "bookies", "ledgers", "managed-ledgers", "namespace", "pulsar", "stream"
+ ));
+ String rootPath = "/test-prefix";
+ // Both the local and the configuration store point at the same server and chroot path.
+ String[] args = {
+ "--cluster", "testReSetupClusterMetadata-cluster",
+ "--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort() + rootPath,
+ "--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort() + rootPath,
+ "--web-service-url", "http://127.0.0.1:8080",
+ "--web-service-url-tls", "https://127.0.0.1:8443",
+ "--broker-service-url", "pulsar://127.0.0.1:6650",
+ "--broker-service-url-tls","pulsar+ssl://127.0.0.1:6651"
+ };
PulsarClusterMetadataSetup.main(args);
+
+ // Connect without the chroot suffix so that the chroot path itself can be inspected.
+ try (ZooKeeper zk = ZooKeeperClient.newBuilder()
+ .connectString("127.0.0.1:" + localZkS.getZookeeperPort())
+ .build()) {
+ assertNotNull(zk.exists(rootPath, false));
+ assertEquals(new HashSet<>(zk.getChildren(rootPath, false)), firstLevelNodes);
+ }
}
@Test
@@ -168,5 +207,31 @@ public class ClusterMetadataSetupTest {
public int getZookeeperPort() {
return serverFactory.getLocalPort();
}
+
+        /**
+         * Collects the data of every node below the root, except the {@code zookeeper} subtree,
+         * into a map sorted by node path.
+         */
+        public SortedMap<String, String> dumpData() throws IOException, InterruptedException, KeeperException {
+            String connectString = "127.0.0.1:" + getZookeeperPort();
+            SortedMap<String, String> snapshot = new TreeMap<>();
+            try (ZooKeeper client = ZooKeeperClient.newBuilder()
+                    .connectString(connectString)
+                    .sessionTimeoutMs(20000)
+                    .build()) {
+                for (String topLevel : client.getChildren("/", false)) {
+                    if (!"zookeeper".equals(topLevel)) {
+                        dumpPath(client, "/" + topLevel, snapshot);
+                    }
+                }
+            }
+            return snapshot;
+        }
+
+        /**
+         * Stores the data of {@code path} in {@code dataMap} and then does the same for each of
+         * its children, recursively.
+         */
+        private void dumpPath(ZooKeeper zk, String path, SortedMap<String, String> dataMap)
+                throws InterruptedException, KeeperException {
+            byte[] raw = zk.getData(path, false, null);
+            String decoded = new String(raw, Charset.defaultCharset());
+            dataMap.put(path, decoded);
+            for (String childName : zk.getChildren(path, false)) {
+                String childPath = path + "/" + childName;
+                dumpPath(zk, childPath, dataMap);
+            }
+        }
}
+
+
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreLifecycle.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreLifecycle.java
new file mode 100644
index 0000000..e994584
--- /dev/null
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreLifecycle.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.api;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Optional lifecycle operations for a metadata store. A store implementation that needs them
+ * implements this interface in addition to {@link MetadataStore}; the operations might not
+ * be supported by all implementations.
+ */
+public interface MetadataStoreLifecycle {
+ /**
+ * Initialize the metadata store cluster if needed.
+ *
+ * <p>For example, if the backend metadata store is a zookeeper cluster and the pulsar cluster is configured to
+ * access the zookeeper cluster in the chroot mode, then this method could be used to initialize the root node
+ * during pulsar cluster metadata setup.
+ *
+ * @return a future to track the async request.
+ */
+ CompletableFuture<Void> initializeCluster();
+}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 4c85fe5..e1e44f8 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -31,12 +31,14 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+import org.apache.pulsar.metadata.api.MetadataStoreLifecycle;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
@@ -51,14 +53,18 @@ import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
@Slf4j
-public class ZKMetadataStore extends AbstractMetadataStore implements MetadataStoreExtended, Watcher {
+public class ZKMetadataStore extends AbstractMetadataStore implements MetadataStoreExtended, Watcher, MetadataStoreLifecycle {
+ private final String metadataURL;
+ private final MetadataStoreConfig metadataStoreConfig;
private final boolean isZkManaged;
private final ZooKeeper zkc;
private ZKSessionWatcher sessionWatcher;
public ZKMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig) throws MetadataStoreException {
try {
+ this.metadataURL = metadataURL;
+ this.metadataStoreConfig = metadataStoreConfig;
isZkManaged = true;
zkc = ZooKeeperClient.newBuilder().connectString(metadataURL)
.connectRetryPolicy(new BoundExponentialBackoffRetryPolicy(100, 60_000, Integer.MAX_VALUE))
@@ -78,6 +84,8 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
@VisibleForTesting
public ZKMetadataStore(ZooKeeper zkc) {
+ this.metadataURL = null;
+ this.metadataStoreConfig = null;
this.isZkManaged = false;
this.zkc = zkc;
this.sessionWatcher = new ZKSessionWatcher(zkc, this::receivedSessionEvent);
@@ -360,4 +368,35 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt
public long getZkSessionId() {
return zkc.getSessionId();
}
+
+    /**
+     * Makes sure the chroot path named in the metadata URL exists, creating it when it is missing.
+     *
+     * <p>The work is done on the calling thread with a separate client that connects without the
+     * chroot suffix; the returned future is already completed when this method returns.
+     *
+     * @return a completed future, failed when the URL or config is unset or when the chroot path
+     *         cannot be checked or created
+     */
+    @Override
+    public CompletableFuture<Void> initializeCluster() {
+        if (this.metadataURL == null) {
+            return FutureUtil.failedFuture(new MetadataStoreException("metadataURL is not set"));
+        }
+        if (this.metadataStoreConfig == null) {
+            return FutureUtil.failedFuture(new MetadataStoreException("metadataStoreConfig is not set"));
+        }
+
+        int firstSlash = metadataURL.indexOf("/");
+        if (firstSlash <= 0) {
+            // The URL carries no chroot suffix, so there is nothing to prepare.
+            return CompletableFuture.completedFuture(null);
+        }
+
+        String rootPath = metadataURL.substring(firstSlash);
+        String serversOnly = metadataURL.substring(0, firstSlash);
+        int sessionTimeoutMs = metadataStoreConfig.getSessionTimeoutMillis();
+        try (ZooKeeper bootstrapZk = ZooKeeperClient.newBuilder()
+                .connectString(serversOnly)
+                .sessionTimeoutMs(sessionTimeoutMs)
+                .connectRetryPolicy(
+                        new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 0))
+                .build()) {
+            boolean rootMissing = bootstrapZk.exists(rootPath, false) == null;
+            if (rootMissing) {
+                ZkUtils.createFullPathOptimistic(bootstrapZk, rootPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT);
+                log.info("Created zookeeper chroot path {} successfully", rootPath);
+            }
+        } catch (Exception e) {
+            return FutureUtil.failedFuture(e);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
}