You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/10/01 04:24:59 UTC
[pulsar] branch master updated: Add command to delete a cluster's
metadata from ZK (#8169)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d41729f Add command to delete a cluster's metadata from ZK (#8169)
d41729f is described below
commit d41729f422b215f53bd88c2eec6d1d112e804778
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Oct 1 12:24:40 2020 +0800
Add command to delete a cluster's metadata from ZK (#8169)
Motivation
When multiple broker clusters share the same ZK and BK cluster and one of them is removed, its metadata in ZK should be removed as well.
Modifications
Add a PulsarClusterMetadataTeardown class to delete the relevant nodes from ZK;
Expose the class through the bin/pulsar script as a new command.
---
bin/pulsar | 3 +
.../pulsar/PulsarClusterMetadataTeardown.java | 111 +++++++++++++++++++++
2 files changed, 114 insertions(+)
diff --git a/bin/pulsar b/bin/pulsar
index 1fe5332..0c20c8d 100755
--- a/bin/pulsar
+++ b/bin/pulsar
@@ -143,6 +143,7 @@ where command is one of:
standalone Run a broker server with local bookies and local zookeeper
initialize-cluster-metadata One-time metadata initialization
+ delete-cluster-metadata Delete a cluster's metadata
initialize-transaction-coordinator-metadata One-time transaction coordinator metadata initialization
initialize-namespace namespace initialization
compact-topic Run compaction against a topic
@@ -339,6 +340,8 @@ elif [ $COMMAND == "standalone" ]; then
exec $JAVA $OPTS $ASPECTJ_AGENT ${ZK_OPTS} -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.PulsarStandaloneStarter --config $PULSAR_STANDALONE_CONF $@
elif [ $COMMAND == "initialize-cluster-metadata" ]; then
exec $JAVA $OPTS org.apache.pulsar.PulsarClusterMetadataSetup $@
+elif [ $COMMAND == "delete-cluster-metadata" ]; then
+ exec $JAVA $OPTS org.apache.pulsar.PulsarClusterMetadataTeardown $@
elif [ $COMMAND == "initialize-transaction-coordinator-metadata" ]; then
exec $JAVA $OPTS org.apache.pulsar.PulsarTransactionCoordinatorMetadataSetup $@
elif [ $COMMAND == "initialize-namespace" ]; then
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
new file mode 100644
index 0000000..a9a1c11
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
+import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZKUtil;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Teardown the metadata for a existed Pulsar cluster
+ */
+public class PulsarClusterMetadataTeardown {
+
+ private static class Arguments {
+ @Parameter(names = { "-zk",
+ "--zookeeper"}, description = "Local ZooKeeper quorum connection string", required = true)
+ private String zookeeper;
+
+ @Parameter(names = {
+ "--zookeeper-session-timeout-ms"
+ }, description = "Local zookeeper session timeout ms")
+ private int zkSessionTimeoutMillis = 30000;
+
+ @Parameter(names = { "-c", "-cluster" }, description = "Cluster name")
+ private String cluster;
+
+ @Parameter(names = { "-cs", "--configuration-store" }, description = "Configuration Store connection string")
+ private String configurationStore;
+
+ @Parameter(names = { "-h", "--help" }, description = "Show this help message")
+ private boolean help = false;
+ }
+
+ public static void main(String[] args) throws InterruptedException {
+ Arguments arguments = new Arguments();
+ JCommander jcommander = new JCommander();
+ try {
+ jcommander.addObject(arguments);
+ jcommander.parse(args);
+ if (arguments.help) {
+ jcommander.usage();
+ return;
+ }
+ } catch (Exception e) {
+ jcommander.usage();
+ throw e;
+ }
+
+ ZooKeeper localZk = initZk(arguments.zookeeper, arguments.zkSessionTimeoutMillis);
+
+ deleteZkNodeRecursively(localZk, "/bookies");
+ deleteZkNodeRecursively(localZk, "/counters");
+ deleteZkNodeRecursively(localZk, "/loadbalance");
+ deleteZkNodeRecursively(localZk, "/managed-ledgers");
+ deleteZkNodeRecursively(localZk, "/namespace");
+ deleteZkNodeRecursively(localZk, "/schemas");
+ deleteZkNodeRecursively(localZk, "/stream");
+
+ if (arguments.configurationStore != null && arguments.cluster != null) {
+ // Should it be done by REST API before broker is down?
+ ZooKeeper configStoreZk = initZk(arguments.configurationStore, arguments.zkSessionTimeoutMillis);
+ deleteZkNodeRecursively(configStoreZk, "/admin/clusters/" + arguments.cluster);
+ }
+
+ log.info("Cluster metadata for '{}' teardown.", arguments.cluster);
+ }
+
+ public static ZooKeeper initZk(String connection, int sessionTimeout) throws InterruptedException {
+ ZooKeeperClientFactory zkFactory = new ZookeeperClientFactoryImpl();
+ try {
+ return zkFactory.create(connection, ZooKeeperClientFactory.SessionType.ReadWrite, sessionTimeout).get();
+ } catch (ExecutionException e) {
+ log.error("Failed to connect to '{}': {}", connection, e.getMessage());
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static void deleteZkNodeRecursively(ZooKeeper zooKeeper, String path) throws InterruptedException {
+ try {
+ ZKUtil.deleteRecursive(zooKeeper, path);
+ } catch (KeeperException e) {
+ log.warn("Failed to delete node {} from ZK [{}]: {}", path, zooKeeper, e);
+ }
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(PulsarClusterMetadataTeardown.class);
+}