You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/01/24 18:41:03 UTC

[GitHub] sijie closed pull request #979: ISSUE #978: decouple metaformat cmd

sijie closed pull request #979: ISSUE #978: decouple metaformat cmd
URL: https://github.com/apache/bookkeeper/pull/979
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 82ad868c5..62b4992b9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -140,6 +140,8 @@
 
     static final String CMD_METAFORMAT = "metaformat";
     static final String CMD_INITBOOKIE = "initbookie";
+    static final String CMD_INITNEWCLUSTER = "initnewcluster";
+    static final String CMD_NUKEEXISTINGCLUSTER = "nukeexistingcluster";
     static final String CMD_BOOKIEFORMAT = "bookieformat";
     static final String CMD_RECOVER = "recover";
     static final String CMD_LEDGER = "ledger";
@@ -148,6 +150,7 @@
     static final String CMD_LEDGERMETADATA = "ledgermetadata";
     static final String CMD_LISTUNDERREPLICATED = "listunderreplicated";
     static final String CMD_WHOISAUDITOR = "whoisauditor";
+    static final String CMD_WHATISINSTANCEID = "whatisinstanceid";
     static final String CMD_SIMPLETEST = "simpletest";
     static final String CMD_BOOKIESANITYTEST = "bookiesanity";
     static final String CMD_READLOG = "readlog";
@@ -272,6 +275,99 @@ int runCmd(CommandLine cmdLine) throws Exception {
         }
     }
 
+    /**
+     * Initializes a new cluster by creating the required znodes for it. If
+     * ledgersrootpath already exists, the command errors out. If it errors
+     * out for any reason while creating the znodes, then before running
+     * initnewcluster again, try nuking the existing cluster by running
+     * nukeexistingcluster. This is required because the ledgersrootpath znode
+     * is created right after verifying that it does not exist, so the next
+     * retry of initnewcluster would otherwise complain that ledgersrootpath
+     * already exists.
+     */
+    class InitNewCluster extends MyCommand {
+        Options opts = new Options();
+
+        InitNewCluster() {
+            super(CMD_INITNEWCLUSTER);
+        }
+
+        @Override
+        Options getOptions() {
+            return opts;
+        }
+
+        @Override
+        String getDescription() {
+            return "Initializes a new bookkeeper cluster. If initnewcluster fails then try nuking "
+                    + "existing cluster by running nukeexistingcluster before running initnewcluster again";
+        }
+
+        @Override
+        String getUsage() {
+            return "initnewcluster";
+        }
+
+        @Override
+        int runCmd(CommandLine cmdLine) throws Exception {
+            boolean result = BookKeeperAdmin.initNewCluster(bkConf);
+            return (result) ? 0 : 1;
+        }
+    }
+
+    /**
+     * Nuke bookkeeper metadata of existing cluster in zookeeper.
+     */
+    class NukeExistingCluster extends MyCommand {
+        Options opts = new Options();
+
+        NukeExistingCluster() {
+            super(CMD_NUKEEXISTINGCLUSTER);
+            opts.addOption("p", "zkledgersrootpath", true, "zookeeper ledgers rootpath");
+            opts.addOption("i", "instanceid", true, "instanceid");
+            opts.addOption("f", "force", false,
+                    "If instanceid is not specified, "
+                    + "then whether to force nuke the metadata without validating instanceid");
+        }
+
+        @Override
+        Options getOptions() {
+            return opts;
+        }
+
+        @Override
+        String getDescription() {
+            return "Nuke bookkeeper cluster by deleting metadata";
+        }
+
+        @Override
+        String getUsage() {
+            return "nukeexistingcluster -zkledgersrootpath <zkledgersrootpath> [-instanceid <instanceid> | -force]";
+        }
+
+        @Override
+        int runCmd(CommandLine cmdLine) throws Exception {
+            boolean force = cmdLine.hasOption("f");
+            String zkledgersrootpath = cmdLine.getOptionValue("zkledgersrootpath");
+            String instanceid = cmdLine.getOptionValue("instanceid");
+
+            /*
+             * for NukeExistingCluster command 'zkledgersrootpath' should be provided and either force option or
+             * instanceid should be provided.
+             */
+            if ((zkledgersrootpath == null) || (force == (instanceid != null))) {
+                LOG.error(
+                        "zkledgersrootpath should be specified and either force option "
+                        + "or instanceid should be specified (but not both)");
+                printUsage();
+                return -1;
+            }
+
+            boolean result = BookKeeperAdmin.nukeExistingCluster(bkConf, zkledgersrootpath, instanceid, force);
+            return (result) ? 0 : 1;
+        }
+    }
+
     /**
      * Formats the local data present in current bookie server.
      */
@@ -1703,6 +1799,42 @@ int runCmd(CommandLine cmdLine) throws Exception {
         }
     }
 
+    /**
+     * Prints the instanceid of the cluster.
+     */
+    class WhatIsInstanceId extends MyCommand {
+        Options opts = new Options();
+
+        public WhatIsInstanceId() {
+            super(CMD_WHATISINSTANCEID);
+        }
+
+        @Override
+        Options getOptions() {
+            return opts;
+        }
+
+        @Override
+        String getDescription() {
+            return "Print the instanceid of the cluster";
+        }
+
+        @Override
+        String getUsage() {
+            return "whatisinstanceid";
+        }
+
+        @Override
+        int runCmd(CommandLine cmdLine) throws Exception {
+            try (RegistrationManager rm = RegistrationManager.instantiateRegistrationManager(bkConf)) {
+                String readInstanceId = rm.getClusterInstanceId();
+                LOG.info("ZKServers: {} ZkLedgersRootPath: {} InstanceId: {}", bkConf.getZkServers(),
+                        bkConf.getZkLedgersRootPath(), readInstanceId);
+            }
+            return 0;
+        }
+    }
+
     /**
      * Update cookie command.
      */
