You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/10/01 04:24:59 UTC

[pulsar] branch master updated: Add command to delete a cluster's metadata from ZK (#8169)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d41729f  Add command to delete a cluster's metadata from ZK (#8169)
d41729f is described below

commit d41729f422b215f53bd88c2eec6d1d112e804778
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Oct 1 12:24:40 2020 +0800

    Add command to delete a cluster's metadata from ZK (#8169)
    
    Motivation
    When multiple broker clusters share the same ZK and BK cluster and one of them is removed, its metadata in ZK should be removed as well.
    
    Modifications
    Add a PulsarClusterMetadataTeardown class to delete the relevant nodes from ZK;
    Wrap the class in the bin/pulsar script as the delete-cluster-metadata command.
---
 bin/pulsar                                         |   3 +
 .../pulsar/PulsarClusterMetadataTeardown.java      | 111 +++++++++++++++++++++
 2 files changed, 114 insertions(+)

diff --git a/bin/pulsar b/bin/pulsar
index 1fe5332..0c20c8d 100755
--- a/bin/pulsar
+++ b/bin/pulsar
@@ -143,6 +143,7 @@ where command is one of:
     standalone          Run a broker server with local bookies and local zookeeper
 
     initialize-cluster-metadata     One-time metadata initialization
+    delete-cluster-metadata         Delete a cluster's metadata
     initialize-transaction-coordinator-metadata     One-time transaction coordinator metadata initialization
     initialize-namespace     namespace initialization
     compact-topic       Run compaction against a topic
@@ -339,6 +340,8 @@ elif [ $COMMAND == "standalone" ]; then
     exec $JAVA $OPTS $ASPECTJ_AGENT ${ZK_OPTS} -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.PulsarStandaloneStarter --config $PULSAR_STANDALONE_CONF $@
 elif [ $COMMAND == "initialize-cluster-metadata" ]; then
     exec $JAVA $OPTS org.apache.pulsar.PulsarClusterMetadataSetup $@
+elif [ $COMMAND == "delete-cluster-metadata" ]; then
+    exec $JAVA $OPTS org.apache.pulsar.PulsarClusterMetadataTeardown $@
 elif [ $COMMAND == "initialize-transaction-coordinator-metadata" ]; then
     exec $JAVA $OPTS org.apache.pulsar.PulsarTransactionCoordinatorMetadataSetup $@
 elif [ $COMMAND == "initialize-namespace" ]; then
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
new file mode 100644
index 0000000..a9a1c11
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
+import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZKUtil;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Tear down the metadata of an existing Pulsar cluster.
+ */
+public class PulsarClusterMetadataTeardown {
+
+    private static class Arguments {
+        @Parameter(names = { "-zk",
+                "--zookeeper"}, description = "Local ZooKeeper quorum connection string", required = true)
+        private String zookeeper;
+
+        @Parameter(names = {
+                "--zookeeper-session-timeout-ms"
+        }, description = "Local zookeeper session timeout ms")
+        private int zkSessionTimeoutMillis = 30000;
+
+        @Parameter(names = { "-c", "-cluster" }, description = "Cluster name")
+        private String cluster;
+
+        @Parameter(names = { "-cs", "--configuration-store" }, description = "Configuration Store connection string")
+        private String configurationStore;
+
+        @Parameter(names = { "-h", "--help" }, description = "Show this help message")
+        private boolean help = false;
+    }
+
+    public static void main(String[] args) throws InterruptedException {
+        Arguments arguments = new Arguments();
+        JCommander jcommander = new JCommander();
+        try {
+            jcommander.addObject(arguments);
+            jcommander.parse(args);
+            if (arguments.help) {
+                jcommander.usage();
+                return;
+            }
+        } catch (Exception e) {
+            jcommander.usage();
+            throw e;
+        }
+
+        ZooKeeper localZk = initZk(arguments.zookeeper, arguments.zkSessionTimeoutMillis);
+
+        deleteZkNodeRecursively(localZk, "/bookies");
+        deleteZkNodeRecursively(localZk, "/counters");
+        deleteZkNodeRecursively(localZk, "/loadbalance");
+        deleteZkNodeRecursively(localZk, "/managed-ledgers");
+        deleteZkNodeRecursively(localZk, "/namespace");
+        deleteZkNodeRecursively(localZk, "/schemas");
+        deleteZkNodeRecursively(localZk, "/stream");
+
+        if (arguments.configurationStore != null && arguments.cluster != null) {
+            // Open question: should this be done through the REST API before the brokers are shut down?
+            ZooKeeper configStoreZk = initZk(arguments.configurationStore, arguments.zkSessionTimeoutMillis);
+            deleteZkNodeRecursively(configStoreZk, "/admin/clusters/" + arguments.cluster);
+        }
+
+        log.info("Cluster metadata for '{}' teardown.", arguments.cluster);
+    }
+
+    public static ZooKeeper initZk(String connection, int sessionTimeout) throws InterruptedException {
+        ZooKeeperClientFactory zkFactory = new ZookeeperClientFactoryImpl();
+        try {
+            return zkFactory.create(connection, ZooKeeperClientFactory.SessionType.ReadWrite, sessionTimeout).get();
+        } catch (ExecutionException e) {
+            log.error("Failed to connect to '{}': {}", connection, e.getMessage());
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static void deleteZkNodeRecursively(ZooKeeper zooKeeper, String path) throws InterruptedException {
+        try {
+            ZKUtil.deleteRecursive(zooKeeper, path);
+        } catch (KeeperException e) {
+            log.warn("Failed to delete node {} from ZK [{}]: {}", path, zooKeeper, e);
+        }
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(PulsarClusterMetadataTeardown.class);
+}