@@ -2019,7 +2151,7 @@ String getDescription() {
 
         @Override
         String getUsage() {
-            return "updateledger -bookieId <hostname|ip> [-updatespersec N] [-limit N] [-verbose true/false] "
+            return "updateledgers -bookieId <hostname|ip> [-updatespersec N] [-limit N] [-verbose true/false] "
                     + "[-printprogress N]";
         }
 
@@ -2562,6 +2694,8 @@ int runCmd(CommandLine cmdLine) throws Exception {
     {
         commands.put(CMD_METAFORMAT, new MetaFormatCmd());
         commands.put(CMD_INITBOOKIE, new InitBookieCmd());
+        commands.put(CMD_INITNEWCLUSTER, new InitNewCluster());
+        commands.put(CMD_NUKEEXISTINGCLUSTER, new NukeExistingCluster());
         commands.put(CMD_BOOKIEFORMAT, new BookieFormatCmd());
         commands.put(CMD_RECOVER, new RecoverCmd());
         commands.put(CMD_LEDGER, new LedgerCmd());
@@ -2569,6 +2703,7 @@ int runCmd(CommandLine cmdLine) throws Exception {
         commands.put(CMD_LISTLEDGERS, new ListLedgersCmd());
         commands.put(CMD_LISTUNDERREPLICATED, new ListUnderreplicatedCmd());
         commands.put(CMD_WHOISAUDITOR, new WhoIsAuditorCmd());
+        commands.put(CMD_WHATISINSTANCEID, new WhatIsInstanceId());
         commands.put(CMD_LEDGERMETADATA, new LedgerMetadataCmd());
         commands.put(CMD_SIMPLETEST, new SimpleTestCmd());
         commands.put(CMD_BOOKIESANITYTEST, new BookieSanityTestCmd());
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 85527d674..a723884f5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -1127,7 +1127,7 @@ public static boolean format(ServerConfiguration conf,
             boolean isInteractive, boolean force) throws Exception {
 
         try (RegistrationManager rm = RegistrationManager.instantiateRegistrationManager(conf)) {
-            boolean ledgerRootExists = rm.prepareFormat(conf);
+            boolean ledgerRootExists = rm.prepareFormat();
 
             // If old data was there then confirm with admin.
             boolean doFormat = true;
@@ -1155,7 +1155,56 @@ public static boolean format(ServerConfiguration conf,
             BookKeeper bkc = new BookKeeper(new ClientConfiguration(conf));
             bkc.ledgerManagerFactory.format(conf, bkc.regClient.getLayoutManager());
 
-            return rm.format(conf);
+            return rm.format();
+        }
+    }
+
+    /**
+     * Initializes a new cluster by creating the required znodes for it. If
+     * ledgersrootpath already exists, it errors out.
+     *
+     * @param conf
+     * @return
+     * @throws Exception
+     */
+    public static boolean initNewCluster(ServerConfiguration conf) throws Exception {
+        try (RegistrationManager rm = RegistrationManager.instantiateRegistrationManager(conf)) {
+            return rm.initNewCluster();
+        }
+    }
+
+    /**
+     * Nukes existing cluster metadata, but only if the provided
+     * ledgersRootPath matches the configuration's zkLedgersRootPath and the
+     * provided instanceid matches the cluster metadata. If force is
+     * specified, the instanceid is not validated.
+     *
+     * @param conf
+     * @param ledgersRootPath
+     * @param instanceId
+     * @param force
+     * @return
+     * @throws Exception
+     */
+    public static boolean nukeExistingCluster(ServerConfiguration conf, String ledgersRootPath, String instanceId,
+            boolean force) throws Exception {
+        String confLedgersRootPath = conf.getZkLedgersRootPath();
+        if (!confLedgersRootPath.equals(ledgersRootPath)) {
+            LOG.error("Provided ledgerRootPath : {} is not matching with config's ledgerRootPath: {}, "
+                    + "so exiting nuke operation", ledgersRootPath, confLedgersRootPath);
+            return false;
+        }
+
+        try (RegistrationManager rm = RegistrationManager.instantiateRegistrationManager(conf)) {
+            if (!force) {
+                String readInstanceId = rm.getClusterInstanceId();
+                if ((instanceId == null) || !instanceId.equals(readInstanceId)) {
+                    LOG.error("Provided InstanceId : {} is not matching with cluster InstanceId in ZK: {}", instanceId,
+                            readInstanceId);
+                    return false;
+                }
+            }
+            return rm.nukeExistingCluster();
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
index 498a18c51..5dea77424 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
@@ -112,7 +112,8 @@ public void onRemoval(RemovalNotification<BookieSocketAddress, Boolean> bookie)
         }
     }
 
-    public Set<BookieSocketAddress> getReadOnlyBookies() throws BKException {
+    public Set<BookieSocketAddress> getReadOnlyBookies()
+            throws BKException {
         try {
             return FutureUtils.result(registrationClient.getReadOnlyBookies(), EXCEPTION_FUNC).getValue();
         } catch (BKInterruptedException ie) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationManager.java
index d0c304bc3..0c3922f50 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationManager.java
@@ -146,16 +146,32 @@ RegistrationManager initialize(ServerConfiguration conf,
     /**
      * Prepare ledgers root node, availableNode, readonly node..
      *
-     * @param conf the conf
      * @return Returns true if old data exists, false if not.
      */
-    boolean prepareFormat(ServerConfiguration conf) throws Exception;
+    boolean prepareFormat() throws Exception;
+
+    /**
+     * Initializes new cluster by creating required znodes for the cluster. If
+     * ledgersrootpath is already existing then it will error out.
+     *
+     * @return returns true if new cluster is successfully created or false if it failed to initialize.
+     * @throws Exception
+     */
+    boolean initNewCluster() throws Exception;
 
     /**
      * Do format boolean.
      *
-     * @param conf the conf
      * @return Returns true if success do format, false if not.
      */
-    boolean format(ServerConfiguration conf) throws Exception;
+    boolean format() throws Exception;
+
+    /**
+     * Nukes existing cluster metadata.
+     *
+     * @return returns true if cluster metadata is successfully nuked
+     *          or false if it failed to nuke the cluster metadata.
+     * @throws Exception
+     */
+    boolean nukeExistingCluster() throws Exception;
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
index 527af780d..b82f406d1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
@@ -26,22 +26,36 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
+
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.BookieException.BookieIllegalOpException;
 import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
 import org.apache.bookkeeper.bookie.BookieException.MetadataStoreException;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BKException.BKInterruptedException;
+import org.apache.bookkeeper.client.BKException.MetaStoreException;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LayoutManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.ZkLayoutManager;
 import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.ZkUtils;
@@ -59,6 +73,7 @@
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZKUtil;
+import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
@@ -69,6 +84,18 @@
 @Slf4j
 public class ZKRegistrationManager implements RegistrationManager {
 
+    private static final Function<Throwable, BKException> EXCEPTION_FUNC = cause -> {
+        if (cause instanceof BKException) {
+            log.error("Failed to get bookie list : ", cause);
+            return (BKException) cause;
+        } else if (cause instanceof InterruptedException) {
+            log.error("Interrupted reading bookie list : ", cause);
+            return new BKInterruptedException();
+        } else {
+            return new MetaStoreException();
+        }
+    };
+
     private ServerConfiguration conf;
     private ZooKeeper zk;
     private List<ACL> zkAcls;
@@ -421,97 +448,153 @@ public LayoutManager getLayoutManager(){
     }
 
     @Override
-    public boolean prepareFormat(ServerConfiguration conf) throws Exception {
-        try (ZooKeeper zk = ZooKeeperClient.newBuilder()
-            .connectString(conf.getZkServers())
-            .sessionTimeoutMs(conf.getZkTimeout())
-            .build()) {
-
-            log.info("Formatting ZooKeeper metadata, ledger root path: {}", conf.getZkLedgersRootPath());
-            boolean ledgerRootExists = null != zk.exists(
-                conf.getZkLedgersRootPath(), false);
-            boolean availableNodeExists = null != zk.exists(
-                conf.getZkAvailableBookiesPath(), false);
-            List<ACL> zkAcls = ZkUtils.getACLs(conf);
-            // Create ledgers root node if not exists
-            if (!ledgerRootExists) {
-                zk.create(conf.getZkLedgersRootPath(), "".getBytes(Charsets.UTF_8),
-                    zkAcls, CreateMode.PERSISTENT);
-            }
-            // create available bookies node if not exists
-            if (!availableNodeExists) {
-                zk.create(conf.getZkAvailableBookiesPath(), "".getBytes(Charsets.UTF_8),
-                    zkAcls, CreateMode.PERSISTENT);
-            }
+    public boolean prepareFormat() throws Exception {
+        boolean ledgerRootExists = null != zk.exists(conf.getZkLedgersRootPath(), false);
+        boolean availableNodeExists = null != zk.exists(conf.getZkAvailableBookiesPath(), false);
+        List<ACL> zkAcls = ZkUtils.getACLs(conf);
+        // Create ledgers root node if not exists
+        if (!ledgerRootExists) {
+            zk.create(conf.getZkLedgersRootPath(), "".getBytes(Charsets.UTF_8), zkAcls, CreateMode.PERSISTENT);
+        }
+        // create available bookies node if not exists
+        if (!availableNodeExists) {
+            zk.create(conf.getZkAvailableBookiesPath(), "".getBytes(Charsets.UTF_8), zkAcls, CreateMode.PERSISTENT);
+        }
 
-            // create readonly bookies node if not exists
-            if (null == zk.exists(conf.getZkAvailableBookiesPath() + "/" + READONLY, false)) {
-                zk.create(
-                    conf.getZkAvailableBookiesPath() + "/" + READONLY,
-                    new byte[0],
-                    zkAcls,
-                    CreateMode.PERSISTENT);
-            }
+        // create readonly bookies node if not exists
+        if (null == zk.exists(conf.getZkAvailableBookiesPath() + "/" + READONLY, false)) {
+            zk.create(conf.getZkAvailableBookiesPath() + "/" + READONLY, new byte[0], zkAcls, CreateMode.PERSISTENT);
+        }
+
+        return ledgerRootExists;
+    }
 
-            return ledgerRootExists;
+    @Override
+    public boolean initNewCluster() throws Exception {
+        String zkLedgersRootPath = conf.getZkLedgersRootPath();
+        String zkServers = conf.getZkServers();
+        String zkAvailableBookiesPath = conf.getZkAvailableBookiesPath();
+        log.info("Initializing ZooKeeper metadata for new cluster, ZKServers: {} ledger root path: {}", zkServers,
+                zkLedgersRootPath);
+
+        boolean ledgerRootExists = null != zk.exists(conf.getZkLedgersRootPath(), false);
+
+        if (ledgerRootExists) {
+            log.error("Ledger root path: {} already exists", conf.getZkLedgersRootPath());
+            return false;
         }
+
+        // Create ledgers root node
+        zk.create(zkLedgersRootPath, "".getBytes(UTF_8), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        // create available bookies node
+        zk.create(zkAvailableBookiesPath, "".getBytes(UTF_8), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        // creates the new layout and stores in zookeeper
+        LedgerManagerFactory.newLedgerManagerFactory(conf, layoutManager);
+
+        // create INSTANCEID
+        String instanceId = UUID.randomUUID().toString();
+        zk.create(conf.getZkLedgersRootPath() + "/" + BookKeeperConstants.INSTANCEID, instanceId.getBytes(UTF_8),
+                Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        log.info("Successfully initiated cluster. ZKServers: {} ledger root path: {} instanceId: {}", zkServers,
+                zkLedgersRootPath, instanceId);
+        return true;
     }
 
     @Override
-    public boolean format(ServerConfiguration conf) throws Exception {
-        // Clear underreplicated ledgers
-        try (ZooKeeper zk = ZooKeeperClient.newBuilder()
-            .connectString(conf.getZkServers())
-            .sessionTimeoutMs(conf.getZkTimeout())
-            .build()) {
-            try {
-                ZKUtil.deleteRecursive(zk, ZkLedgerUnderreplicationManager.getBasePath(conf.getZkLedgersRootPath())
-                    + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH);
-            } catch (KeeperException.NoNodeException e) {
-                if (log.isDebugEnabled()) {
-                    log.debug("underreplicated ledgers root path node not exists in zookeeper to delete");
+    public boolean nukeExistingCluster() throws Exception {
+        String zkLedgersRootPath = conf.getZkLedgersRootPath();
+        String zkServers = conf.getZkServers();
+        log.info("Nuking ZooKeeper metadata of existing cluster, ZKServers: {} ledger root path: {}",
+                zkServers, zkLedgersRootPath);
+
+        boolean ledgerRootExists = null != zk.exists(conf.getZkLedgersRootPath(), false);
+        if (!ledgerRootExists) {
+            log.info("There is no existing cluster with ledgersRootPath: {} in ZKServers: {}, "
+                    + "so exiting nuke operation", zkLedgersRootPath, conf.getZkServers());
+            return true;
+        }
+
+        String availableBookiesPath = conf.getZkAvailableBookiesPath();
+        boolean availableNodeExists = null != zk.exists(availableBookiesPath, false);
+        try (RegistrationClient regClient = new ZKRegistrationClient()) {
+            regClient.initialize(new ClientConfiguration(conf), null, NullStatsLogger.INSTANCE, Optional.empty());
+            if (availableNodeExists) {
+                Collection<BookieSocketAddress> rwBookies = FutureUtils
+                        .result(regClient.getWritableBookies(), EXCEPTION_FUNC).getValue();
+                if (rwBookies != null && !rwBookies.isEmpty()) {
+                    log.error("Bookies are still up and connected to this cluster, "
+                            + "stop all bookies before nuking the cluster");
+                    return false;
                 }
-            }
 
-            // Clear underreplicatedledger locks
-            try {
-                ZKUtil.deleteRecursive(zk, ZkLedgerUnderreplicationManager.getBasePath(conf.getZkLedgersRootPath())
-                    + '/' + BookKeeperConstants.UNDER_REPLICATION_LOCK);
-            } catch (KeeperException.NoNodeException e) {
-                if (log.isDebugEnabled()) {
-                    log.debug("underreplicatedledger locks node not exists in zookeeper to delete");
+                String readOnlyBookieRegPath = availableBookiesPath + "/" + BookKeeperConstants.READONLY;
+                boolean readonlyNodeExists = null != zk.exists(readOnlyBookieRegPath, false);
+                if (readonlyNodeExists) {
+                    Collection<BookieSocketAddress> roBookies = FutureUtils
+                            .result(regClient.getReadOnlyBookies(), EXCEPTION_FUNC).getValue();
+                    if (roBookies != null && !roBookies.isEmpty()) {
+                        log.error("Readonly Bookies are still up and connected to this cluster, "
+                                + "stop all bookies before nuking the cluster");
+                        return false;
+                    }
                 }
             }
+        }
 
-            // Clear the cookies
-            try {
-                ZKUtil.deleteRecursive(zk, conf.getZkLedgersRootPath()
-                    + "/cookies");
-            } catch (KeeperException.NoNodeException e) {
-                if (log.isDebugEnabled()) {
-                    log.debug("cookies node not exists in zookeeper to delete");
-                }
+        LedgerManagerFactory ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, layoutManager);
+        return ledgerManagerFactory.validateAndNukeExistingCluster(conf, layoutManager);
+    }
+
+    @Override
+    public boolean format() throws Exception {
+        // Clear underreplicated ledgers
+        try {
+            ZKUtil.deleteRecursive(zk, ZkLedgerUnderreplicationManager.getBasePath(conf.getZkLedgersRootPath())
+                    + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH);
+        } catch (KeeperException.NoNodeException e) {
+            if (log.isDebugEnabled()) {
+                log.debug("underreplicated ledgers root path node not exists in zookeeper to delete");
             }
+        }
 
-            // Clear the INSTANCEID
-            try {
-                zk.delete(conf.getZkLedgersRootPath() + "/"
-                    + BookKeeperConstants.INSTANCEID, -1);
-            } catch (KeeperException.NoNodeException e) {
-                if (log.isDebugEnabled()) {
-                    log.debug("INSTANCEID not exists in zookeeper to delete");
-                }
+        // Clear underreplicatedledger locks
+        try {
+            ZKUtil.deleteRecursive(zk, ZkLedgerUnderreplicationManager.getBasePath(conf.getZkLedgersRootPath()) + '/'
+                    + BookKeeperConstants.UNDER_REPLICATION_LOCK);
+        } catch (KeeperException.NoNodeException e) {
+            if (log.isDebugEnabled()) {
+                log.debug("underreplicatedledger locks node not exists in zookeeper to delete");
             }
+        }
 
-            // create INSTANCEID
-            String instanceId = UUID.randomUUID().toString();
-            zk.create(conf.getZkLedgersRootPath() + "/"
-                    + BookKeeperConstants.INSTANCEID, instanceId.getBytes(Charsets.UTF_8),
-                ZkUtils.getACLs(conf), CreateMode.PERSISTENT);
+        // Clear the cookies
+        try {
+            ZKUtil.deleteRecursive(zk, conf.getZkLedgersRootPath() + "/cookies");
+        } catch (KeeperException.NoNodeException e) {
+            if (log.isDebugEnabled()) {
+                log.debug("cookies node not exists in zookeeper to delete");
+            }
+        }
 
-            log.info("Successfully formatted BookKeeper metadata");
-            return true;
+        // Clear the INSTANCEID
+        try {
+            zk.delete(conf.getZkLedgersRootPath() + "/" + BookKeeperConstants.INSTANCEID, -1);
+        } catch (KeeperException.NoNodeException e) {
+            if (log.isDebugEnabled()) {
+                log.debug("INSTANCEID not exists in zookeeper to delete");
+            }
         }
+
+        // create INSTANCEID
+        String instanceId = UUID.randomUUID().toString();
+        zk.create(conf.getZkLedgersRootPath() + "/" + BookKeeperConstants.INSTANCEID,
+                instanceId.getBytes(Charsets.UTF_8), ZkUtils.getACLs(conf), CreateMode.PERSISTENT);
+
+        log.info("Successfully formatted BookKeeper metadata");
+        return true;
     }
 
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
index 9d25ac345..510f04c31 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
@@ -213,5 +213,4 @@ long getLedgerId(String...levelNodes) throws IOException {
         }
         return zkActiveLedgers;
     }
-
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index 09c7a39a9..56fd03578 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -63,7 +63,7 @@
 /**
  * Abstract ledger manager based on zookeeper, which provides common methods such as query zk nodes.
  */
-abstract class AbstractZkLedgerManager implements LedgerManager, Watcher {
+public abstract class AbstractZkLedgerManager implements LedgerManager, Watcher {
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractZkLedgerManager.class);
 
@@ -79,6 +79,9 @@
     // we use this to prevent long stack chains from building up in callbacks
     protected ScheduledExecutorService scheduler;
 
+    /**
+     * ReadLedgerMetadataTask class.
+     */
     protected class ReadLedgerMetadataTask implements Runnable, GenericCallback<LedgerMetadata> {
 
         final long ledgerId;
@@ -496,17 +499,41 @@ public void operationComplete(int rc, List<String> ledgerNodes) {
      *          Znode Name
      * @return true  if the znode is a special znode otherwise false
      */
-     protected static boolean isSpecialZnode(String znode) {
+     public static boolean isSpecialZnode(String znode) {
         if (BookKeeperConstants.AVAILABLE_NODE.equals(znode)
                 || BookKeeperConstants.COOKIE_NODE.equals(znode)
                 || BookKeeperConstants.LAYOUT_ZNODE.equals(znode)
                 || BookKeeperConstants.INSTANCEID.equals(znode)
-                || BookKeeperConstants.UNDER_REPLICATION_NODE.equals(znode)) {
+                || BookKeeperConstants.UNDER_REPLICATION_NODE.equals(znode)
+                || LegacyHierarchicalLedgerManager.IDGEN_ZNODE.equals(znode)
+                || LongHierarchicalLedgerManager.IDGEN_ZNODE.equals(znode)
+                || znode.startsWith(ZkLedgerIdGenerator.LEDGER_ID_GEN_PREFIX)) {
             return true;
         }
         return false;
     }
 
+    /**
+     * Regular expression for the name of the top-level parent znode for ledgers
+     * (in HierarchicalLedgerManager) or the znode of a ledger (in FlatLedgerManager).
+     *
+     * @return
+     */
+    protected abstract String getLedgerParentNodeRegex();
+
+    /**
+     * whether the child of ledgersRootPath is a top level parent znode for
+     * ledgers (in HierarchicalLedgerManager) or znode of a ledger (in
+     * FlatLedgerManager).
+     *
+     * @param znode
+     *            Znode Name
+     * @return
+     */
+    public boolean isLedgerParentNode(String znode) {
+        return znode.matches(getLedgerParentNodeRegex());
+    }
+
     /**
      * Convert the ZK retrieved ledger nodes to a HashSet for easier comparisons.
      *
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerFactory.java
new file mode 100644
index 000000000..b8dcc0314
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerFactory.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import java.io.IOException;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZKUtil;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * Abstract ledger manager factory based on zookeeper, which provides common
+ * methods such as format and validateAndNukeExistingCluster.
+ */
+@Slf4j
+public abstract class AbstractZkLedgerManagerFactory extends LedgerManagerFactory {
+    /** ZooKeeper handle used by this class and by the concrete factories. */
+    protected ZooKeeper zk;
+
+    /**
+     * Deletes every child of the ledgers root path that is not a special znode
+     * and is recognized by the ledger manager as a ledger (parent) znode, then
+     * lets the parent class delete and recreate the layout information.
+     *
+     * @param conf
+     *          Configuration instance, source of the ledgers root path
+     * @param layoutManager
+     *          Layout manager handed on to the parent class
+     */
+    @Override
+    public void format(AbstractConfiguration<?> conf, LayoutManager layoutManager)
+            throws InterruptedException, KeeperException, IOException {
+        try (AbstractZkLedgerManager ledgerManager = (AbstractZkLedgerManager) newLedgerManager()) {
+            String ledgersRootPath = conf.getZkLedgersRootPath();
+            List<String> children = zk.getChildren(ledgersRootPath, false);
+            for (String child : children) {
+                if (!AbstractZkLedgerManager.isSpecialZnode(child) && ledgerManager.isLedgerParentNode(child)) {
+                    ZKUtil.deleteRecursive(zk, ledgersRootPath + "/" + child);
+                }
+            }
+        }
+        // Delete and recreate the LAYOUT information.
+        super.format(conf, layoutManager);
+    }
+
+    /**
+     * Validates the children of the ledgers root path and, if they are all
+     * expected, deletes the cluster's znodes including the root path itself.
+     *
+     * <p>Every child must be either a special znode or a ledger (parent) znode;
+     * if any other znode is found up front, nothing is deleted. The ledger
+     * manager created for this check is closed before any znode is deleted.
+     *
+     * @param conf
+     *          Configuration instance
+     * @param layoutManager
+     *          Layout manager passed on to format
+     * @return true if the cluster was nuked, false if an unexpected znode was
+     *         found under the ledgers root path
+     */
+    @Override
+    public boolean validateAndNukeExistingCluster(AbstractConfiguration<?> conf, LayoutManager layoutManager)
+            throws InterruptedException, KeeperException, IOException {
+        String zkLedgersRootPath = conf.getZkLedgersRootPath();
+        String zkServers = conf.getZkServers();
+
+        /*
+         * before proceeding with nuking existing cluster, make sure there
+         * are no unexpected znodes under ledgersRootPath. The ledger manager
+         * is only needed for this check, so it is scoped to it and closed
+         * afterwards instead of being leaked.
+         */
+        try (AbstractZkLedgerManager zkLedgerManager = (AbstractZkLedgerManager) newLedgerManager()) {
+            List<String> ledgersRootPathChildrenList = zk.getChildren(zkLedgersRootPath, false);
+            for (String ledgersRootPathChildren : ledgersRootPathChildrenList) {
+                if ((!AbstractZkLedgerManager.isSpecialZnode(ledgersRootPathChildren))
+                        && (!zkLedgerManager.isLedgerParentNode(ledgersRootPathChildren))) {
+                    log.error("Found unexpected znode : {} under ledgersRootPath : {} so exiting nuke operation",
+                            ledgersRootPathChildren, zkLedgersRootPath);
+                    return false;
+                }
+            }
+        }
+
+        // formatting ledgermanager deletes ledger znodes
+        format(conf, layoutManager);
+
+        // now delete all the special nodes recursively
+        List<String> remainingChildrenList = zk.getChildren(zkLedgersRootPath, false);
+        for (String remainingChild : remainingChildrenList) {
+            if (AbstractZkLedgerManager.isSpecialZnode(remainingChild)) {
+                ZKUtil.deleteRecursive(zk, zkLedgersRootPath + "/" + remainingChild);
+            } else {
+                log.error("Found unexpected znode : {} under ledgersRootPath : {} so exiting nuke operation",
+                        remainingChild, zkLedgersRootPath);
+                return false;
+            }
+        }
+
+        // finally deleting the ledgers rootpath
+        zk.delete(zkLedgersRootPath, -1);
+
+        log.info("Successfully nuked existing cluster, ZKServers: {} ledger root path: {}",
+                zkServers, zkLedgersRootPath);
+        return true;
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
index 26f55dc9e..7ee2e2289 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
@@ -88,11 +88,6 @@ public void asyncProcessLedgers(final Processor<Long> processor,
         asyncProcessLedgersInSingleNode(ledgerRootPath, processor, finalCb, ctx, successRc, failureRc);
     }
 
-    protected static boolean isSpecialZnode(String znode) {
-        return znode.startsWith(ZkLedgerIdGenerator.LEDGER_ID_GEN_PREFIX)
-            || AbstractZkLedgerManager.isSpecialZnode(znode);
-    }
-
     @Override
     public LedgerRangeIterator getLedgerRanges() {
         return new LedgerRangeIterator() {
@@ -134,4 +129,9 @@ public synchronized LedgerRange next() throws IOException {
             }
         };
     }
+
+    @Override
+    protected String getLedgerParentNodeRegex() {
+        return StringUtils.FLAT_LEDGER_NODE_REGEX;
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
index ad1c71d49..5e2ced5dc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
@@ -21,24 +21,22 @@
 
 import java.io.IOException;
 import java.util.List;
+
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.replication.ReplicationException;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZKUtil;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
 
 /**
  * Flat Ledger Manager Factory.
  */
-public class FlatLedgerManagerFactory extends LedgerManagerFactory {
+public class FlatLedgerManagerFactory extends AbstractZkLedgerManagerFactory {
 
     public static final String NAME = "flat";
     public static final int CUR_VERSION = 1;
 
     AbstractConfiguration conf;
-    ZooKeeper zk;
 
     @Override
     public int getCurrentVersion() {
@@ -84,19 +82,4 @@ public LedgerUnderreplicationManager newLedgerUnderreplicationManager()
             throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
         return new ZkLedgerUnderreplicationManager(conf, zk);
     }
-
-    @Override
-    public void format(AbstractConfiguration conf, LayoutManager layoutManager)
-            throws InterruptedException, KeeperException, IOException {
-        String ledgersRootPath = conf.getZkLedgersRootPath();
-        List<String> children = zk.getChildren(ledgersRootPath, false);
-        for (String child : children) {
-            if (FlatLedgerManager.isSpecialZnode(child)) {
-                continue;
-            }
-            ZKUtil.deleteRecursive(zk, ledgersRootPath + "/" + child);
-        }
-        // Delete and recreate the LAYOUT information.
-        super.format(conf, layoutManager);
-    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
index 07607e41a..946ed2a5d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
@@ -118,4 +118,9 @@ public LedgerRange next() throws IOException {
         }
 
     }
+
+    @Override
+    protected String getLedgerParentNodeRegex() {
+        return StringUtils.HIERARCHICAL_LEDGER_PARENT_NODE_REGEX;
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
index eeeb6e98c..4f2a2bf64 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
@@ -38,4 +38,8 @@ public LedgerIdGenerator newLedgerIdGenerator() {
                 subIdGenerator, zkAcls);
     }
 
+    @Override
+    public LedgerManager newLedgerManager() {
+        return new HierarchicalLedgerManager(conf, zk);
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
index 99df4cb62..62ff092b0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
@@ -263,4 +263,20 @@ public void format(final AbstractConfiguration<?> conf, final LayoutManager lm)
         // Create new layout information again.
         createNewLMFactory(conf, lm, factoryClass);
     }
+
+    /**
+     * This method makes sure there are no unexpected znodes under ledgersRootPath
+     * and then it proceeds with ledger metadata formatting and nuking the cluster
+     * ZK state info.
+     *
+     * @param conf
+     *          Configuration instance
+     * @param lm
+     *          Layout manager
+     * @throws IOException
+     * @throws KeeperException
+     * @throws InterruptedException
+     */
+    public abstract boolean validateAndNukeExistingCluster(AbstractConfiguration<?> conf, LayoutManager lm)
+            throws InterruptedException, KeeperException, IOException;
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
index ea85a8da6..59c3f2c37 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
@@ -125,7 +125,7 @@ public void asyncProcessLedgers(final Processor<Long> processor,
         asyncProcessLevelNodes(ledgerRootPath, new Processor<String>() {
             @Override
             public void process(final String l1Node, final AsyncCallback.VoidCallback cb1) {
-                if (isSpecialZnode(l1Node)) {
+                if (!isLedgerParentNode(l1Node)) {
                     cb1.processResult(successRc, null, context);
                     return;
                 }
@@ -147,20 +147,20 @@ public void process(String l2Node, AsyncCallback.VoidCallback cb2) {
         }, finalCb, context, successRc, failureRc);
     }
 
-    protected static boolean isSpecialZnode(String znode) {
-        return IDGEN_ZNODE.equals(znode) || LongHierarchicalLedgerManager.IDGEN_ZNODE.equals(znode)
-            || AbstractHierarchicalLedgerManager.isSpecialZnode(znode);
+    @Override
+    protected String getLedgerParentNodeRegex() {
+        return StringUtils.LEGACYHIERARCHICAL_LEDGER_PARENT_NODE_REGEX;
     }
 
     @Override
     public LedgerRangeIterator getLedgerRanges() {
-        return new HierarchicalLedgerRangeIterator();
+        return new LegacyHierarchicalLedgerRangeIterator();
     }
 
     /**
      * Iterator through each metadata bucket with hierarchical mode.
      */
-    private class HierarchicalLedgerRangeIterator implements LedgerRangeIterator {
+    private class LegacyHierarchicalLedgerRangeIterator implements LedgerRangeIterator {
         private Iterator<String> l1NodesIter = null;
         private Iterator<String> l2NodesIter = null;
         private String curL1Nodes = "";
@@ -182,7 +182,7 @@ private boolean nextL1Node() throws KeeperException, InterruptedException {
                     return false;
                 }
                 // Top level nodes are always exactly 2 digits long. (Don't pick up long hierarchical top level nodes)
-                if (isSpecialZnode(curL1Nodes) || curL1Nodes.length() > 2) {
+                if (!isLedgerParentNode(curL1Nodes)) {
                     continue;
                 }
                 List<String> l2Nodes = zk.getChildren(ledgerRootPath + "/" + curL1Nodes, null);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
index 70c1abdc3..cd2bce16d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
@@ -26,20 +26,17 @@
 import org.apache.bookkeeper.replication.ReplicationException;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZKUtil;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
 
 /**
  * Hierarchical Ledger Manager Factory.
  */
-public class LegacyHierarchicalLedgerManagerFactory extends LedgerManagerFactory {
+public class LegacyHierarchicalLedgerManagerFactory extends AbstractZkLedgerManagerFactory {
 
     public static final String NAME = "legacyhierarchical";
     public static final int CUR_VERSION = 1;
 
     AbstractConfiguration conf;
-    ZooKeeper zk;
 
     @Override
     public int getCurrentVersion() {
@@ -79,7 +76,7 @@ public LedgerIdGenerator newLedgerIdGenerator() {
 
     @Override
     public LedgerManager newLedgerManager() {
-        return new HierarchicalLedgerManager(conf, zk);
+        return new LegacyHierarchicalLedgerManager(conf, zk);
     }
 
     @Override
@@ -87,20 +84,4 @@ public LedgerUnderreplicationManager newLedgerUnderreplicationManager()
             throws KeeperException, InterruptedException, ReplicationException.CompatibilityException{
         return new ZkLedgerUnderreplicationManager(conf, zk);
     }
-
-    @Override
-    public void format(AbstractConfiguration conf, LayoutManager layoutManager)
-            throws InterruptedException, KeeperException, IOException {
-        String ledgersRootPath = conf.getZkLedgersRootPath();
-        List<String> children = zk.getChildren(ledgersRootPath, false);
-        for (String child : children) {
-            if (HierarchicalLedgerManager.isSpecialZnode(child)) {
-                continue;
-            }
-            ZKUtil.deleteRecursive(zk, ledgersRootPath + "/" + child);
-        }
-        // Delete and recreate the LAYOUT information.
-        super.format(conf, layoutManager);
-    }
-
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
index e5bdb1c41..2e69e90a5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
@@ -102,13 +102,6 @@ public void asyncProcessLedgers(final Processor<Long> processor, final AsyncCall
                 successRc, failureRc);
     }
 
-    protected static boolean isSpecialZnode(String znode) {
-        // Check nextnode length. All paths in long hierarchical format (3-4-4-4-4)
-        // are at least 3 characters long. This prevents picking up any old-style
-        // hierarchical paths (2-4-4)
-        return LegacyHierarchicalLedgerManager.isSpecialZnode(znode) || znode.length() < 3;
-    }
-
     private class RecursiveProcessor implements Processor<String> {
         private final int level;
         private final String path;
@@ -130,7 +123,7 @@ private RecursiveProcessor(int level, String path, Processor<Long> processor, Ob
         @Override
         public void process(String lNode, VoidCallback cb) {
             String nodePath = path + "/" + lNode;
-            if ((level == 0) && isSpecialZnode(lNode)) {
+            if ((level == 0) && !isLedgerParentNode(lNode)) {
                 cb.processResult(successRc, null, context);
                 return;
             } else if (level < 3) {
@@ -262,7 +255,7 @@ public LedgerRange next() throws IOException {
             void advance() throws IOException {
                 while (thisLevelIterator.hasNext()) {
                     String node = thisLevelIterator.next();
-                    if (level == 0 && isSpecialZnode(node)) {
+                    if (level == 0 && !isLedgerParentNode(node)) {
                         continue;
                     }
                     LedgerRangeIterator nextIterator = level < 3
@@ -311,4 +304,9 @@ public synchronized LedgerRange next() throws IOException {
             return rootIterator.next();
         }
     }
+
+    @Override
+    protected String getLedgerParentNodeRegex() {
+        return StringUtils.LONGHIERARCHICAL_LEDGER_PARENT_NODE_REGEX;
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
index 0e6938573..cb8e8c382 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
@@ -36,6 +36,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
@@ -57,21 +58,23 @@
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.replication.ReplicationException;
+import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.StringUtils;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZKUtil;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * MetaStore Based Ledger Manager Factory.
  */
+@Slf4j
 public class MSLedgerManagerFactory extends LedgerManagerFactory {
 
     private static final Logger LOG = LoggerFactory.getLogger(MSLedgerManagerFactory.class);
@@ -616,6 +619,25 @@ public LedgerRange next() throws IOException {
         public LedgerRangeIterator getLedgerRanges() {
             return new MSLedgerRangeIterator();
         }
+
+        /**
+         * Whether the znode is a special znode, i.e. one of the well-known
+         * BookKeeperConstants znode names or MsLedgerManager.IDGEN_ZNODE.
+         *
+         * @param znode name of the znode, compared by exact equality
+         * @return true if the znode is a special znode, otherwise false
+         */
+         public static boolean isSpecialZnode(String znode) {
+            if (BookKeeperConstants.AVAILABLE_NODE.equals(znode)
+                    || BookKeeperConstants.COOKIE_NODE.equals(znode)
+                    || BookKeeperConstants.LAYOUT_ZNODE.equals(znode)
+                    || BookKeeperConstants.INSTANCEID.equals(znode)
+                    || BookKeeperConstants.UNDER_REPLICATION_NODE.equals(znode)
+                    || MsLedgerManager.IDGEN_ZNODE.equals(znode)) {
+                return true;
+            }
+            return false;
+        }
     }
 
     @Override
@@ -720,4 +742,45 @@ public void format(AbstractConfiguration conf, LayoutManager layoutManager)
         super.format(conf, layoutManager);
     }
 
+    @Override
+    public boolean validateAndNukeExistingCluster(AbstractConfiguration<?> conf, LayoutManager layoutManager)
+            throws InterruptedException, KeeperException, IOException {
+        String zkLedgersRootPath = conf.getZkLedgersRootPath();
+        String zkServers = conf.getZkServers();
+
+        /*
+         * Validation pass: before nuking the existing cluster, make sure every
+         * child of ledgersRootPath is a known special znode; otherwise abort.
+         */
+        List<String> ledgersRootPathChildrenList = zk.getChildren(zkLedgersRootPath, false);
+        for (String ledgersRootPathChildren : ledgersRootPathChildrenList) {
+            if ((!MsLedgerManager.isSpecialZnode(ledgersRootPathChildren))) {
+                log.error("Found unexpected znode : {} under ledgersRootPath : {} so exiting nuke operation",
+                        ledgersRootPathChildren, zkLedgersRootPath);
+                return false;
+            }
+        }
+
+        // format the ledger manager metadata before the special znodes are removed
+        format(conf, layoutManager);
+
+        // re-list the children and recursively delete each special znode; anything else aborts the nuke
+        ledgersRootPathChildrenList = zk.getChildren(zkLedgersRootPath, false);
+        for (String ledgersRootPathChildren : ledgersRootPathChildrenList) {
+            if (MsLedgerManager.isSpecialZnode(ledgersRootPathChildren)) {
+                ZKUtil.deleteRecursive(zk, zkLedgersRootPath + "/" + ledgersRootPathChildren);
+            } else {
+                log.error("Found unexpected znode : {} under ledgersRootPath : {} so exiting nuke operation",
+                        ledgersRootPathChildren, zkLedgersRootPath);
+                return false;
+            }
+        }
+
+        // finally delete the (now childless) ledgers root path; version -1 matches any version
+        zk.delete(zkLedgersRootPath, -1);
+
+        log.info("Successfully nuked existing cluster, ZKServers: {} ledger root path: {}",
+                zkServers, zkLedgersRootPath);
+        return true;
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
index 44f557556..f55edd155 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
@@ -29,6 +29,14 @@
 
     // Ledger Node Prefix
     public static final String LEDGER_NODE_PREFIX = "L";
+    // Ledger znode in FlatLedgerManager layout will be "L" (prefix) + "%010d" (id in 10 digits)
+    public static final String FLAT_LEDGER_NODE_REGEX = StringUtils.LEDGER_NODE_PREFIX + "\\d{10}";
+    // top level znode in LegacyHierarchicalLedgerManager will be exactly 2 digits
+    public static final String LEGACYHIERARCHICAL_LEDGER_PARENT_NODE_REGEX = "\\d{2}";
+    // top level znode in LongHierarchicalLedgerManager will be exactly 3 digits
+    public static final String LONGHIERARCHICAL_LEDGER_PARENT_NODE_REGEX = "\\d{3}";
+    // top level znode in HierarchicalLedgerManager will be 2 or 3 digits (either of the two layouts above)
+    public static final String HIERARCHICAL_LEDGER_PARENT_NODE_REGEX = "\\d{2,3}";
 
     /**
      * Formats ledger ID according to ZooKeeper rules.
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
index 3f3e8402c..e1cb25292 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
@@ -20,16 +20,22 @@
  */
 package org.apache.bookkeeper.client;
 
+import static com.google.common.base.Charsets.UTF_8;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import com.google.common.net.InetAddresses;
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
 
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.client.BKException.BKIllegalOpException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
 import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
@@ -37,6 +43,8 @@
 import org.apache.bookkeeper.test.annotations.FlakyTest;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.commons.io.FileUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -235,7 +243,7 @@ public void testDecommissionForLedgersWithMultipleSegmentsAndNotWriteClosed() th
         bkAdmin.close();
     }
 
-    @Test(timeout = 6000000)
+    @Test
     public void testBookieInit() throws Exception {
         int bookieindex = 0;
         ServerConfiguration confOfExistingBookie = bsConfs.get(bookieindex);
@@ -275,4 +283,200 @@ public void testBookieInit() throws Exception {
         Assert.assertTrue("initBookie shouldn't succeeded",
                 BookKeeperAdmin.initBookie(confOfExistingBookie));
     }
+
+    @Test
+    public void testInitNewCluster() throws Exception {
+        final String rootPath = "/testledgers";
+        ServerConfiguration testConf = new ServerConfiguration(baseConf);
+        testConf.setZkLedgersRootPath(rootPath);
+        testConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        Assert.assertTrue("New cluster should be initialized successfully", BookKeeperAdmin.initNewCluster(testConf));
+
+        Assert.assertNotNull("Cluster rootpath should have been created successfully " + rootPath,
+                zkc.exists(rootPath, false));
+        String bookiesPath = testConf.getZkAvailableBookiesPath();
+        Assert.assertNotNull("AvailableBookiesPath should have been created successfully " + bookiesPath,
+                zkc.exists(bookiesPath, false));
+        String instanceIdZnode = testConf.getZkLedgersRootPath() + "/" + BookKeeperConstants.INSTANCEID;
+        Assert.assertNotNull("InstanceId node should have been created successfully" + instanceIdZnode,
+                zkc.exists(instanceIdZnode, false));
+
+        String layoutZnode = rootPath + "/" + BookKeeperConstants.LAYOUT_ZNODE;
+        Assert.assertNotNull("Layout node should have been created successfully" + layoutZnode,
+                zkc.exists(layoutZnode, false));
+
+        /*
+         * register ephemeral znodes at random addresses to simulate live bookies
+         */
+        final int bookieCount = 3;
+        Random addressSource = new Random();
+        for (int bookie = 0; bookie < bookieCount; bookie++) {
+            String hostAddress = InetAddresses.fromInteger(addressSource.nextInt()).getHostAddress();
+            String bookieZnode = testConf.getZkAvailableBookiesPath() + "/" + hostAddress + ":3181";
+            zkc.create(bookieZnode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        }
+
+        /*
+         * with the bookies registered, a ledger can be created and deleted again
+         */
+        BookKeeper client = new BookKeeper(new ClientConfiguration(testConf));
+        LedgerHandle handle = client.createLedger(bookieCount, bookieCount, bookieCount, BookKeeper.DigestType.MAC,
+                new byte[0]);
+        client.deleteLedger(handle.ledgerId);
+        client.close();
+    }
+
+    @Test
+    public void testNukeExistingClusterWithForceOption() throws Exception {
+        final String rootPath = "/testledgers";
+        final ServerConfiguration testConf = new ServerConfiguration(baseConf);
+        testConf.setZkLedgersRootPath(rootPath);
+        final List<String> registeredBookiePaths = new ArrayList<>();
+        initiateNewClusterAndCreateLedgers(testConf, registeredBookiePaths);
+
+        /*
+         * drop every bookie registration znode first, so that no bookie is
+         * registered any more when the cluster is nuked
+         */
+        for (String bookieRegPath : registeredBookiePaths) {
+            zkc.delete(bookieRegPath, -1);
+        }
+
+        final boolean nuked = BookKeeperAdmin.nukeExistingCluster(testConf, rootPath, null, true);
+        Assert.assertTrue("New cluster should be nuked successfully", nuked);
+        final boolean rootPathGone = zkc.exists(rootPath, false) == null;
+        Assert.assertTrue("Cluster rootpath should have been deleted successfully " + rootPath, rootPathGone);
+    }
+
+    @Test
+    public void testNukeExistingClusterWithInstanceId() throws Exception {
+        final String rootPath = "/testledgers";
+        final ServerConfiguration testConf = new ServerConfiguration(baseConf);
+        testConf.setZkLedgersRootPath(rootPath);
+        final List<String> registeredBookiePaths = new ArrayList<>();
+        initiateNewClusterAndCreateLedgers(testConf, registeredBookiePaths);
+
+        /*
+         * drop every bookie registration znode first, so that no bookie is
+         * registered any more when the cluster is nuked
+         */
+        for (String bookieRegPath : registeredBookiePaths) {
+            zkc.delete(bookieRegPath, -1);
+        }
+
+        final String instanceIdPath = testConf.getZkLedgersRootPath() + "/" + BookKeeperConstants.INSTANCEID;
+        final String instanceId = new String(zkc.getData(instanceIdPath, false, null), UTF_8);
+
+        final boolean nuked = BookKeeperAdmin.nukeExistingCluster(testConf, rootPath, instanceId, false);
+        Assert.assertTrue("New cluster should be nuked successfully", nuked);
+        final boolean rootPathGone = zkc.exists(rootPath, false) == null;
+        Assert.assertTrue("Cluster rootpath should have been deleted successfully " + rootPath, rootPathGone);
+    }
+
+    /**
+     * Verifies that nukeExistingCluster deletes nothing when the instanceid is missing or incorrect, or
+     * while a rw or ro bookie is still registered, and that it succeeds once no bookie is registered.
+     * The BookKeeper client is closed in a finally block so a failing assertion does not leak it.
+     */
+    @Test
+    public void tryNukingExistingClustersWithInvalidParams() throws Exception {
+        String ledgersRootPath = "/testledgers";
+        ServerConfiguration newConfig = new ServerConfiguration(baseConf);
+        newConfig.setZkLedgersRootPath(ledgersRootPath);
+        List<String> bookiesRegPaths = new ArrayList<String>();
+        initiateNewClusterAndCreateLedgers(newConfig, bookiesRegPaths);
+
+        // znode paths checked below
+        String availableBookiesPath = newConfig.getZkAvailableBookiesPath();
+        String instanceIdPath = newConfig.getZkLedgersRootPath() + "/" + BookKeeperConstants.INSTANCEID;
+        String ledgersLayout = ledgersRootPath + "/" + BookKeeperConstants.LAYOUT_ZNODE;
+        String ipString = InetAddresses.fromInteger((new Random()).nextInt()).getHostAddress();
+        String roBookieRegPath = availableBookiesPath + "/" + BookKeeperConstants.READONLY + "/"
+                + ipString + ":3181";
+
+        // read instanceId
+        byte[] data = zkc.getData(instanceIdPath, false, null);
+        String readInstanceId = new String(data, UTF_8);
+
+        long ledgerId = 23456789L;
+        BookKeeper bk = new BookKeeper(new ClientConfiguration(newConfig));
+        try {
+            // create ledger with a specific ledgerid
+            LedgerHandle lh = bk.createLedgerAdv(ledgerId, 1, 1, 1, BookKeeper.DigestType.MAC, new byte[0], null);
+            lh.close();
+
+            // register a RO bookie
+            zkc.create(roBookieRegPath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+
+            Assert.assertFalse(
+                    "Cluster should'nt be nuked since instanceid is not provided and force option is not set",
+                    BookKeeperAdmin.nukeExistingCluster(newConfig, ledgersRootPath, null, false));
+            Assert.assertFalse("Cluster should'nt be nuked since incorrect instanceid is provided",
+                    BookKeeperAdmin.nukeExistingCluster(newConfig, ledgersRootPath, "incorrectinstanceid", false));
+            Assert.assertFalse("Cluster should'nt be nuked since bookies are still registered",
+                    BookKeeperAdmin.nukeExistingCluster(newConfig, ledgersRootPath, readInstanceId, false));
+
+            // delete all rw bookies registration
+            for (String bookieRegPath : bookiesRegPaths) {
+                zkc.delete(bookieRegPath, -1);
+            }
+            Assert.assertFalse("Cluster should'nt be nuked since ro bookie is still registered",
+                    BookKeeperAdmin.nukeExistingCluster(newConfig, ledgersRootPath, readInstanceId, false));
+
+            // make sure no node is deleted
+            Assert.assertTrue("Cluster rootpath should be existing " + ledgersRootPath,
+                    (zkc.exists(ledgersRootPath, false) != null));
+            Assert.assertTrue("AvailableBookiesPath should be existing " + availableBookiesPath,
+                    (zkc.exists(availableBookiesPath, false) != null));
+            Assert.assertTrue("InstanceId node should be existing " + instanceIdPath,
+                    (zkc.exists(instanceIdPath, false) != null));
+            Assert.assertTrue("Layout node should be existing " + ledgersLayout,
+                    (zkc.exists(ledgersLayout, false) != null));
+
+            // ledger should not be deleted.
+            lh = bk.openLedgerNoRecovery(ledgerId, BookKeeper.DigestType.MAC, new byte[0]);
+            lh.close();
+        } finally {
+            // close the client even when one of the assertions above fails
+            bk.close();
+        }
+
+        // delete ro bookie reg znode
+        zkc.delete(roBookieRegPath, -1);
+
+        Assert.assertTrue("Cluster should be nuked since no bookie is registered",
+                BookKeeperAdmin.nukeExistingCluster(newConfig, ledgersRootPath, readInstanceId, false));
+        Assert.assertTrue("Cluster rootpath should have been deleted successfully " + ledgersRootPath,
+                (zkc.exists(ledgersRootPath, false) == null));
+    }
+
+    /** Sets up a new cluster: registers 3 simulated bookies (paths added to bookiesRegPaths), creates 5 ledgers. */
+    void initiateNewClusterAndCreateLedgers(ServerConfiguration newConfig, List<String> bookiesRegPaths)
+            throws Exception {
+        newConfig.setZkServers(zkUtil.getZooKeeperConnectString());
+        Assert.assertTrue("New cluster should be initialized successfully", BookKeeperAdmin.initNewCluster(newConfig));
+
+        // create znodes simulating existence of Bookies in the cluster
+        int numberOfBookies = 3;
+        Random rand = new Random();
+        for (int i = 0; i < numberOfBookies; i++) {
+            String ipString = InetAddresses.fromInteger(rand.nextInt()).getHostAddress();
+            // keep the path in a local: get(i) would pick the wrong entry if the list was not empty
+            String bookieRegPath = newConfig.getZkAvailableBookiesPath() + "/" + ipString + ":3181";
+            bookiesRegPaths.add(bookieRegPath);
+            zkc.create(bookieRegPath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        }
+
+        // now it should be possible to create ledgers; always close the client, even on failure
+        int numOfLedgers = 5;
+        BookKeeper bk = new BookKeeper(new ClientConfiguration(newConfig));
+        try {
+            for (int i = 0; i < numOfLedgers; i++) {
+                bk.createLedger(numberOfBookies, numberOfBookies, numberOfBookies, BookKeeper.DigestType.MAC,
+                        new byte[0]).close();
+            }
+        } finally {
+            bk.close();
+        }
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
index 239bc0e9d..b3aa4258a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerIteratorTest.java
@@ -30,6 +30,8 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Random;
@@ -50,9 +52,10 @@
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
 import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Test;
 
-
 /**
  * Test the ledger manager iterator.
  */
@@ -435,4 +438,83 @@ public void checkConcurrentModifications() throws Throwable {
             thread.join();
         }
     }
+
+    /** Checks that the first znode below zkLedgersRootPath on each ledger's path is a ledger parent node. */
+    @Test
+    public void testLedgerParentNode() throws Throwable {
+        // this testcase applies only to ZK based ledgermanagers, so it doesn't work for MSLedgerManager
+        Assume.assumeTrue(!baseConf.getLedgerManagerFactoryClass().equals(MSLedgerManagerFactory.class));
+        AbstractZkLedgerManager lm = (AbstractZkLedgerManager) getLedgerManager();
+        List<Long> ledgerIds;
+        if (baseConf.getLedgerManagerFactoryClass().equals(HierarchicalLedgerManagerFactory.class)
+                || baseConf.getLedgerManagerFactoryClass().equals(LongHierarchicalLedgerManagerFactory.class)) {
+            ledgerIds = Arrays.asList(100L, (Integer.MAX_VALUE * 10L));
+        } else {
+            ledgerIds = Arrays.asList(100L, (Integer.MAX_VALUE - 10L));
+        }
+        String ledgersRootPrefix = baseConf.getZkLedgersRootPath() + "/";
+        for (long ledgerId : ledgerIds) {
+            String fullLedgerPath = lm.getLedgerPath(ledgerId);
+            // strip the prefix literally; replaceAll would have interpreted the root path as a regex
+            String ledgerPath = fullLedgerPath.replace(ledgersRootPrefix, "");
+            String[] znodesOfLedger = ledgerPath.split("/");
+            Assert.assertTrue(znodesOfLedger[0] + " is supposed to be valid parent ",
+                    lm.isLedgerParentNode(znodesOfLedger[0]));
+        }
+    }
+
+    /**
+     * Checks that formatting via the ledger manager factory removes the ledger parent znodes under
+     * zkLedgersRootPath while the other children, including the invalid znodes created here, remain.
+     */
+    @Test
+    public void testLedgerManagerFormat() throws Throwable {
+        // this testcase applies only to ZK based ledgermanagers, so it doesn't work for MSLedgerManager
+        Assume.assumeTrue(!baseConf.getLedgerManagerFactoryClass().equals(MSLedgerManagerFactory.class));
+        AbstractZkLedgerManager lm = (AbstractZkLedgerManager) getLedgerManager();
+        Collection<Long> ids = Arrays.asList(1234567890L, 2L, 32345L, 23456789L);
+        if (baseConf.getLedgerManagerFactoryClass().equals(HierarchicalLedgerManagerFactory.class)
+                || baseConf.getLedgerManagerFactoryClass().equals(LongHierarchicalLedgerManagerFactory.class)) {
+            ids = new ArrayList<Long>(ids);
+            ids.add(Integer.MAX_VALUE * 2L);
+            ids.add(1234567891234L);
+        }
+        for (Long id : ids) {
+            createLedger(lm, id, Optional.of(BKException.Code.OK));
+        }
+
+        // create some invalid nodes under zkLedgersRootPath
+        Collection<String> invalidZnodes = Arrays.asList("12345", "12345678901L", "abc", "123d");
+        // explicit charset: a bare getBytes() would depend on the platform default encoding
+        byte[] znodeData = "data".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        for (String invalidZnode : invalidZnodes) {
+            ZkUtils.createFullPathOptimistic(zkc, baseConf.getZkLedgersRootPath() + "/" + invalidZnode,
+                    znodeData, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        }
+
+        // get the count of total children under zkLedgersRootPath and also
+        // count of the parent nodes of ledgers under zkLedgersRootPath
+        List<String> childrenOfLedgersRootPath = zkc.getChildren(baseConf.getZkLedgersRootPath(), false);
+        int totalChildrenOfLedgersRootPath = childrenOfLedgersRootPath.size();
+        int totalParentNodesOfLedgers = 0;
+        for (String childOfLedgersRootPath : childrenOfLedgersRootPath) {
+            if (lm.isLedgerParentNode(childOfLedgersRootPath)) {
+                totalParentNodesOfLedgers++;
+            }
+        }
+
+        /*
+         * after ledgermanagerfactory format only the znodes of created ledgers under zkLedgersRootPath
+         * should be deleted recursively but not specialnode or invalid nodes created above
+         */
+        ledgerManagerFactory.format(baseConf,
+                new ZkLayoutManager(zkc, baseConf.getZkLedgersRootPath(), ZkUtils.getACLs(baseConf)));
+        List<String> childrenOfLedgersRootPathAfterFormat = zkc.getChildren(baseConf.getZkLedgersRootPath(), false);
+        int totalChildrenOfLedgersRootPathAfterFormat = childrenOfLedgersRootPathAfterFormat.size();
+        Assert.assertEquals("totalChildrenOfLedgersRootPathAfterFormat",
+                totalChildrenOfLedgersRootPath - totalParentNodesOfLedgers, totalChildrenOfLedgersRootPathAfterFormat);
+
+        Assert.assertTrue("ChildrenOfLedgersRootPathAfterFormat should contain all the invalid znodes created",
+                childrenOfLedgersRootPathAfterFormat.containsAll(invalidZnodes));
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services