You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/03/16 05:27:58 UTC

[GitHub] sijie closed pull request #1251: BP-29 (task 4): use metadata service uri for constructing registration manager and ledger manager factory

sijie closed pull request #1251: BP-29 (task 4): use metadata service uri for constructing registration manager and ledger manager factory
URL: https://github.com/apache/bookkeeper/pull/1251
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 7b28cdd31..b65df4758 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -43,6 +43,7 @@
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.URI;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.nio.file.FileStore;
@@ -69,9 +70,11 @@
 import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.discover.RegistrationManager;
-import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.MetadataBookieDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.meta.exceptions.MetadataException;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNS;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
@@ -83,7 +86,6 @@
 import org.apache.bookkeeper.util.DiskChecker;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.ReflectionUtils;
 import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
@@ -123,8 +125,7 @@
     LedgerDirsMonitor idxMonitor;
 
     // Registration Manager for managing registration
-    RegistrationManager registrationManager;
-
+    protected final MetadataBookieDriver metadataDriver;
 
     private int exitCode = ExitCode.OK;
 
@@ -225,30 +226,20 @@ public boolean accept(File dir, String name) {
         }
     }
 
-    @VisibleForTesting
-    public synchronized void setRegistrationManager(RegistrationManager rm) {
-            this.registrationManager = rm;
-            this.getStateManager().setRegistrationManager(rm);
-    }
-
-    @VisibleForTesting
-    public synchronized RegistrationManager getRegistrationManager() {
-        return this.registrationManager;
-    }
-
     /**
      * Check that the environment for the bookie is correct.
      * This means that the configuration has stayed the same as the
      * first run and the filesystem structure is up to date.
      */
-    private void checkEnvironment(RegistrationManager rm) throws BookieException, IOException {
+    private void checkEnvironment(MetadataBookieDriver metadataDriver)
+            throws BookieException, IOException {
         List<File> allLedgerDirs = new ArrayList<File>(ledgerDirsManager.getAllLedgerDirs().size()
                                                      + indexDirsManager.getAllLedgerDirs().size());
         allLedgerDirs.addAll(ledgerDirsManager.getAllLedgerDirs());
         if (indexDirsManager != ledgerDirsManager) {
             allLedgerDirs.addAll(indexDirsManager.getAllLedgerDirs());
         }
-        if (rm == null) { // exists only for testing, just make sure directories are correct
+        if (metadataDriver == null) { // exists only for testing, just make sure directories are correct
 
             for (File journalDirectory : journalDirectories) {
                 checkDirectoryStructure(journalDirectory);
@@ -260,7 +251,7 @@ private void checkEnvironment(RegistrationManager rm) throws BookieException, IO
             return;
         }
 
-        checkEnvironmentWithStorageExpansion(conf, rm, journalDirectories, allLedgerDirs);
+        checkEnvironmentWithStorageExpansion(conf, metadataDriver, journalDirectories, allLedgerDirs);
 
         checkIfDirsOnSameDiskPartition(allLedgerDirs);
         checkIfDirsOnSameDiskPartition(journalDirectories);
@@ -400,9 +391,10 @@ private static void stampNewCookie(ServerConfiguration conf,
 
     public static void checkEnvironmentWithStorageExpansion(
             ServerConfiguration conf,
-            RegistrationManager rm,
+            MetadataBookieDriver metadataDriver,
             List<File> journalDirectories,
             List<File> allLedgerDirs) throws BookieException {
+        RegistrationManager rm = metadataDriver.getRegistrationManager();
         try {
             // 1. retrieve the instance id
             String instanceId = rm.getClusterInstanceId();
@@ -629,24 +621,22 @@ public Bookie(ServerConfiguration conf, StatsLogger statsLogger)
         }
 
         // instantiate zookeeper client to initialize ledger manager
-        this.registrationManager = instantiateRegistrationManager(conf);
-        checkEnvironment(this.registrationManager);
+        this.metadataDriver = instantiateMetadataDriver(conf);
+        checkEnvironment(this.metadataDriver);
         try {
-            if (registrationManager != null) {
+            if (this.metadataDriver != null) {
                 // current the registration manager is zookeeper only
-                ledgerManagerFactory = AbstractZkLedgerManagerFactory.newLedgerManagerFactory(
-                    conf,
-                    registrationManager.getLayoutManager());
+                ledgerManagerFactory = metadataDriver.getLedgerManagerFactory();
                 LOG.info("instantiate ledger manager {}", ledgerManagerFactory.getClass().getName());
                 ledgerManager = ledgerManagerFactory.newLedgerManager();
             } else {
                 ledgerManagerFactory = null;
                 ledgerManager = null;
             }
-        } catch (IOException | InterruptedException e) {
+        } catch (MetadataException e) {
             throw new MetadataStoreException("Failed to initialize ledger manager", e);
         }
-        stateManager = new BookieStateManager(conf, statsLogger, registrationManager, ledgerDirsManager);
+        stateManager = new BookieStateManager(conf, statsLogger, metadataDriver, ledgerDirsManager);
         // register shutdown handler using trigger mode
         stateManager.setShutdownHandler(exitCode -> triggerBookieShutdown(exitCode));
         // Initialise ledgerDirMonitor. This would look through all the
@@ -898,23 +888,31 @@ public void diskJustWritable(File disk) {
     }
 
     /**
-     * Instantiate the registration manager for the Bookie.
+     * Instantiate the metadata driver for the Bookie.
      */
-    private RegistrationManager instantiateRegistrationManager(ServerConfiguration conf) throws BookieException {
-        // Create the registration manager instance
-        Class<? extends RegistrationManager> managerCls;
+    private MetadataBookieDriver instantiateMetadataDriver(ServerConfiguration conf) throws BookieException {
         try {
-            managerCls = conf.getRegistrationManagerClass();
+            String metadataServiceUriStr = conf.getMetadataServiceUri();
+            if (null == metadataServiceUriStr) {
+                return null;
+            }
+
+            MetadataBookieDriver driver = MetadataDrivers.getBookieDriver(
+                URI.create(metadataServiceUriStr));
+            driver.initialize(
+                conf,
+                () -> {
+                    stateManager.forceToUnregistered();
+                    // schedule a re-register operation
+                    stateManager.registerBookie(false);
+                },
+                statsLogger);
+            return driver;
+        } catch (MetadataException me) {
+            throw new MetadataStoreException("Failed to initialize metadata bookie driver", me);
         } catch (ConfigurationException e) {
             throw new BookieIllegalOpException(e);
         }
-
-        RegistrationManager manager = ReflectionUtils.newInstance(managerCls);
-        return manager.initialize(conf, () -> {
-            stateManager.forceToUnregistered();
-            // schedule a re-register operation
-            stateManager.registerBookie(false);
-        }, statsLogger);
     }
 
     /*
@@ -1032,8 +1030,8 @@ synchronized int shutdown(int exitCode) {
 
             }
             // Shutdown the ZK client
-            if (registrationManager != null) {
-                registrationManager.close();
+            if (metadataDriver != null) {
+                metadataDriver.close();
             }
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 027a04dda..e4dcdd318 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -19,12 +19,16 @@
 package org.apache.bookkeeper.bookie;
 
 import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithLedgerManagerFactory;
+import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithMetadataBookieDriver;
+import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithRegistrationManager;
 import static org.apache.bookkeeper.tools.cli.helpers.CommandHelpers.getBookieSocketAddrStringRepresentation;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.UncheckedExecutionException;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
@@ -75,6 +79,7 @@
 import org.apache.bookkeeper.bookie.storage.ldb.EntryLocationIndex;
 import org.apache.bookkeeper.bookie.storage.ldb.LocationsIndexRebuildOp;
 import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BKException.MetaStoreException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
@@ -86,18 +91,17 @@
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.discover.RegistrationManager;
-import org.apache.bookkeeper.discover.ZKRegistrationManager;
-import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
-import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.replication.AuditorElector;
+import org.apache.bookkeeper.replication.ReplicationException;
+import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.tools.cli.commands.bookie.LastMarkCommand;
 import org.apache.bookkeeper.tools.cli.commands.client.SimpleTestCommand;
@@ -386,7 +390,7 @@ public BookieFormatCmd() {
             opts.addOption("f", "force", false,
                     "If [nonInteractive] is specified, then whether"
                             + " to force delete the old data without prompt..?");
-            opts.addOption("d", "deleteCookie", false, "Delete its cookie on zookeeper");
+            opts.addOption("d", "deleteCookie", false, "Delete its cookie on metadata store");
         }
 
         @Override
@@ -413,17 +417,17 @@ int runCmd(CommandLine cmdLine) throws Exception {
             boolean result = Bookie.format(conf, interactive, force);
             // delete cookie
             if (cmdLine.hasOption("d")) {
-                RegistrationManager rm = new ZKRegistrationManager();
-                rm.initialize(conf, () -> {
-                }, NullStatsLogger.INSTANCE);
-                try {
-                    Versioned<Cookie> cookie = Cookie.readFromRegistrationManager(rm, conf);
-                    cookie.getValue().deleteFromRegistrationManager(rm, conf, cookie.getVersion());
-                } catch (CookieNotFoundException nne) {
-                    LOG.warn("No cookie to remove : ", nne);
-                } finally {
-                    rm.close();
-                }
+                runFunctionWithRegistrationManager(bkConf, rm -> {
+                    try {
+                        Versioned<Cookie> cookie = Cookie.readFromRegistrationManager(rm, conf);
+                        cookie.getValue().deleteFromRegistrationManager(rm, conf, cookie.getVersion());
+                    } catch (CookieNotFoundException nne) {
+                        LOG.warn("No cookie to remove : ", nne);
+                    } catch (BookieException be) {
+                        throw new UncheckedExecutionException(be.getMessage(), be);
+                    }
+                    return null;
+                });
             }
             return (result) ? 0 : 1;
         }
@@ -623,15 +627,29 @@ private int bkRecovery(BookKeeperAdmin bkAdmin,
         private void deleteCookies(ClientConfiguration conf,
                                    Set<BookieSocketAddress> bookieAddrs) throws BKException {
             ServerConfiguration serverConf = new ServerConfiguration(conf);
-            try (RegistrationManager rm = new ZKRegistrationManager()) {
-                rm.initialize(serverConf, () -> {}, NullStatsLogger.INSTANCE);
-                for (BookieSocketAddress addr : bookieAddrs) {
-                    deleteCookie(rm, addr);
+            try {
+                runFunctionWithRegistrationManager(serverConf, rm -> {
+                    try {
+                        for (BookieSocketAddress addr : bookieAddrs) {
+                            deleteCookie(rm, addr);
+                        }
+                    } catch (Exception e) {
+                        throw new UncheckedExecutionException(e);
+                    }
+                    return null;
+                });
+            } catch (Exception e) {
+                Throwable cause = e;
+                if (e instanceof UncheckedExecutionException) {
+                    cause = e.getCause();
+                }
+                if (cause instanceof BKException) {
+                    throw (BKException) cause;
+                } else {
+                    BKException bke = new MetaStoreException();
+                    bke.initCause(bke);
+                    throw bke;
                 }
-            } catch (BookieException be) {
-                BKException bke = new BKException.MetaStoreException();
-                bke.initCause(be);
-                throw bke;
             }
         }
 
@@ -881,7 +899,7 @@ int runCmd(CommandLine cmdLine) throws Exception {
             final String includingBookieId = cmdLine.getOptionValue("missingreplica");
             final String excludingBookieId = cmdLine.getOptionValue("excludingmissingreplica");
 
-            Predicate<List<String>> predicate = null;
+            final Predicate<List<String>> predicate;
             if (!StringUtils.isBlank(includingBookieId) && !StringUtils.isBlank(excludingBookieId)) {
                 predicate = replicasList -> (replicasList.contains(includingBookieId)
                         && !replicasList.contains(excludingBookieId));
@@ -889,19 +907,26 @@ int runCmd(CommandLine cmdLine) throws Exception {
                 predicate = replicasList -> replicasList.contains(includingBookieId);
             } else if (!StringUtils.isBlank(excludingBookieId)) {
                 predicate = replicasList -> !replicasList.contains(excludingBookieId);
+            } else {
+                predicate = null;
             }
 
-            try (RegistrationManager rm = RegistrationManager.instantiateRegistrationManager(bkConf)) {
-                try (LedgerManagerFactory mFactory =
-                         AbstractZkLedgerManagerFactory.newLedgerManagerFactory(bkConf, rm.getLayoutManager())) {
-                    LedgerUnderreplicationManager underreplicationManager =
-                        mFactory.newLedgerUnderreplicationManager();
-                    Iterator<Long> iter = underreplicationManager.listLedgersToRereplicate(predicate);
-                    while (iter.hasNext()) {
-                        System.out.println(ledgerIdFormatter.formatLedgerId(iter.next()));
-                    }
+            runFunctionWithLedgerManagerFactory(bkConf, mFactory -> {
+                LedgerUnderreplicationManager underreplicationManager;
+                try {
+                    underreplicationManager = mFactory.newLedgerUnderreplicationManager();
+                } catch (KeeperException | CompatibilityException e) {
+                    throw new UncheckedExecutionException("Failed to new ledger underreplicated manager", e);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new UncheckedExecutionException("Interrupted on newing ledger underreplicated manager", e);
                 }
-            }
+                Iterator<Long> iter = underreplicationManager.listLedgersToRereplicate(predicate);
+                while (iter.hasNext()) {
+                    System.out.println(ledgerIdFormatter.formatLedgerId(iter.next()));
+                }
+                return null;
+            });
 
             return 0;
         }
@@ -923,9 +948,7 @@ int runCmd(CommandLine cmdLine) throws Exception {
 
         @Override
         public int runCmd(CommandLine cmdLine) throws Exception {
-            try (LedgerManagerFactory mFactory = AbstractZkLedgerManagerFactory.newLedgerManagerFactory(
-                    bkConf,
-                    RegistrationManager.instantiateRegistrationManager(bkConf).getLayoutManager())) {
+            runFunctionWithLedgerManagerFactory(bkConf, mFactory -> {
                 try (LedgerManager m = mFactory.newLedgerManager()) {
                     LedgerRangeIterator iter = m.getLedgerRanges();
                     if (cmdLine.hasOption("m")) {
@@ -956,8 +979,11 @@ public int runCmd(CommandLine cmdLine) throws Exception {
                             }
                         }
                     }
+                } catch (Exception ioe) {
+                    throw new UncheckedExecutionException(ioe);
                 }
-            }
+                return null;
+            });
 
             return 0;
         }
@@ -1024,16 +1050,16 @@ public int runCmd(CommandLine cmdLine) throws Exception {
                 return -1;
             }
 
-            try (RegistrationManager rm = RegistrationManager.instantiateRegistrationManager(bkConf)) {
-                try (LedgerManagerFactory mFactory =
-                         AbstractZkLedgerManagerFactory.newLedgerManagerFactory(bkConf, rm.getLayoutManager())){
-                    try (LedgerManager m = mFactory.newLedgerManager()) {
-                        ReadMetadataCallback cb = new ReadMetadataCallback(lid);
-                        m.readLedgerMetadata(lid, cb);
-                        printLedgerMetadata(cb);
-                    }
+            runFunctionWithLedgerManagerFactory(bkConf, mFactory -> {
+                try (LedgerManager m = mFactory.newLedgerManager()) {
+                    ReadMetadataCallback cb = new ReadMetadataCallback(lid);
+                    m.readLedgerMetadata(lid, cb);
+                    printLedgerMetadata(cb);
+                } catch (Exception e) {
+                    throw new UncheckedExecutionException(e);
                 }
-            }
+                return null;
+            });
 
             return 0;
         }
@@ -1647,39 +1673,44 @@ int runCmd(CommandLine cmdLine) throws Exception {
                 return 1;
             }
 
-            try (RegistrationManager rm =
-                     RegistrationManager.instantiateRegistrationManager(bkConf)){
-                try (LedgerManagerFactory mFactory = AbstractZkLedgerManagerFactory.newLedgerManagerFactory(
-                    bkConf,
-                    rm.getLayoutManager())) {
-                    LedgerUnderreplicationManager underreplicationManager = mFactory.newLedgerUnderreplicationManager();
-                    if (!enable && !disable) {
-                        boolean enabled = underreplicationManager.isLedgerReplicationEnabled();
-                        System.out.println("Autorecovery is " + (enabled ? "enabled." : "disabled."));
-                    } else if (enable) {
-                        if (underreplicationManager.isLedgerReplicationEnabled()) {
-                            LOG.warn("Autorecovery already enabled. Doing nothing");
-                        } else {
-                            LOG.info("Enabling autorecovery");
-                            underreplicationManager.enableLedgerReplication();
-                        }
-                    } else {
-                        if (!underreplicationManager.isLedgerReplicationEnabled()) {
-                            LOG.warn("Autorecovery already disabled. Doing nothing");
+            runFunctionWithLedgerManagerFactory(bkConf, mFactory -> {
+                try {
+                    try (LedgerUnderreplicationManager underreplicationManager =
+                             mFactory.newLedgerUnderreplicationManager()) {
+                        if (!enable && !disable) {
+                            boolean enabled = underreplicationManager.isLedgerReplicationEnabled();
+                            System.out.println("Autorecovery is " + (enabled ? "enabled." : "disabled."));
+                        } else if (enable) {
+                            if (underreplicationManager.isLedgerReplicationEnabled()) {
+                                LOG.warn("Autorecovery already enabled. Doing nothing");
+                            } else {
+                                LOG.info("Enabling autorecovery");
+                                underreplicationManager.enableLedgerReplication();
+                            }
                         } else {
-                            LOG.info("Disabling autorecovery");
-                            underreplicationManager.disableLedgerReplication();
+                            if (!underreplicationManager.isLedgerReplicationEnabled()) {
+                                LOG.warn("Autorecovery already disabled. Doing nothing");
+                            } else {
+                                LOG.info("Disabling autorecovery");
+                                underreplicationManager.disableLedgerReplication();
+                            }
                         }
                     }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new UncheckedExecutionException(e);
+                } catch (KeeperException | ReplicationException e) {
+                    throw new UncheckedExecutionException(e);
                 }
-            }
+                return null;
+            });
 
             return 0;
         }
     }
 
     /**
-     * Setter and Getter for LostBookieRecoveryDelay value (in seconds) in Zookeeper.
+     * Setter and Getter for LostBookieRecoveryDelay value (in seconds) in metadata store.
      */
     class LostBookieRecoveryDelayCmd extends MyCommand {
         Options opts = new Options();
@@ -1697,7 +1728,7 @@ Options getOptions() {
 
         @Override
         String getDescription() {
-            return "Setter and Getter for LostBookieRecoveryDelay value (in seconds) in Zookeeper.";
+            return "Setter and Getter for LostBookieRecoveryDelay value (in seconds) in metadata store.";
         }
 
         @Override
@@ -1813,11 +1844,17 @@ String getUsage() {
 
         @Override
         int runCmd(CommandLine cmdLine) throws Exception {
-            try (RegistrationManager rm = RegistrationManager.instantiateRegistrationManager(bkConf)) {
-                String readInstanceId = rm.getClusterInstanceId();
+            runFunctionWithRegistrationManager(bkConf, rm -> {
+                String readInstanceId = null;
+                try {
+                    readInstanceId = rm.getClusterInstanceId();
+                } catch (BookieException e) {
+                    throw new UncheckedExecutionException(e);
+                }
                 LOG.info("ZKServers: {} ZkLedgersRootPath: {} InstanceId: {}", bkConf.getZkServers(),
                         bkConf.getZkLedgersRootPath(), readInstanceId);
-            }
+                return null;
+            });
             return 0;
         }
     }
@@ -1918,87 +1955,80 @@ int runCmd(CommandLine cmdLine) throws Exception {
             return retValue;
         }
 
-        private int updateBookieIdInCookie(final String bookieId, final boolean useHostname) throws BookieException,
-                InterruptedException {
-            RegistrationManager rm = new ZKRegistrationManager();
-            try {
-                rm.initialize(bkConf, () -> {
-                }, NullStatsLogger.INSTANCE);
-                ServerConfiguration conf = new ServerConfiguration(bkConf);
-                String newBookieId = Bookie.getBookieAddress(conf).toString();
-                // read oldcookie
-                Versioned<Cookie> oldCookie = null;
+        private int updateBookieIdInCookie(final String bookieId, final boolean useHostname)
+                throws Exception {
+            return runFunctionWithRegistrationManager(bkConf, rm -> {
                 try {
-                    conf.setUseHostNameAsBookieID(!useHostname);
-                    oldCookie = Cookie.readFromRegistrationManager(rm, conf);
-                } catch (CookieNotFoundException nne) {
-                    LOG.error("Either cookie already updated with UseHostNameAsBookieID={} or no cookie exists!",
-                            useHostname, nne);
-                    return -1;
-                }
-                Cookie newCookie = Cookie.newBuilder(oldCookie.getValue()).setBookieHost(newBookieId).build();
-                boolean hasCookieUpdatedInDirs = verifyCookie(newCookie, journalDirectories[0]);
-                for (File dir : ledgerDirectories) {
-                    hasCookieUpdatedInDirs &= verifyCookie(newCookie, dir);
-                }
-                if (indexDirectories != ledgerDirectories) {
-                    for (File dir : indexDirectories) {
-                        hasCookieUpdatedInDirs &= verifyCookie(newCookie, dir);
-                    }
-                }
-
-                if (hasCookieUpdatedInDirs) {
+                    ServerConfiguration conf = new ServerConfiguration(bkConf);
+                    String newBookieId = Bookie.getBookieAddress(conf).toString();
+                    // read oldcookie
+                    Versioned<Cookie> oldCookie = null;
                     try {
-                        conf.setUseHostNameAsBookieID(useHostname);
-                        Cookie.readFromRegistrationManager(rm, conf);
-                        // since newcookie exists, just do cleanup of oldcookie and return
                         conf.setUseHostNameAsBookieID(!useHostname);
-                        oldCookie.getValue().deleteFromRegistrationManager(rm, conf, oldCookie.getVersion());
-                        return 0;
+                        oldCookie = Cookie.readFromRegistrationManager(rm, conf);
                     } catch (CookieNotFoundException nne) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Ignoring, cookie will be written to zookeeper");
-                        }
-                    }
-                } else {
-                    // writes newcookie to local dirs
-                    for (File journalDirectory : journalDirectories) {
-                        newCookie.writeToDirectory(journalDirectory);
-                        LOG.info("Updated cookie file present in journalDirectory {}", journalDirectory);
+                        LOG.error("Either cookie already updated with UseHostNameAsBookieID={} or no cookie exists!",
+                            useHostname, nne);
+                        return -1;
                     }
+                    Cookie newCookie = Cookie.newBuilder(oldCookie.getValue()).setBookieHost(newBookieId).build();
+                    boolean hasCookieUpdatedInDirs = verifyCookie(newCookie, journalDirectories[0]);
                     for (File dir : ledgerDirectories) {
-                        newCookie.writeToDirectory(dir);
+                        hasCookieUpdatedInDirs &= verifyCookie(newCookie, dir);
                     }
-                    LOG.info("Updated cookie file present in ledgerDirectories {}", ledgerDirectories);
-                    if (ledgerDirectories != indexDirectories) {
+                    if (indexDirectories != ledgerDirectories) {
                         for (File dir : indexDirectories) {
+                            hasCookieUpdatedInDirs &= verifyCookie(newCookie, dir);
+                        }
+                    }
+
+                    if (hasCookieUpdatedInDirs) {
+                        try {
+                            conf.setUseHostNameAsBookieID(useHostname);
+                            Cookie.readFromRegistrationManager(rm, conf);
+                            // since newcookie exists, just do cleanup of oldcookie and return
+                            conf.setUseHostNameAsBookieID(!useHostname);
+                            oldCookie.getValue().deleteFromRegistrationManager(rm, conf, oldCookie.getVersion());
+                            return 0;
+                        } catch (CookieNotFoundException nne) {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("Ignoring, cookie will be written to zookeeper");
+                            }
+                        }
+                    } else {
+                        // writes newcookie to local dirs
+                        for (File journalDirectory : journalDirectories) {
+                            newCookie.writeToDirectory(journalDirectory);
+                            LOG.info("Updated cookie file present in journalDirectory {}", journalDirectory);
+                        }
+                        for (File dir : ledgerDirectories) {
                             newCookie.writeToDirectory(dir);
                         }
-                        LOG.info("Updated cookie file present in indexDirectories {}", indexDirectories);
+                        LOG.info("Updated cookie file present in ledgerDirectories {}", ledgerDirectories);
+                        if (ledgerDirectories != indexDirectories) {
+                            for (File dir : indexDirectories) {
+                                newCookie.writeToDirectory(dir);
+                            }
+                            LOG.info("Updated cookie file present in indexDirectories {}", indexDirectories);
+                        }
                     }
-                }
-                // writes newcookie to zookeeper
-                conf.setUseHostNameAsBookieID(useHostname);
-                newCookie.writeToRegistrationManager(rm, conf, Version.NEW);
+                    // writes newcookie to zookeeper
+                    conf.setUseHostNameAsBookieID(useHostname);
+                    newCookie.writeToRegistrationManager(rm, conf, Version.NEW);
 
-                // delete oldcookie
-                conf.setUseHostNameAsBookieID(!useHostname);
-                oldCookie.getValue().deleteFromRegistrationManager(rm, conf, oldCookie.getVersion());
-            } catch (IOException ioe) {
-                LOG.error("IOException during cookie updation!", ioe);
-                return -1;
-            } finally {
-                if (rm != null) {
-                    rm.close();
+                    // delete oldcookie
+                    conf.setUseHostNameAsBookieID(!useHostname);
+                    oldCookie.getValue().deleteFromRegistrationManager(rm, conf, oldCookie.getVersion());
+                    return 0;
+                } catch (IOException | BookieException ioe) {
+                    LOG.error("IOException during cookie updation!", ioe);
+                    return -1;
                 }
-            }
-            return 0;
+            });
         }
 
-        private int expandStorage() throws InterruptedException {
-            try (RegistrationManager rm = new ZKRegistrationManager()) {
-                rm.initialize(bkConf, () -> { }, NullStatsLogger.INSTANCE);
-
+        private int expandStorage() throws Exception {
+            return runFunctionWithMetadataBookieDriver(bkConf, driver -> {
                 List<File> allLedgerDirs = Lists.newArrayList();
                 allLedgerDirs.addAll(Arrays.asList(ledgerDirectories));
                 if (indexDirectories != ledgerDirectories) {
@@ -2007,16 +2037,13 @@ private int expandStorage() throws InterruptedException {
 
                 try {
                     Bookie.checkEnvironmentWithStorageExpansion(
-                            bkConf, rm, Arrays.asList(journalDirectories), allLedgerDirs);
+                        bkConf, driver, Arrays.asList(journalDirectories), allLedgerDirs);
+                    return 0;
                 } catch (BookieException e) {
                     LOG.error("Exception while updating cookie for storage expansion", e);
                     return -1;
                 }
-            } catch (BookieException e) {
-                LOG.error("Exception while establishing RegistrationManager connection.", e);
-                return -1;
-            }
-            return 0;
+            });
         }
 
         private boolean verifyCookie(Cookie oldCookie, File dir) throws IOException {
@@ -2029,8 +2056,7 @@ private boolean verifyCookie(Cookie oldCookie, File dir) throws IOException {
             return true;
         }
 
-        private int listOrDeleteCookies(boolean delete, boolean force)
-                throws IOException, BookieException {
+        private int listOrDeleteCookies(boolean delete, boolean force) throws Exception {
             BookieSocketAddress bookieAddress = Bookie.getBookieAddress(bkConf);
             File[] journalDirs = bkConf.getJournalDirs();
             File[] ledgerDirs = bkConf.getLedgerDirs();
@@ -2079,34 +2105,35 @@ private int listOrDeleteCookies(boolean delete, boolean force)
                 LOG.info("No local cookies for Bookie: {}", bookieAddress);
             }
 
-            try (ZKRegistrationManager rm = new ZKRegistrationManager()) {
-                Versioned<Cookie> cookie = null;
+            return runFunctionWithRegistrationManager(bkConf, rm -> {
                 try {
-                    rm.initialize(bkConf, () -> { }, NullStatsLogger.INSTANCE);
-                    cookie = Cookie.readFromRegistrationManager(rm, bookieAddress);
-                } catch (CookieNotFoundException nne) {
-                    LOG.info("No cookie for {} in ZooKeeper", bookieAddress);
-                    return 0;
-                }
-
-                if (delete) {
-                    boolean confirm = force;
-                    if (!confirm) {
-                        confirm = IOUtils.confirmPrompt("Are you sure you want to delete Cookies from Zookeeper?");
+                    Versioned<Cookie> cookie = null;
+                    try {
+                        cookie = Cookie.readFromRegistrationManager(rm, bookieAddress);
+                    } catch (CookieNotFoundException nne) {
+                        LOG.info("No cookie for {} in metadata store", bookieAddress);
+                        return 0;
                     }
 
-                    if (confirm) {
-                        cookie.getValue().deleteFromRegistrationManager(rm, bkConf, cookie.getVersion());
-                        LOG.info("Deleted Cookie from Zookeeper for Bookie: {}", bookieAddress);
-                    } else {
-                        LOG.info("Skipping deleting cookie from ZooKeeper for Bookie: {}", bookieAddress);
+                    if (delete) {
+                        boolean confirm = force;
+                        if (!confirm) {
+                            confirm = IOUtils.confirmPrompt(
+                                "Are you sure you want to delete Cookies from metadata store?");
+                        }
+
+                        if (confirm) {
+                            cookie.getValue().deleteFromRegistrationManager(rm, bkConf, cookie.getVersion());
+                            LOG.info("Deleted Cookie from metadata store for Bookie: {}", bookieAddress);
+                        } else {
+                            LOG.info("Skipping deleting cookie from metadata store for Bookie: {}", bookieAddress);
+                        }
                     }
-                } else {
-                    LOG.info("{} bookie's Cookie path in Zookeeper: {} ", bookieAddress,
-                            rm.getCookiePath(bookieAddress.toString()));
+                } catch (BookieException | IOException e) {
+                    return -1;
                 }
-            }
-            return 0;
+                return 0;
+            });
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
index 093dcad0c..798db41d5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieStateManager.java
@@ -32,8 +32,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.discover.RegistrationManager;
-import org.apache.bookkeeper.discover.ZKRegistrationManager;
+import org.apache.bookkeeper.meta.MetadataBookieDriver;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -65,16 +64,16 @@
 
     private final String bookieId;
     private ShutdownHandler shutdownHandler;
-    private RegistrationManager registrationManager;
+    private final MetadataBookieDriver metadataDriver;
     // Expose Stats
     private final StatsLogger statsLogger;
 
 
     public BookieStateManager(ServerConfiguration conf, StatsLogger statsLogger,
-           RegistrationManager registrationManager, LedgerDirsManager ledgerDirsManager) throws IOException {
+           MetadataBookieDriver metadataDriver, LedgerDirsManager ledgerDirsManager) throws IOException {
         this.conf = conf;
         this.statsLogger = statsLogger;
-        this.registrationManager = registrationManager;
+        this.metadataDriver = metadataDriver;
         this.ledgerDirsManager = ledgerDirsManager;
         // ZK ephemeral node for this Bookie.
         this.bookieId = getMyId();
@@ -99,8 +98,8 @@ public Number getSample() {
     }
 
     @VisibleForTesting
-    BookieStateManager(ServerConfiguration conf, RegistrationManager registrationManager) throws IOException {
-        this(conf, NullStatsLogger.INSTANCE, registrationManager, new LedgerDirsManager(conf, conf.getLedgerDirs(),
+    BookieStateManager(ServerConfiguration conf, MetadataBookieDriver metadataDriver) throws IOException {
+        this(conf, NullStatsLogger.INSTANCE, metadataDriver, new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()),
                 NullStatsLogger.INSTANCE));
     }
@@ -199,16 +198,15 @@ void doRegisterBookie() throws IOException {
     }
 
     private void doRegisterBookie(boolean isReadOnly) throws IOException {
-        if (null == registrationManager || ((ZKRegistrationManager) this.registrationManager).getZk() == null) {
-            // registration manager is null, means not register itself to zk.
-            // ZooKeeper is null existing only for testing.
-            LOG.info("null zk while do register");
+        if (null == metadataDriver) {
+            // registration manager is null, means not register itself to metadata store.
+            LOG.info("null registration manager while do register");
             return;
         }
 
         rmRegistered.set(false);
         try {
-            registrationManager.registerBookie(bookieId, isReadOnly);
+            metadataDriver.getRegistrationManager().registerBookie(bookieId, isReadOnly);
             rmRegistered.set(true);
         } catch (BookieException e) {
             throw new IOException(e);
@@ -230,7 +228,7 @@ public void doTransitionToWritableMode() {
             bookieStatus.writeToDirectories(ledgerDirsManager.getAllLedgerDirs());
         }
         // change zookeeper state only when using zookeeper
-        if (null == registrationManager) {
+        if (null == metadataDriver) {
             return;
         }
         try {
@@ -242,7 +240,7 @@ public void doTransitionToWritableMode() {
         }
         // clear the readonly state
         try {
-            registrationManager.unregisterBookie(bookieId, true);
+            metadataDriver.getRegistrationManager().unregisterBookie(bookieId, true);
         } catch (BookieException e) {
             // if we failed when deleting the readonly flag in zookeeper, it is OK since client would
             // already see the bookie in writable list. so just log the exception
@@ -274,11 +272,11 @@ public void doTransitionToReadOnlyMode() {
             this.bookieStatus.writeToDirectories(ledgerDirsManager.getAllLedgerDirs());
         }
         // change zookeeper state only when using zookeeper
-        if (null == registrationManager) {
+        if (null == metadataDriver) {
             return;
         }
         try {
-            registrationManager.registerBookie(bookieId, true);
+            metadataDriver.getRegistrationManager().registerBookie(bookieId, true);
         } catch (BookieException e) {
             LOG.error("Error in transition to ReadOnly Mode."
                     + " Shutting down", e);
@@ -294,10 +292,6 @@ private String getMyId() throws UnknownHostException {
         return Bookie.getBookieAddress(conf).toString();
     }
 
-    @VisibleForTesting
-    public void setRegistrationManager(RegistrationManager rm) {
-        this.registrationManager = rm;
-    }
     @VisibleForTesting
     public ShutdownHandler getShutdownHandler(){
         return shutdownHandler;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java
index d499ce357..311988b5c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java
@@ -22,8 +22,10 @@
 package org.apache.bookkeeper.bookie;
 
 import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithRegistrationManager;
 
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.UncheckedExecutionException;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
@@ -35,12 +37,13 @@
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Scanner;
+import java.util.concurrent.ExecutionException;
+import org.apache.bookkeeper.bookie.BookieException.UpgradeException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.discover.RegistrationManager;
-import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.meta.exceptions.MetadataException;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.HardLink;
-import org.apache.bookkeeper.util.ReflectionUtils;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.commons.cli.BasicParser;
@@ -124,18 +127,6 @@ private static int detectPreviousVersion(File directory) throws IOException {
         }
     }
 
-    private static RegistrationManager newRegistrationManager(final ServerConfiguration conf)
-            throws BookieException.UpgradeException {
-
-        try {
-            Class<? extends RegistrationManager> rmClass = conf.getRegistrationManagerClass();
-            RegistrationManager rm = ReflectionUtils.newInstance(rmClass);
-            return rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
-        } catch (Exception e) {
-            throw new BookieException.UpgradeException(e);
-        }
-    }
-
     private static void linkIndexDirectories(File srcPath, File targetPath) throws IOException {
         String[] files = srcPath.list();
         if (files == null) {
@@ -166,7 +157,27 @@ public static void upgrade(ServerConfiguration conf)
             throws BookieException.UpgradeException, InterruptedException {
         LOG.info("Upgrading...");
 
-        try (RegistrationManager rm = newRegistrationManager(conf)) {
+        try {
+            runFunctionWithRegistrationManager(conf, rm -> {
+                try {
+                    upgrade(conf, rm);
+                } catch (UpgradeException e) {
+                    throw new UncheckedExecutionException(e.getMessage(), e);
+                }
+                return null;
+            });
+        } catch (MetadataException e) {
+            throw new UpgradeException(e);
+        } catch (ExecutionException e) {
+            throw new UpgradeException(e.getCause());
+        }
+
+        LOG.info("Done");
+    }
+
+    private static void upgrade(ServerConfiguration conf,
+                                RegistrationManager rm) throws UpgradeException {
+        try {
             Map<File, File> deferredMoves = new HashMap<File, File>();
             Cookie.Builder cookieBuilder = Cookie.generateCookie(conf);
             Cookie c = cookieBuilder.build();
@@ -225,7 +236,6 @@ public boolean accept(File dir, String name) {
         } catch (IOException ioe) {
             throw new BookieException.UpgradeException(ioe);
         }
-        LOG.info("Done");
     }
 
     public static void finalizeUpgrade(ServerConfiguration conf)
@@ -269,35 +279,54 @@ public static void finalizeUpgrade(ServerConfiguration conf)
     public static void rollback(ServerConfiguration conf)
             throws BookieException.UpgradeException, InterruptedException {
         LOG.info("Rolling back upgrade...");
-        try (RegistrationManager rm = newRegistrationManager(conf)) {
-            for (File d : getAllDirectories(conf)) {
-                LOG.info("Rolling back {}", d);
+
+        try {
+            runFunctionWithRegistrationManager(conf, rm -> {
                 try {
-                    // ensure there is a previous version before rollback
-                    int version = detectPreviousVersion(d);
-
-                    if (version <= Cookie.CURRENT_COOKIE_LAYOUT_VERSION) {
-                        File curDir = new File(d,
-                                BookKeeperConstants.CURRENT_DIR);
-                        FileUtils.deleteDirectory(curDir);
-                    } else {
-                        throw new BookieException.UpgradeException(
-                                "Cannot rollback as previous data does not exist");
-                    }
-                } catch (IOException ioe) {
-                    LOG.error("Error rolling back {}", d);
-                    throw new BookieException.UpgradeException(ioe);
+                    rollback(conf, rm);
+                } catch (UpgradeException e) {
+                    throw new UncheckedExecutionException(e.getMessage(), e);
                 }
-            }
+                return null;
+            });
+        } catch (MetadataException e) {
+            throw new UpgradeException(e);
+        } catch (ExecutionException e) {
+            throw new UpgradeException(e.getCause());
+        }
+
+        LOG.info("Done");
+    }
+
+    private static void rollback(ServerConfiguration conf,
+                                 RegistrationManager rm)
+            throws BookieException.UpgradeException {
+        for (File d : getAllDirectories(conf)) {
+            LOG.info("Rolling back {}", d);
             try {
-                Versioned<Cookie> cookie = Cookie.readFromRegistrationManager(rm, conf);
-                cookie.getValue().deleteFromRegistrationManager(rm, conf, cookie.getVersion());
-            } catch (BookieException ke) {
-                LOG.error("Error deleting cookie from Registration Manager");
-                throw new BookieException.UpgradeException(ke);
+                // ensure there is a previous version before rollback
+                int version = detectPreviousVersion(d);
+
+                if (version <= Cookie.CURRENT_COOKIE_LAYOUT_VERSION) {
+                    File curDir = new File(d,
+                            BookKeeperConstants.CURRENT_DIR);
+                    FileUtils.deleteDirectory(curDir);
+                } else {
+                    throw new BookieException.UpgradeException(
+                            "Cannot rollback as previous data does not exist");
+                }
+            } catch (IOException ioe) {
+                LOG.error("Error rolling back {}", d);
+                throw new BookieException.UpgradeException(ioe);
             }
         }
-        LOG.info("Done");
+        try {
+            Versioned<Cookie> cookie = Cookie.readFromRegistrationManager(rm, conf);
+            cookie.getValue().deleteFromRegistrationManager(rm, conf, cookie.getVersion());
+        } catch (BookieException ke) {
+            LOG.error("Error deleting cookie from Registration Manager");
+            throw new BookieException.UpgradeException(ke);
+        }
     }
 
     private static void printHelp(Options opts) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
index 464f1272f..4bd36640f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyBookie.java
@@ -41,7 +41,7 @@
     public ReadOnlyBookie(ServerConfiguration conf, StatsLogger statsLogger)
             throws IOException, KeeperException, InterruptedException, BookieException {
         super(conf, statsLogger);
-        stateManager = new BookieStateManager(conf, statsLogger, registrationManager, getLedgerDirsManager()) {
+        stateManager = new BookieStateManager(conf, statsLogger, metadataDriver, getLedgerDirsManager()) {
 
             @Override
             public void doTransitionToWritableMode() {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index b65714763..f0c524998 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -446,8 +446,13 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo
 
         // initialize metadata driver
         try {
-            this.metadataDriver = MetadataDrivers.getClientDriver(
-                URI.create(conf.getMetadataServiceUri()));
+            String metadataServiceUriStr = conf.getMetadataServiceUri();
+            if (null != metadataServiceUriStr) {
+                this.metadataDriver = MetadataDrivers.getClientDriver(URI.create(metadataServiceUriStr));
+            } else {
+                checkNotNull(zkc, "No external zookeeper provided when no metadata service uri is found");
+                this.metadataDriver = MetadataDrivers.getClientDriver("zk");
+            }
             this.metadataDriver.initialize(
                 conf,
                 scheduler,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 3392b05e1..d0078f41d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -21,12 +21,14 @@
 package org.apache.bookkeeper.client;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithRegistrationManager;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.AbstractFuture;
 
+import com.google.common.util.concurrent.UncheckedExecutionException;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -61,7 +63,6 @@
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.discover.RegistrationClient.RegistrationListener;
-import org.apache.bookkeeper.discover.RegistrationManager;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
@@ -1132,38 +1133,41 @@ public void processResult(int rc, String s, Object ctx) {
      */
     public static boolean format(ServerConfiguration conf,
             boolean isInteractive, boolean force) throws Exception {
-
-        try (RegistrationManager rm = RegistrationManager.instantiateRegistrationManager(conf)) {
-            boolean ledgerRootExists = rm.prepareFormat();
-
-            // If old data was there then confirm with admin.
-            boolean doFormat = true;
-            if (ledgerRootExists) {
-                if (!isInteractive) {
-                    // If non interactive and force is set, then delete old data.
-                    if (force) {
-                        doFormat = true;
+        return runFunctionWithRegistrationManager(conf, rm -> {
+            try {
+                boolean ledgerRootExists = rm.prepareFormat();
+
+                // If old data was there then confirm with admin.
+                boolean doFormat = true;
+                if (ledgerRootExists) {
+                    if (!isInteractive) {
+                        // If non interactive and force is set, then delete old data.
+                        if (force) {
+                            doFormat = true;
+                        } else {
+                            doFormat = false;
+                        }
                     } else {
-                        doFormat = false;
+                        // Confirm with the admin.
+                        doFormat = IOUtils
+                            .confirmPrompt("Ledger root already exists. "
+                                + "Are you sure to format bookkeeper metadata? "
+                                + "This may cause data loss.");
                     }
-                } else {
-                    // Confirm with the admin.
-                    doFormat = IOUtils
-                        .confirmPrompt("Ledger root already exists. "
-                            + "Are you sure to format bookkeeper metadata? "
-                            + "This may cause data loss.");
                 }
-            }
 
-            if (!doFormat) {
-                return false;
-            }
+                if (!doFormat) {
+                    return false;
+                }
 
-            BookKeeper bkc = new BookKeeper(new ClientConfiguration(conf));
-            bkc.ledgerManagerFactory.format(conf, bkc.getMetadataClientDriver().getLayoutManager());
+                BookKeeper bkc = new BookKeeper(new ClientConfiguration(conf));
+                bkc.ledgerManagerFactory.format(conf, bkc.getMetadataClientDriver().getLayoutManager());
 
-            return rm.format();
-        }
+                return rm.format();
+            } catch (Exception e) {
+                throw new UncheckedExecutionException(e.getMessage(), e);
+            }
+        });
     }
 
     /**
@@ -1175,9 +1179,13 @@ public static boolean format(ServerConfiguration conf,
      * @throws Exception
      */
     public static boolean initNewCluster(ServerConfiguration conf) throws Exception {
-        try (RegistrationManager rm = RegistrationManager.instantiateRegistrationManager(conf)) {
-            return rm.initNewCluster();
-        }
+        return runFunctionWithRegistrationManager(conf, rm -> {
+            try {
+                return rm.initNewCluster();
+            } catch (Exception e) {
+                throw new UncheckedExecutionException(e.getMessage(), e);
+            }
+        });
     }
 
     /**
@@ -1202,17 +1210,21 @@ public static boolean nukeExistingCluster(ServerConfiguration conf, String ledge
             return false;
         }
 
-        try (RegistrationManager rm = RegistrationManager.instantiateRegistrationManager(conf)) {
-            if (!force) {
-                String readInstanceId = rm.getClusterInstanceId();
-                if ((instanceId == null) || !instanceId.equals(readInstanceId)) {
-                    LOG.error("Provided InstanceId : {} is not matching with cluster InstanceId in ZK: {}", instanceId,
-                            readInstanceId);
-                    return false;
+        return runFunctionWithRegistrationManager(conf, rm -> {
+            try {
+                if (!force) {
+                    String readInstanceId = rm.getClusterInstanceId();
+                    if ((instanceId == null) || !instanceId.equals(readInstanceId)) {
+                        LOG.error("Provided InstanceId : {} is not matching with cluster InstanceId in ZK: {}",
+                            instanceId, readInstanceId);
+                        return false;
+                    }
                 }
+                return rm.nukeExistingCluster();
+            } catch (Exception e) {
+                throw new UncheckedExecutionException(e.getMessage(), e);
             }
-            return rm.nukeExistingCluster();
-        }
+        });
     }
 
     /**
@@ -1244,28 +1256,32 @@ public static boolean initBookie(ServerConfiguration conf) throws Exception {
             }
         }
 
-        try (RegistrationManager rm = RegistrationManager.instantiateRegistrationManager(conf)) {
-            /*
-             * make sure that there is no bookie registered with the same
-             * bookieid and the cookie for the same bookieid is not existing.
-             */
-            String bookieId = Bookie.getBookieAddress(conf).toString();
-            if (rm.isBookieRegistered(bookieId)) {
-                LOG.error("Bookie with bookieId: {} is still registered, "
+        return runFunctionWithRegistrationManager(conf, rm -> {
+            try {
+                /*
+                 * make sure that there is no bookie registered with the same
+                 * bookieid and the cookie for the same bookieid is not existing.
+                 */
+                String bookieId = Bookie.getBookieAddress(conf).toString();
+                if (rm.isBookieRegistered(bookieId)) {
+                    LOG.error("Bookie with bookieId: {} is still registered, "
                         + "If this node is running bookie process, try stopping it first.", bookieId);
-                return false;
-            }
+                    return false;
+                }
 
-            try {
-                rm.readCookie(bookieId);
-                LOG.error("Cookie still exists in the ZK for this bookie: {}, try formatting the bookie", bookieId);
-                return false;
-            } catch (BookieException.CookieNotFoundException nfe) {
-                // it is expected for readCookie to fail with
-                // BookieException.CookieNotFoundException
+                try {
+                    rm.readCookie(bookieId);
+                    LOG.error("Cookie still exists in the ZK for this bookie: {}, try formatting the bookie", bookieId);
+                    return false;
+                } catch (BookieException.CookieNotFoundException nfe) {
+                    // it is expected for readCookie to fail with
+                    // BookieException.CookieNotFoundException
+                }
+                return true;
+            } catch (Exception e) {
+                throw new UncheckedExecutionException(e.getMessage(), e);
             }
-            return true;
-        }
+        });
     }
 
     private static boolean validateDirectoriesAreEmpty(File[] dirs, String typeOfDir) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index d2ba5bcf2..64f281f89 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -204,7 +204,8 @@ public String getMetadataServiceUri() throws ConfigurationException {
             String ledgerManagerType;
             Class<? extends LedgerManagerFactory> factoryClass = getLedgerManagerFactoryClass();
             if (factoryClass == null) {
-                ledgerManagerType = HierarchicalLedgerManagerFactory.NAME;
+                // set the ledger manager type to "null", so the driver implementation knows that the type is not set.
+                ledgerManagerType = "null";
             } else {
                 if (!AbstractZkLedgerManagerFactory.class.isAssignableFrom(factoryClass)) {
                     // this is a non-zk implementation
@@ -224,11 +225,14 @@ public String getMetadataServiceUri() throws ConfigurationException {
                         + factoryClass);
                 }
             }
-            serviceUri = String.format(
-                "zk+%s://%s%s",
-                ledgerManagerType,
-                getZkServers(),
-                getZkLedgersRootPath());
+            String zkServers = getZkServers();
+            if (null != zkServers) {
+                serviceUri = String.format(
+                    "zk+%s://%s%s",
+                    ledgerManagerType,
+                    getZkServers(),
+                    getZkLedgersRootPath());
+            }
         }
         return serviceUri;
     }
@@ -240,7 +244,7 @@ public String getMetadataServiceUri() throws ConfigurationException {
      * @return the configuration object.
      * @throws ConfigurationException
      */
-    public T setMetadataServiceUri(String serviceUri) throws ConfigurationException {
+    public T setMetadataServiceUri(String serviceUri) {
         setProperty(METADATA_SERVICE_URI, serviceUri);
         return getThis();
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index da6368556..9df8d21fe 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -2611,7 +2611,9 @@ public ServerConfiguration setExtraServerComponents(String[] componentClasses) {
      *
      * @param regManagerClass
      *            ManagerClass
+     * @deprecated since 4.7.0, in favor of using {@link #setMetadataServiceUri(String)}
      */
+    @Deprecated
     public void setRegistrationManagerClass(
             Class<? extends RegistrationManager> regManagerClass) {
         setProperty(REGISTRATION_MANAGER_CLASS, regManagerClass);
@@ -2621,7 +2623,9 @@ public void setRegistrationManagerClass(
      * Get Registration Manager Class.
      *
      * @return registration manager class.
+     * @deprecated since 4.7.0, in favor of using {@link #getMetadataServiceUri()}
      */
+    @Deprecated
     public Class<? extends RegistrationManager> getRegistrationManagerClass()
             throws ConfigurationException {
         return ReflectionUtils.getClass(this, REGISTRATION_MANAGER_CLASS,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationManager.java
index 0c3922f50..3d357d4db 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationManager.java
@@ -21,14 +21,8 @@
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate;
 import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.meta.LayoutManager;
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.util.ReflectionUtils;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.commons.configuration.ConfigurationException;
 
 /**
  * Registration manager interface, which a bookie server will use to do the registration process.
@@ -37,22 +31,6 @@
 @Evolving
 public interface RegistrationManager extends AutoCloseable {
 
-    /**
-     * Instantiate a RegistrationManager based on config.
-     */
-    static RegistrationManager instantiateRegistrationManager(ServerConfiguration conf) throws BookieException {
-        // Create the registration manager instance
-        Class<? extends RegistrationManager> managerCls;
-        try {
-            managerCls = conf.getRegistrationManagerClass();
-        } catch (ConfigurationException e) {
-            throw new BookieException.BookieIllegalOpException(e);
-        }
-
-        RegistrationManager manager = ReflectionUtils.newInstance(managerCls);
-        return manager.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
-    }
-
     /**
      * Registration Listener on listening the registration state.
      */
@@ -66,10 +44,6 @@ static RegistrationManager instantiateRegistrationManager(ServerConfiguration co
 
     }
 
-    RegistrationManager initialize(ServerConfiguration conf,
-                                   RegistrationListener listener,
-                                   StatsLogger statsLogger) throws BookieException;
-
     @Override
     void close();
 
@@ -136,13 +110,6 @@ RegistrationManager initialize(ServerConfiguration conf,
      */
     void removeCookie(String bookieId, Version version) throws BookieException;
 
-    /**
-     * Gets layout manager.
-     *
-     * @return the layout manager
-     */
-    LayoutManager getLayoutManager();
-
     /**
      * Prepare ledgers root node, availableNode, readonly node..
      *
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
index 7f2c2d96c..e144dcfc7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
@@ -19,28 +19,22 @@
 package org.apache.bookkeeper.discover;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
+import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
 import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE;
 import static org.apache.bookkeeper.util.BookKeeperConstants.EMPTY_BYTE_ARRAY;
 import static org.apache.bookkeeper.util.BookKeeperConstants.INSTANCEID;
 import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
-
 import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
-
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.BookieException.BookieIllegalOpException;
 import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
@@ -56,14 +50,11 @@
 import org.apache.bookkeeper.meta.ZkLayoutManager;
 import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
 import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
-import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
@@ -96,119 +87,57 @@
         }
     };
 
-    private ServerConfiguration conf;
-    private ZooKeeper zk;
-    private List<ACL> zkAcls;
-    private volatile boolean running = false;
+    private final ServerConfiguration conf;
+    private final ZooKeeper zk;
+    private final List<ACL> zkAcls;
+    private final LayoutManager layoutManager;
 
+    private volatile boolean zkRegManagerInitialized = false;
+
+    // ledgers root path
+    private final String ledgersRootPath;
     // cookie path
-    private String cookiePath;
+    private final String cookiePath;
     // registration paths
-    protected String bookieRegistrationPath;
-    protected String bookieReadonlyRegistrationPath;
-    // layout manager
-    private LayoutManager layoutManager;
-
-    private StatsLogger statsLogger;
-
-    @Override
-    public RegistrationManager initialize(ServerConfiguration conf,
-                                          RegistrationListener listener,
-                                          StatsLogger statsLogger)
-            throws BookieException {
-        if (null == conf.getZkServers()) {
-            log.warn("No ZK servers passed to Bookie constructor so BookKeeper clients won't know about this server!");
-            return null;
-        }
-
+    protected final String bookieRegistrationPath;
+    protected final String bookieReadonlyRegistrationPath;
+    // session timeout in milliseconds
+    private final int zkTimeoutMs;
+
+    public ZKRegistrationManager(ServerConfiguration conf,
+                                 ZooKeeper zk,
+                                 RegistrationListener listener) {
         this.conf = conf;
+        this.zk = zk;
         this.zkAcls = ZkUtils.getACLs(conf);
-        this.statsLogger = statsLogger;
 
-        this.cookiePath = conf.getZkLedgersRootPath() + "/" + COOKIE_NODE;
-        this.bookieRegistrationPath = conf.getZkAvailableBookiesPath();
+        this.ledgersRootPath = conf.getZkLedgersRootPath();
+        this.cookiePath = ledgersRootPath + "/" + COOKIE_NODE;
+        this.bookieRegistrationPath = ledgersRootPath + "/" + AVAILABLE_NODE;
         this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + "/" + READONLY;
-
-        try {
-            this.zk = newZookeeper(conf, listener);
-        } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-            throw new MetadataStoreException(ie);
-        } catch (KeeperException | IOException e) {
-            throw new MetadataStoreException(e);
-        }
+        this.zkTimeoutMs = conf.getZkTimeout();
 
         this.layoutManager = new ZkLayoutManager(
             zk,
             conf.getZkLedgersRootPath(),
             zkAcls);
 
-        return this;
-    }
-
-    @VisibleForTesting
-    public void setZk(ZooKeeper zk) {
-        this.zk = zk;
-    }
-
-    @VisibleForTesting
-    public ZooKeeper getZk() {
-        return this.zk;
-    }
-
-    /**
-     * Create a new zookeeper client to zk cluster.
-     *
-     * <p>
-     * Bookie Server just used zk client when syncing ledgers for garbage collection.
-     * So when zk client is expired, it means this bookie server is not available in
-     * bookie server list. The bookie client will be notified for its expiration. No
-     * more bookie request will be sent to this server. So it's better to exit when zk
-     * expired.
-     * </p>
-     * <p>
-     * Since there are lots of bk operations cached in queue, so we wait for all the operations
-     * are processed and quit. It is done by calling <b>shutdown</b>.
-     * </p>
-     *
-     * @param conf server configuration
-     *
-     * @return zk client instance
-     */
-    private ZooKeeper newZookeeper(final ServerConfiguration conf, RegistrationListener listener)
-        throws InterruptedException, KeeperException, IOException {
-        Set<Watcher> watchers = new HashSet<Watcher>();
-        watchers.add(event -> {
-            if (!running) {
+        this.zk.register(event -> {
+            if (!zkRegManagerInitialized) {
                 // do nothing until first registration
                 return;
             }
             // Check for expired connection.
-            if (event.getType().equals(EventType.None) && event.getState().equals(KeeperState.Expired)) {
+            if (event.getType().equals(EventType.None)
+                && event.getState().equals(KeeperState.Expired)) {
                 listener.onRegistrationExpired();
             }
         });
-        return ZooKeeperClient.newBuilder()
-                .connectString(conf.getZkServers())
-                .sessionTimeoutMs(conf.getZkTimeout())
-                .watchers(watchers)
-                .operationRetryPolicy(new BoundExponentialBackoffRetryPolicy(conf.getZkRetryBackoffStartMs(),
-                        conf.getZkRetryBackoffMaxMs(), Integer.MAX_VALUE))
-                .requestRateLimit(conf.getZkRequestRateLimit())
-                .statsLogger(this.statsLogger.scope(BOOKIE_SCOPE))
-                .build();
     }
 
     @Override
     public void close() {
-        if (null != zk) {
-            try {
-                zk.close();
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                log.warn("Interrupted on closing zookeeper client", e);
-            }
-        }
+        // no-op
     }
 
     /**
@@ -251,9 +180,9 @@ public void process(WatchedEvent event) {
                 // wait for it to be expired.
                 if (stat.getEphemeralOwner() != zk.getSessionId()) {
                     log.info("Previous bookie registration znode: {} exists, so waiting zk sessiontimeout:"
-                            + " {} ms for znode deletion", regPath, conf.getZkTimeout());
+                            + " {} ms for znode deletion", regPath, zkTimeoutMs);
                     // waiting for the previous bookie reg znode deletion
-                    if (!prevNodeLatch.await(conf.getZkTimeout(), TimeUnit.MILLISECONDS)) {
+                    if (!prevNodeLatch.await(zkTimeoutMs, TimeUnit.MILLISECONDS)) {
                         throw new NodeExistsException(regPath);
                     } else {
                         return false;
@@ -291,6 +220,7 @@ private void doRegisterBookie(String regPath) throws BookieException {
             if (!checkRegNodeAndWaitExpired(regPath)) {
                 // Create the ZK ephemeral node for this Bookie.
                 zk.create(regPath, new byte[0], zkAcls, CreateMode.EPHEMERAL);
+                zkRegManagerInitialized = true;
             }
         } catch (KeeperException ke) {
             log.error("ZK exception registering ephemeral Znode for Bookie!", ke);
@@ -433,14 +363,14 @@ public void removeCookie(String bookieId, Version version) throws BookieExceptio
     public String getClusterInstanceId() throws BookieException {
         String instanceId = null;
         try {
-            if (zk.exists(conf.getZkLedgersRootPath(), null) == null) {
+            if (zk.exists(ledgersRootPath, null) == null) {
                 log.error("BookKeeper metadata doesn't exist in zookeeper. "
                     + "Has the cluster been initialized? "
                     + "Try running bin/bookkeeper shell metaformat");
                 throw new KeeperException.NoNodeException("BookKeeper metadata");
             }
             try {
-                byte[] data = zk.getData(conf.getZkLedgersRootPath() + "/"
+                byte[] data = zk.getData(ledgersRootPath + "/"
                     + INSTANCEID, false, null);
                 instanceId = new String(data, UTF_8);
             } catch (KeeperException.NoNodeException e) {
@@ -452,33 +382,22 @@ public String getClusterInstanceId() throws BookieException {
         return instanceId;
     }
 
-    @VisibleForTesting
-    public void setLayoutManager(LayoutManager layoutManager) {
-        this.layoutManager = layoutManager;
-    }
-
-    @Override
-    public LayoutManager getLayoutManager(){
-        return layoutManager;
-    }
-
     @Override
     public boolean prepareFormat() throws Exception {
-        boolean ledgerRootExists = null != zk.exists(conf.getZkLedgersRootPath(), false);
-        boolean availableNodeExists = null != zk.exists(conf.getZkAvailableBookiesPath(), false);
-        List<ACL> zkAcls = ZkUtils.getACLs(conf);
+        boolean ledgerRootExists = null != zk.exists(ledgersRootPath, false);
+        boolean availableNodeExists = null != zk.exists(bookieRegistrationPath, false);
         // Create ledgers root node if not exists
         if (!ledgerRootExists) {
-            zk.create(conf.getZkLedgersRootPath(), "".getBytes(Charsets.UTF_8), zkAcls, CreateMode.PERSISTENT);
+            zk.create(ledgersRootPath, "".getBytes(Charsets.UTF_8), zkAcls, CreateMode.PERSISTENT);
         }
         // create available bookies node if not exists
         if (!availableNodeExists) {
-            zk.create(conf.getZkAvailableBookiesPath(), "".getBytes(Charsets.UTF_8), zkAcls, CreateMode.PERSISTENT);
+            zk.create(bookieRegistrationPath, "".getBytes(Charsets.UTF_8), zkAcls, CreateMode.PERSISTENT);
         }
 
         // create readonly bookies node if not exists
-        if (null == zk.exists(conf.getZkAvailableBookiesPath() + "/" + READONLY, false)) {
-            zk.create(conf.getZkAvailableBookiesPath() + "/" + READONLY, new byte[0], zkAcls, CreateMode.PERSISTENT);
+        if (null == zk.exists(bookieReadonlyRegistrationPath, false)) {
+            zk.create(bookieReadonlyRegistrationPath, new byte[0], zkAcls, CreateMode.PERSISTENT);
         }
 
         return ledgerRootExists;
@@ -486,32 +405,29 @@ public boolean prepareFormat() throws Exception {
 
     @Override
     public boolean initNewCluster() throws Exception {
-        String zkLedgersRootPath = conf.getZkLedgersRootPath();
         String zkServers = conf.getZkServers();
-        String zkAvailableBookiesPath = conf.getZkAvailableBookiesPath();
-        String zkReadonlyBookiesPath = zkAvailableBookiesPath + "/" + READONLY;
-        String instanceIdPath = zkLedgersRootPath + "/" + INSTANCEID;
+        String instanceIdPath = ledgersRootPath + "/" + INSTANCEID;
         log.info("Initializing ZooKeeper metadata for new cluster, ZKServers: {} ledger root path: {}", zkServers,
-                zkLedgersRootPath);
+                ledgersRootPath);
 
-        boolean ledgerRootExists = null != zk.exists(conf.getZkLedgersRootPath(), false);
+        boolean ledgerRootExists = null != zk.exists(ledgersRootPath, false);
 
         if (ledgerRootExists) {
-            log.error("Ledger root path: {} already exists", conf.getZkLedgersRootPath());
+            log.error("Ledger root path: {} already exists", ledgersRootPath);
             return false;
         }
 
         List<Op> multiOps = Lists.newArrayListWithExpectedSize(4);
 
         // Create ledgers root node
-        multiOps.add(Op.create(zkLedgersRootPath, EMPTY_BYTE_ARRAY, zkAcls, CreateMode.PERSISTENT));
+        multiOps.add(Op.create(ledgersRootPath, EMPTY_BYTE_ARRAY, zkAcls, CreateMode.PERSISTENT));
 
         // create available bookies node
-        multiOps.add(Op.create(zkAvailableBookiesPath, EMPTY_BYTE_ARRAY, zkAcls, CreateMode.PERSISTENT));
+        multiOps.add(Op.create(bookieRegistrationPath, EMPTY_BYTE_ARRAY, zkAcls, CreateMode.PERSISTENT));
 
         // create readonly bookies node
         multiOps.add(Op.create(
-            zkReadonlyBookiesPath,
+            bookieReadonlyRegistrationPath,
             EMPTY_BYTE_ARRAY,
             zkAcls,
             CreateMode.PERSISTENT));
@@ -528,29 +444,27 @@ public boolean initNewCluster() throws Exception {
         AbstractZkLedgerManagerFactory.newLedgerManagerFactory(conf, layoutManager);
 
         log.info("Successfully initiated cluster. ZKServers: {} ledger root path: {} instanceId: {}", zkServers,
-                zkLedgersRootPath, instanceId);
+                ledgersRootPath, instanceId);
         return true;
     }
 
     @Override
     public boolean nukeExistingCluster() throws Exception {
-        String zkLedgersRootPath = conf.getZkLedgersRootPath();
         String zkServers = conf.getZkServers();
         log.info("Nuking ZooKeeper metadata of existing cluster, ZKServers: {} ledger root path: {}",
-                zkServers, zkLedgersRootPath);
+                zkServers, ledgersRootPath);
 
-        boolean ledgerRootExists = null != zk.exists(conf.getZkLedgersRootPath(), false);
+        boolean ledgerRootExists = null != zk.exists(ledgersRootPath, false);
         if (!ledgerRootExists) {
             log.info("There is no existing cluster with ledgersRootPath: {} in ZKServers: {}, "
-                    + "so exiting nuke operation", zkLedgersRootPath, conf.getZkServers());
+                    + "so exiting nuke operation", ledgersRootPath, zkServers);
             return true;
         }
 
-        String availableBookiesPath = conf.getZkAvailableBookiesPath();
-        boolean availableNodeExists = null != zk.exists(availableBookiesPath, false);
+        boolean availableNodeExists = null != zk.exists(bookieRegistrationPath, false);
         try (RegistrationClient regClient = new ZKRegistrationClient(
             zk,
-            zkLedgersRootPath,
+            ledgersRootPath,
             null
         )) {
             if (availableNodeExists) {
@@ -562,8 +476,7 @@ public boolean nukeExistingCluster() throws Exception {
                     return false;
                 }
 
-                String readOnlyBookieRegPath = availableBookiesPath + "/" + BookKeeperConstants.READONLY;
-                boolean readonlyNodeExists = null != zk.exists(readOnlyBookieRegPath, false);
+                boolean readonlyNodeExists = null != zk.exists(bookieReadonlyRegistrationPath, false);
                 if (readonlyNodeExists) {
                     Collection<BookieSocketAddress> roBookies = FutureUtils
                             .result(regClient.getReadOnlyBookies(), EXCEPTION_FUNC).getValue();
@@ -585,7 +498,7 @@ public boolean nukeExistingCluster() throws Exception {
     public boolean format() throws Exception {
         // Clear underreplicated ledgers
         try {
-            ZKUtil.deleteRecursive(zk, ZkLedgerUnderreplicationManager.getBasePath(conf.getZkLedgersRootPath())
+            ZKUtil.deleteRecursive(zk, ZkLedgerUnderreplicationManager.getBasePath(ledgersRootPath)
                     + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH);
         } catch (KeeperException.NoNodeException e) {
             if (log.isDebugEnabled()) {
@@ -595,7 +508,7 @@ public boolean format() throws Exception {
 
         // Clear underreplicatedledger locks
         try {
-            ZKUtil.deleteRecursive(zk, ZkLedgerUnderreplicationManager.getBasePath(conf.getZkLedgersRootPath()) + '/'
+            ZKUtil.deleteRecursive(zk, ZkLedgerUnderreplicationManager.getBasePath(ledgersRootPath) + '/'
                     + BookKeeperConstants.UNDER_REPLICATION_LOCK);
         } catch (KeeperException.NoNodeException e) {
             if (log.isDebugEnabled()) {
@@ -605,7 +518,7 @@ public boolean format() throws Exception {
 
         // Clear the cookies
         try {
-            ZKUtil.deleteRecursive(zk, conf.getZkLedgersRootPath() + "/cookies");
+            ZKUtil.deleteRecursive(zk, cookiePath);
         } catch (KeeperException.NoNodeException e) {
             if (log.isDebugEnabled()) {
                 log.debug("cookies node not exists in zookeeper to delete");
@@ -614,7 +527,7 @@ public boolean format() throws Exception {
 
         // Clear the INSTANCEID
         try {
-            zk.delete(conf.getZkLedgersRootPath() + "/" + BookKeeperConstants.INSTANCEID, -1);
+            zk.delete(ledgersRootPath + "/" + BookKeeperConstants.INSTANCEID, -1);
         } catch (KeeperException.NoNodeException e) {
             if (log.isDebugEnabled()) {
                 log.debug("INSTANCEID not exists in zookeeper to delete");
@@ -623,8 +536,8 @@ public boolean format() throws Exception {
 
         // create INSTANCEID
         String instanceId = UUID.randomUUID().toString();
-        zk.create(conf.getZkLedgersRootPath() + "/" + BookKeeperConstants.INSTANCEID,
-                instanceId.getBytes(Charsets.UTF_8), ZkUtils.getACLs(conf), CreateMode.PERSISTENT);
+        zk.create(ledgersRootPath + "/" + BookKeeperConstants.INSTANCEID,
+                instanceId.getBytes(Charsets.UTF_8), zkAcls, CreateMode.PERSISTENT);
 
         log.info("Successfully formatted BookKeeper metadata");
         return true;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerFactory.java
index 5600dfac3..8b8c2bb9e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerFactory.java
@@ -18,11 +18,13 @@
 package org.apache.bookkeeper.meta;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.List;
 import java.util.Objects;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.meta.LayoutManager.LedgerLayoutExistsException;
+import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.util.ReflectionUtils;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.zookeeper.KeeperException;
@@ -38,6 +40,7 @@
 
     protected ZooKeeper zk;
 
+    @SuppressWarnings("deprecation")
     @Override
     public void format(AbstractConfiguration<?> conf, LayoutManager layoutManager)
             throws InterruptedException, KeeperException, IOException {
@@ -124,16 +127,32 @@ public boolean validateAndNukeExistingCluster(AbstractConfiguration<?> conf, Lay
     public static LedgerManagerFactory newLedgerManagerFactory(
         final AbstractConfiguration<?> conf, LayoutManager layoutManager)
             throws IOException, InterruptedException {
-        Class<? extends LedgerManagerFactory> factoryClass;
+        String metadataServiceUriStr;
         try {
-            factoryClass = conf.getLedgerManagerFactoryClass();
-        } catch (Exception e) {
-            factoryClass = attemptToResolveShadedLedgerManagerFactory(
-                conf,
-                conf.getLedgerManagerFactoryClassName(),
-                e);
+            metadataServiceUriStr = conf.getMetadataServiceUri();
+        } catch (ConfigurationException e) {
+            log.error("Failed to retrieve metadata service uri from configuration", e);
+            throw new IOException(
+                "Failed to retrieve metadata service uri from configuration", e);
+        }
+
+        Class<? extends LedgerManagerFactory> factoryClass;
+        String ledgerRootPath;
+        // `metadataServiceUri` can be null when constructing bookkeeper client using an external zookeeper client.
+        if (null == metadataServiceUriStr) { //
+            try {
+                factoryClass = conf.getLedgerManagerFactoryClass();
+            } catch (ConfigurationException e) {
+                log.error("Failed to get ledger manager factory class when using an external zookeeper client", e);
+                throw new IOException(
+                    "Failed to get ledger manager factory class when using an external zookeeper client", e);
+            }
+            ledgerRootPath = conf.getZkLedgersRootPath();
+        } else {
+            URI metadataServiceUri = URI.create(metadataServiceUriStr);
+            factoryClass = ZKMetadataDriverBase.resolveLedgerManagerFactory(metadataServiceUri);
+            ledgerRootPath = metadataServiceUri.getPath();
         }
-        String ledgerRootPath = conf.getZkLedgersRootPath();
 
         if (null == ledgerRootPath || ledgerRootPath.length() == 0) {
             throw new IOException("Empty Ledger Root Path.");
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
index 22abcb6a7..88204e6db 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
@@ -27,7 +27,7 @@
 /**
  * Interface for marking ledgers which need to be rereplicated.
  */
-public interface LedgerUnderreplicationManager {
+public interface LedgerUnderreplicationManager extends AutoCloseable {
     /**
      * Mark a ledger as underreplicated. The replication should
      * then check which fragments are underreplicated and rereplicate them
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataBookieDriver.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataBookieDriver.java
index c88a9c46f..9b11f28d2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataBookieDriver.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataBookieDriver.java
@@ -54,8 +54,7 @@ MetadataBookieDriver initialize(ServerConfiguration conf,
      *
      * @return the registration manager used for registering/unregistering bookies.
      */
-    RegistrationManager getRegistrationManager()
-        throws MetadataException;
+    RegistrationManager getRegistrationManager();
 
     /**
      * Return the ledger manager factory used for accessing ledger metadata.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataDrivers.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataDrivers.java
index d124700de..f27820597 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataDrivers.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataDrivers.java
@@ -23,16 +23,25 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.UncheckedExecutionException;
 import java.net.URI;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.meta.exceptions.Code;
+import org.apache.bookkeeper.meta.exceptions.MetadataException;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.lang3.StringUtils;
 
 /**
@@ -309,4 +318,70 @@ public static MetadataBookieDriver getBookieDriver(URI uri) {
         return getBookieDriver(schemeParts[0]);
     }
 
+    /**
+     * Process the provided <i>function</i> with metadata bookie driver resolved
+     * from the metadata service uri returned by {@link ServerConfiguration#getMetadataServiceUri()}.
+     *
+     * @param conf server configuration
+     * @param function function to apply with metadata bookie driver.
+     * @throws MetadataException when failed to access metadata store
+     * @throws ExecutionException exception thrown when processing <tt>function</tt>.
+     */
+    public static <T> T runFunctionWithMetadataBookieDriver(ServerConfiguration conf,
+                                                            Function<MetadataBookieDriver, T> function)
+            throws MetadataException, ExecutionException {
+        try (MetadataBookieDriver driver = MetadataDrivers.getBookieDriver(
+            URI.create(conf.getMetadataServiceUri())
+        )) {
+            driver.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+            try {
+                return function.apply(driver);
+            } catch (Exception uee) {
+                if (uee.getCause() instanceof MetadataException) {
+                    throw (MetadataException) uee.getCause();
+                } else {
+                    throw new ExecutionException(uee.getMessage(), uee.getCause());
+                }
+            }
+        } catch (ConfigurationException e) {
+            throw new MetadataException(Code.INVALID_METADATA_SERVICE_URI, e);
+        }
+    }
+
+    /**
+     * Process the provided <i>function</i> with registration manager resolved
+     * from the metadata service uri returned by {@link ServerConfiguration#getMetadataServiceUri()}.
+     *
+     * @param conf server configuration
+     * @param function function to apply with registration manager.
+     * @throws MetadataException when failed to access metadata store
+     * @throws ExecutionException exception thrown when processing <tt>consumer</tt>.
+     */
+    public static <T> T runFunctionWithRegistrationManager(ServerConfiguration conf,
+                                                           Function<RegistrationManager, T> function)
+            throws MetadataException, ExecutionException {
+        return runFunctionWithMetadataBookieDriver(conf, driver -> function.apply(driver.getRegistrationManager()));
+    }
+
+    /**
+     * Process the provided <i>function</i> with ledger manager factory resolved
+     * from the metadata service uri returned by {@link ServerConfiguration#getMetadataServiceUri()}.
+     *
+     * @param conf server configuration
+     * @param function function to apply with ledger manager factory.
+     * @throws MetadataException when failed to access metadata store
+     * @throws ExecutionException exception thrown when processing <tt>consumer</tt>.
+     */
+    public static <T> T runFunctionWithLedgerManagerFactory(ServerConfiguration conf,
+                                                            Function<LedgerManagerFactory, T> function)
+            throws MetadataException, ExecutionException {
+        return runFunctionWithMetadataBookieDriver(conf, driver -> {
+            try {
+                return function.apply(driver.getLedgerManagerFactory());
+            } catch (MetadataException me) {
+                throw new UncheckedExecutionException(me.getMessage(), me);
+            }
+        });
+    }
+
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataBookieDriver.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataBookieDriver.java
index 219ae2d3a..e7b1ad0d5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataBookieDriver.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataBookieDriver.java
@@ -18,18 +18,20 @@
  */
 package org.apache.bookkeeper.meta.zk;
 
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
+
+import com.google.common.annotations.VisibleForTesting;
 import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.discover.RegistrationManager;
 import org.apache.bookkeeper.discover.RegistrationManager.RegistrationListener;
 import org.apache.bookkeeper.discover.ZKRegistrationManager;
 import org.apache.bookkeeper.meta.MetadataBookieDriver;
 import org.apache.bookkeeper.meta.MetadataDrivers;
-import org.apache.bookkeeper.meta.exceptions.Code;
 import org.apache.bookkeeper.meta.exceptions.MetadataException;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
 
 /**
  * ZooKeeper based metadata bookie driver.
@@ -56,39 +58,44 @@ public synchronized MetadataBookieDriver initialize(ServerConfiguration conf,
                                                         StatsLogger statsLogger)
             throws MetadataException {
         super.initialize(
-            conf, statsLogger, Optional.empty());
-        this.statsLogger = statsLogger;
+            conf,
+            statsLogger.scope(BOOKIE_SCOPE),
+            new BoundExponentialBackoffRetryPolicy(conf.getZkRetryBackoffStartMs(),
+                        conf.getZkRetryBackoffMaxMs(), Integer.MAX_VALUE),
+            Optional.empty());
         this.serverConf = conf;
         this.listener = listener;
+        this.statsLogger = statsLogger;
         return this;
     }
 
+    @VisibleForTesting
+    public synchronized void setRegManager(RegistrationManager regManager) {
+        this.regManager = regManager;
+    }
+
     @Override
-    public synchronized RegistrationManager getRegistrationManager()
-            throws MetadataException {
+    public synchronized RegistrationManager getRegistrationManager() {
         if (null == regManager) {
-            regManager = new ZKRegistrationManager();
-            try {
-                regManager.initialize(
-                    serverConf,
-                    listener,
-                    statsLogger);
-            } catch (BookieException e) {
-                throw new MetadataException(
-                    Code.METADATA_SERVICE_ERROR,
-                    "Failed to initialize registration manager",
-                    e);
-            }
+            regManager = new ZKRegistrationManager(
+                serverConf,
+                zk,
+                listener
+            );
         }
         return regManager;
     }
 
     @Override
     public void close() {
-        if (null != regManager) {
-            regManager.close();
+        RegistrationManager rmToClose;
+        synchronized (this) {
+            rmToClose = regManager;
             regManager = null;
         }
+        if (null != rmToClose) {
+            rmToClose.close();
+        }
         super.close();
     }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java
index 9141c6ced..a5dcaa740 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataClientDriver.java
@@ -28,6 +28,7 @@
 import org.apache.bookkeeper.meta.MetadataDrivers;
 import org.apache.bookkeeper.meta.exceptions.MetadataException;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
 
 /**
  * ZooKeeper based metadata client driver.
@@ -51,11 +52,16 @@
     public synchronized MetadataClientDriver initialize(ClientConfiguration conf,
                                                         ScheduledExecutorService scheduler,
                                                         StatsLogger statsLogger,
+
                                                         Optional<Object> optionalCtx)
             throws MetadataException {
         super.initialize(
             conf,
             statsLogger,
+            new BoundExponentialBackoffRetryPolicy(
+                conf.getZkTimeout(),
+                conf.getZkTimeout(),
+                0),
             optionalCtx);
         this.statsLogger = statsLogger;
         this.clientConf = conf;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBase.java
index 98a018d28..e74828c3d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBase.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBase.java
@@ -29,6 +29,7 @@
 import java.util.List;
 import java.util.Optional;
 import lombok.Getter;
+import lombok.Setter;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
@@ -42,7 +43,7 @@
 import org.apache.bookkeeper.meta.exceptions.MetadataException;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
+import org.apache.bookkeeper.zookeeper.RetryPolicy;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.lang3.StringUtils;
@@ -64,7 +65,7 @@ protected static String getZKServersFromServiceUri(URI uri) {
     }
 
     @SuppressWarnings("deprecation")
-    protected static Class<? extends LedgerManagerFactory> resolveLedgerManagerFactory(URI metadataServiceUri) {
+    public static Class<? extends LedgerManagerFactory> resolveLedgerManagerFactory(URI metadataServiceUri) {
         checkNotNull(metadataServiceUri, "Metadata service uri is null");
         String scheme = metadataServiceUri.getScheme();
         checkNotNull(scheme, "Invalid metadata service : " + metadataServiceUri);
@@ -86,6 +87,11 @@ protected static String getZKServersFromServiceUri(URI uri) {
                 case org.apache.bookkeeper.meta.MSLedgerManagerFactory.NAME:
                     ledgerManagerFactoryClass = org.apache.bookkeeper.meta.MSLedgerManagerFactory.class;
                     break;
+                case "null":
+                    // the ledger manager factory class is not set, so the client will be using the class that is
+                    // recorded in ledger manager layout.
+                    ledgerManagerFactoryClass = null;
+                    break;
                 default:
                     throw new IllegalArgumentException("Unknown ledger manager type found '"
                         + schemeParts[1] + "' at uri : " + metadataServiceUri);
@@ -97,13 +103,13 @@ protected static String getZKServersFromServiceUri(URI uri) {
     }
 
     // URI
-    protected URI metadataServiceUri;
     protected AbstractConfiguration<?> conf;
     protected StatsLogger statsLogger;
 
     // zookeeper related variables
     protected List<ACL> acls;
     @Getter
+    @Setter
     protected ZooKeeper zk = null;
     // whether the zk handle is one we created, or is owned by whoever
     // instantiated us
@@ -115,7 +121,6 @@ protected static String getZKServersFromServiceUri(URI uri) {
     // managers
     protected LayoutManager layoutManager;
     protected LedgerManagerFactory lmFactory;
-    protected Class<? extends LedgerManagerFactory> ledgerManagerFactoryClass = null;
 
     public String getScheme() {
         return SCHEME;
@@ -124,43 +129,48 @@ public String getScheme() {
     @SneakyThrows(InterruptedException.class)
     protected void initialize(AbstractConfiguration<?> conf,
                               StatsLogger statsLogger,
+                              RetryPolicy zkRetryPolicy,
                               Optional<Object> optionalCtx) throws MetadataException {
         this.conf = conf;
-
-        final String metadataServiceUriStr;
-        try {
-            metadataServiceUriStr = conf.getMetadataServiceUri();
-        } catch (ConfigurationException e) {
-            log.error("Failed to retrieve metadata service uri from configuration", e);
-            throw new MetadataException(
-                Code.INVALID_METADATA_SERVICE_URI, e);
-        }
-
-        this.metadataServiceUri = URI.create(metadataServiceUriStr);
-        ledgerManagerFactoryClass = resolveLedgerManagerFactory(metadataServiceUri);
-
-        // get the initialize root path
-        this.ledgersRootPath = metadataServiceUri.getPath();
-        final String bookieRegistrationPath = ledgersRootPath + "/" + AVAILABLE_NODE;
-        final String bookieReadonlyRegistrationPath = bookieRegistrationPath + "/" + READONLY;
-
-        // construct the zookeeper
-        final String zkServers = getZKServersFromServiceUri(metadataServiceUri);
-        log.info("Initialize zookeeper metadata driver at metadata service uri {} :"
-            + " zkServers = {}, ledgersRootPath = {}.", metadataServiceUriStr, zkServers, ledgersRootPath);
         this.acls = ZkUtils.getACLs(conf);
+
         if (optionalCtx.isPresent()
             && optionalCtx.get() instanceof ZooKeeper) {
+            this.ledgersRootPath = conf.getZkLedgersRootPath();
+
+            log.info("Initialize zookeeper metadata driver with external zookeeper client : ledgersRootPath = {}.",
+                ledgersRootPath);
+
             // if an external zookeeper is added, use the zookeeper instance
             this.zk = (ZooKeeper) (optionalCtx.get());
             this.ownZKHandle = false;
         } else {
+            final String metadataServiceUriStr;
+            try {
+                metadataServiceUriStr = conf.getMetadataServiceUri();
+            } catch (ConfigurationException e) {
+                log.error("Failed to retrieve metadata service uri from configuration", e);
+                throw new MetadataException(
+                    Code.INVALID_METADATA_SERVICE_URI, e);
+            }
+
+            URI metadataServiceUri = URI.create(metadataServiceUriStr);
+            // get the initialize root path
+            this.ledgersRootPath = metadataServiceUri.getPath();
+            final String bookieRegistrationPath = ledgersRootPath + "/" + AVAILABLE_NODE;
+            final String bookieReadonlyRegistrationPath = bookieRegistrationPath + "/" + READONLY;
+
+            // construct the zookeeper
+            final String zkServers = getZKServersFromServiceUri(metadataServiceUri);
+            log.info("Initialize zookeeper metadata driver at metadata service uri {} :"
+                + " zkServers = {}, ledgersRootPath = {}.", metadataServiceUriStr, zkServers, ledgersRootPath);
+
             try {
                 this.zk = ZooKeeperClient.newBuilder()
                     .connectString(zkServers)
                     .sessionTimeoutMs(conf.getZkTimeout())
-                    .operationRetryPolicy(new BoundExponentialBackoffRetryPolicy(conf.getZkTimeout(),
-                        conf.getZkTimeout(), 0))
+                    .operationRetryPolicy(zkRetryPolicy)
+                    .requestRateLimit(conf.getZkRequestRateLimit())
                     .statsLogger(statsLogger)
                     .build();
 
@@ -172,6 +182,8 @@ protected void initialize(AbstractConfiguration<?> conf,
                             CreateMode.PERSISTENT);
                     } catch (KeeperException.NodeExistsException e) {
                         // this node is just now created by someone.
+                    } catch (KeeperException.NoNodeException e) {
+                        // the cluster hasn't been initialized
                     }
                 }
             } catch (IOException | KeeperException e) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/BKHttpServiceProvider.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/BKHttpServiceProvider.java
index fb15a5f4d..d85e5477e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/BKHttpServiceProvider.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/BKHttpServiceProvider.java
@@ -33,7 +33,6 @@
 import org.apache.bookkeeper.http.service.ErrorHttpService;
 import org.apache.bookkeeper.http.service.HeartbeatService;
 import org.apache.bookkeeper.http.service.HttpEndpointService;
-import org.apache.bookkeeper.meta.LayoutManager;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.replication.Auditor;
 import org.apache.bookkeeper.replication.AutoRecoveryMain;
@@ -68,7 +67,6 @@
     private final BookieServer bookieServer;
     private final AutoRecoveryMain autoRecovery;
     private final ServerConfiguration serverConf;
-    private final LayoutManager layoutManager;
     private final ZooKeeper zk;
     private final BookKeeperAdmin bka;
     private final ExecutorService executor;
@@ -81,7 +79,6 @@ private BKHttpServiceProvider(BookieServer bookieServer,
         this.bookieServer = bookieServer;
         this.autoRecovery = autoRecovery;
         this.serverConf = serverConf;
-        this.layoutManager = bookieServer.getBookie().getRegistrationManager().getLayoutManager();
         this.zk = ZooKeeperClient.newBuilder()
           .connectString(serverConf.getZkServers())
           .sessionTimeoutMs(serverConf.getZkTimeout())
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/ExpandStorageService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/ExpandStorageService.java
index 4e7dc16b5..2d59661b2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/ExpandStorageService.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/ExpandStorageService.java
@@ -22,17 +22,18 @@
 
 import com.google.common.collect.Lists;
 import java.io.File;
+import java.net.URI;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.discover.RegistrationManager;
-import org.apache.bookkeeper.discover.ZKRegistrationManager;
 import org.apache.bookkeeper.http.HttpServer;
 import org.apache.bookkeeper.http.service.HttpEndpointService;
 import org.apache.bookkeeper.http.service.HttpServiceRequest;
 import org.apache.bookkeeper.http.service.HttpServiceResponse;
+import org.apache.bookkeeper.meta.MetadataBookieDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,10 +79,11 @@ public HttpServiceResponse handle(HttpServiceRequest request) throws Exception {
                 allLedgerDirs.addAll(Arrays.asList(indexDirectories));
             }
 
-            try {
-                RegistrationManager rm = new ZKRegistrationManager();
-                rm.initialize(conf, () -> { }, NullStatsLogger.INSTANCE);
-                Bookie.checkEnvironmentWithStorageExpansion(conf, rm,
+            try (MetadataBookieDriver driver = MetadataDrivers.getBookieDriver(
+                URI.create(conf.getMetadataServiceUri())
+            )) {
+                driver.initialize(conf, () -> { }, NullStatsLogger.INSTANCE);
+                Bookie.checkEnvironmentWithStorageExpansion(conf, driver,
                   Lists.newArrayList(journalDirectories), allLedgerDirs);
             } catch (BookieException e) {
                 LOG.error("Exception occurred while updating cookie for storage expansion", e);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/RecoveryBookieService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/RecoveryBookieService.java
index 3b1e1750a..afc5bcd5c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/RecoveryBookieService.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/RecoveryBookieService.java
@@ -19,6 +19,7 @@
 package org.apache.bookkeeper.server.http.service;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithRegistrationManager;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 
@@ -28,15 +29,12 @@
 import org.apache.bookkeeper.bookie.Cookie;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.discover.RegistrationManager;
 import org.apache.bookkeeper.http.HttpServer;
 import org.apache.bookkeeper.http.service.HttpEndpointService;
 import org.apache.bookkeeper.http.service.HttpServiceRequest;
 import org.apache.bookkeeper.http.service.HttpServiceResponse;
 import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.JsonUtil;
-import org.apache.bookkeeper.util.ReflectionUtils;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -106,27 +104,25 @@ public HttpServiceResponse handle(HttpServiceRequest request) throws Exception {
         }
 
         if (HttpServer.Method.PUT == request.getMethod() && !requestJsonBody.bookieSrc.isEmpty()) {
-
-            Class<? extends RegistrationManager> rmClass = conf.getRegistrationManagerClass();
-            RegistrationManager rm = ReflectionUtils.newInstance(rmClass);
-            rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
-
-            String bookieSrcString[] = requestJsonBody.bookieSrc.get(0).split(":");
-            BookieSocketAddress bookieSrc = new BookieSocketAddress(
-              bookieSrcString[0], Integer.parseInt(bookieSrcString[1]));
-            boolean deleteCookie = requestJsonBody.deleteCookie;
-            executor.execute(() -> {
-                try {
-                    LOG.info("Start recovering bookie.");
-                    bka.recoverBookieData(bookieSrc);
-                    if (deleteCookie) {
-                        Versioned<Cookie> cookie = Cookie.readFromRegistrationManager(rm, bookieSrc);
-                        cookie.getValue().deleteFromRegistrationManager(rm, bookieSrc, cookie.getVersion());
+            runFunctionWithRegistrationManager(conf, rm -> {
+                String bookieSrcString[] = requestJsonBody.bookieSrc.get(0).split(":");
+                BookieSocketAddress bookieSrc = new BookieSocketAddress(
+                    bookieSrcString[0], Integer.parseInt(bookieSrcString[1]));
+                boolean deleteCookie = requestJsonBody.deleteCookie;
+                executor.execute(() -> {
+                    try {
+                        LOG.info("Start recovering bookie.");
+                        bka.recoverBookieData(bookieSrc);
+                        if (deleteCookie) {
+                            Versioned<Cookie> cookie = Cookie.readFromRegistrationManager(rm, bookieSrc);
+                            cookie.getValue().deleteFromRegistrationManager(rm, bookieSrc, cookie.getVersion());
+                        }
+                        LOG.info("Complete recovering bookie");
+                    } catch (Exception e) {
+                        LOG.error("Exception occurred while recovering bookie", e);
                     }
-                    LOG.info("Complete recovering bookie");
-                } catch (Exception e) {
-                    LOG.error("Exception occurred while recovering bookie", e);
-                }
+                });
+                return null;
             });
 
             response.setCode(HttpServer.StatusCode.OK);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java
index b8b235150..6eee85a2e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java
@@ -114,6 +114,7 @@ public static ZooKeeperServerShim runZookeeper(int maxCC, int zookeeperPort, Fil
         return server;
     }
 
+    @SuppressWarnings("deprecation")
     private void initializeZookeeper(AbstractConfiguration conf, String zkHost, int zkPort) throws IOException {
         LOG.info("Instantiate ZK Client");
         //initialize the zk client with values
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
index 36406ce74..96790ac06 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
@@ -27,6 +27,11 @@
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -39,7 +44,9 @@
 import java.util.List;
 import java.util.Random;
 
+import java.util.concurrent.ExecutionException;
 import org.apache.bookkeeper.bookie.BookieException.DiskPartitionDuplicationException;
+import org.apache.bookkeeper.bookie.BookieException.MetadataStoreException;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -49,7 +56,8 @@
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.discover.RegistrationManager;
-import org.apache.bookkeeper.discover.ZKRegistrationManager;
+import org.apache.bookkeeper.meta.MetadataBookieDriver;
+import org.apache.bookkeeper.meta.zk.ZKMetadataBookieDriver;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
 import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
@@ -61,12 +69,12 @@
 import org.apache.bookkeeper.util.DiskChecker;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
+import org.powermock.reflect.Whitebox;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,7 +87,7 @@
 
     @Rule
     public final TestName runtime = new TestName();
-    RegistrationManager rm;
+    ZKMetadataBookieDriver driver;
 
     public BookieInitializationTest() {
         super(0);
@@ -92,29 +100,17 @@ public BookieInitializationTest() {
     public void setUp() throws Exception {
         super.setUp();
         zkUtil.createBKEnsemble("/" + runtime.getMethodName());
-        rm = new ZKRegistrationManager();
+        driver = new ZKMetadataBookieDriver();
     }
 
     @Override
     public void tearDown() throws Exception {
         super.tearDown();
-        if (rm != null) {
-            rm.close();
+        if (driver != null) {
+            driver.close();
         }
     }
 
-    private static class MockBookie extends Bookie {
-        MockBookie(ServerConfiguration conf) throws IOException,
-                KeeperException, InterruptedException, BookieException {
-            super(conf);
-        }
-
-        void testRegisterBookie(ServerConfiguration conf) throws IOException {
-            super.getStateManager().doRegisterBookie();
-        }
-
-    }
-
     /**
      * Verify the bookie server exit code. On ZooKeeper exception, should return
      * exit code ZK_REG_FAIL = 4
@@ -126,18 +122,21 @@ public void testExitCodeZK_REG_FAIL() throws Exception {
         final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
         conf.setJournalDirName(tmpDir.getPath())
             .setLedgerDirNames(new String[] { tmpDir.getPath() })
-            .setZkServers(null);
+            .setZkServers(zkUtil.getZooKeeperConnectString());
+
+        RegistrationManager rm = mock(RegistrationManager.class);
+        doThrow(new MetadataStoreException("mocked exception"))
+            .when(rm)
+            .registerBookie(anyString(), anyBoolean());
 
         // simulating ZooKeeper exception by assigning a closed zk client to bk
         BookieServer bkServer = new BookieServer(conf) {
             protected Bookie newBookie(ServerConfiguration conf)
                     throws IOException, KeeperException, InterruptedException,
                     BookieException {
-                MockBookie bookie = new MockBookie(conf);
-                rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
-                bookie.setRegistrationManager(rm);
-                ((ZKRegistrationManager) bookie.registrationManager).setZk(zkc);
-                ((ZKRegistrationManager) bookie.registrationManager).getZk().close();
+                Bookie bookie = new Bookie(conf);
+                MetadataBookieDriver driver = Whitebox.getInternalState(bookie, "metadataDriver");
+                ((ZKMetadataBookieDriver) driver).setRegManager(rm);
                 return bookie;
             }
         };
@@ -150,30 +149,25 @@ protected Bookie newBookie(ServerConfiguration conf)
 
     @Test
     public void testBookieRegistrationWithSameZooKeeperClient() throws Exception {
-        File tmpDir = createTempDir("bookie", "test");
-
         final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
-        conf.setJournalDirName(tmpDir.getPath())
-            .setLedgerDirNames(new String[] { tmpDir.getPath() })
-            .setZkServers(null).setListeningInterface(null);
+        conf.setZkServers(zkUtil.getZooKeeperConnectString())
+            .setListeningInterface(null);
 
-        final String bkRegPath = conf.getZkAvailableBookiesPath() + "/"
-                + InetAddress.getLocalHost().getHostAddress() + ":"
-                + conf.getBookiePort();
+        String bookieId = Bookie.getBookieAddress(conf).toString();
 
-        MockBookie b = new MockBookie(conf);
-        conf.setZkServers(zkUtil.getZooKeeperConnectString());
-        rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
-        b.setRegistrationManager(rm);
-        b.testRegisterBookie(conf);
-        ZooKeeper zooKeeper = ((ZKRegistrationManager) rm).getZk();
-        assertNotNull("Bookie registration node doesn't exists!",
-            zooKeeper.exists(bkRegPath, false));
-
-        // test register bookie again if the registeration node is created by itself.
-        b.testRegisterBookie(conf);
-        assertNotNull("Bookie registration node doesn't exists!",
-            zooKeeper.exists(bkRegPath, false));
+        driver.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+        try (StateManager manager = new BookieStateManager(conf, driver)) {
+            manager.registerBookie(true).get();
+            assertTrue(
+                "Bookie registration node doesn't exists!",
+                driver.getRegistrationManager().isBookieRegistered(bookieId));
+
+            // test register bookie again if the registeration node is created by itself.
+            manager.registerBookie(true).get();
+            assertTrue(
+                "Bookie registration node doesn't exists!",
+                driver.getRegistrationManager().isBookieRegistered(bookieId));
+        }
     }
 
     /**
@@ -183,107 +177,94 @@ public void testBookieRegistrationWithSameZooKeeperClient() throws Exception {
      */
     @Test
     public void testBookieRegistration() throws Exception {
-        File tmpDir = createTempDir("bookie", "test");
-
         final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
-        conf.setJournalDirName(tmpDir.getPath())
-            .setLedgerDirNames(new String[] { tmpDir.getPath() })
-            .setZkServers(null).setListeningInterface(null);
-
-        final String bkRegPath = conf.getZkAvailableBookiesPath() + "/"
-                + InetAddress.getLocalHost().getHostAddress() + ":"
-                + conf.getBookiePort();
-        MockBookie b = new MockBookie(conf);
+        conf.setZkServers(zkUtil.getZooKeeperConnectString())
+            .setListeningInterface(null);
 
-        conf.setZkServers(zkUtil.getZooKeeperConnectString());
-        rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
-        b.setRegistrationManager(rm);
-        b.testRegisterBookie(conf);
+        String bookieId = Bookie.getBookieAddress(conf).toString();
+        final String bkRegPath = conf.getZkAvailableBookiesPath() + "/" + bookieId;
 
-        Stat bkRegNode1 = ((ZKRegistrationManager) rm).getZk().exists(bkRegPath, false);
-        assertNotNull("Bookie registration node doesn't exists!",
-                bkRegNode1);
+        driver.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+        try (StateManager manager = new BookieStateManager(conf, driver)) {
+            manager.registerBookie(true).get();
+        }
+        Stat bkRegNode1 = zkc.exists(bkRegPath, false);
+        assertNotNull("Bookie registration has been failed", bkRegNode1);
 
         // simulating bookie restart, on restart bookie will create new
         // zkclient and doing the registration.
-        RegistrationManager newRm = new ZKRegistrationManager();
-        newRm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
-        b.setRegistrationManager(newRm);
-
-        try (ZooKeeperClient newZk = createNewZKClient()) {
-            // deleting the znode, so that the bookie registration should
-            // continue successfully on NodeDeleted event
-            new Thread(() -> {
-                try {
-                    Thread.sleep(conf.getZkTimeout() / 3);
-                    zkc.delete(bkRegPath, -1);
-                } catch (Exception e) {
-                    // Not handling, since the testRegisterBookie will fail
-                    LOG.error("Failed to delete the znode :" + bkRegPath, e);
-                }
-            }).start();
-            try {
-                b.testRegisterBookie(conf);
-            } catch (IOException e) {
-                Throwable t = e.getCause();
-                if (t instanceof KeeperException) {
-                    KeeperException ke = (KeeperException) t;
-                    assertTrue("ErrorCode:" + ke.code()
-                            + ", Registration node exists",
-                        ke.code() != KeeperException.Code.NODEEXISTS);
+        try (MetadataBookieDriver newDriver = new ZKMetadataBookieDriver()) {
+            newDriver.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+
+            try (ZooKeeperClient newZk = createNewZKClient()) {
+                // deleting the znode, so that the bookie registration should
+                // continue successfully on NodeDeleted event
+                new Thread(() -> {
+                    try {
+                        Thread.sleep(conf.getZkTimeout() / 3);
+                        zkc.delete(bkRegPath, -1);
+                    } catch (Exception e) {
+                        // Not handling, since the testRegisterBookie will fail
+                        LOG.error("Failed to delete the znode :" + bkRegPath, e);
+                    }
+                }).start();
+                try (StateManager newMgr = new BookieStateManager(conf, newDriver)) {
+                    newMgr.registerBookie(true).get();
+                } catch (IOException e) {
+                    Throwable t = e.getCause();
+                    if (t instanceof KeeperException) {
+                        KeeperException ke = (KeeperException) t;
+                        assertTrue("ErrorCode:" + ke.code()
+                                + ", Registration node exists",
+                            ke.code() != KeeperException.Code.NODEEXISTS);
+                    }
+                    throw e;
                 }
-                throw e;
-            }
 
-            // verify ephemeral owner of the bkReg znode
-            Stat bkRegNode2 = newZk.exists(bkRegPath, false);
-            assertNotNull("Bookie registration has been failed", bkRegNode2);
-            assertTrue("Bookie is referring to old registration znode:"
-                + bkRegNode1 + ", New ZNode:" + bkRegNode2, bkRegNode1
-                .getEphemeralOwner() != bkRegNode2.getEphemeralOwner());
+                // verify ephemeral owner of the bkReg znode
+                Stat bkRegNode2 = newZk.exists(bkRegPath, false);
+                assertNotNull("Bookie registration has been failed", bkRegNode2);
+                assertTrue("Bookie is referring to old registration znode:"
+                    + bkRegNode1 + ", New ZNode:" + bkRegNode2, bkRegNode1
+                    .getEphemeralOwner() != bkRegNode2.getEphemeralOwner());
+            }
         }
     }
 
     @Test(timeout = 20000)
     public void testBookieRegistrationWithFQDNHostNameAsBookieID() throws Exception {
-        File tmpDir = createTempDir("bookie", "test");
-
-        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration().setZkServers(null)
-                .setJournalDirName(tmpDir.getPath()).setLedgerDirNames(new String[] { tmpDir.getPath() })
-                .setUseHostNameAsBookieID(true).setListeningInterface(null);
-
-        final String bkRegPath = conf.getZkAvailableBookiesPath() + "/"
-                + InetAddress.getLocalHost().getCanonicalHostName() + ":" + conf.getBookiePort();
-        conf.setZkServers(zkUtil.getZooKeeperConnectString());
+        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
+            .setZkServers(zkUtil.getZooKeeperConnectString())
+            .setUseHostNameAsBookieID(true)
+            .setListeningInterface(null);
 
-        MockBookie bWithFQDNHostname = new MockBookie(conf);
-        rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
-        bWithFQDNHostname.registrationManager = rm;
+        final String bookieId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + conf.getBookiePort();
 
-        bWithFQDNHostname.testRegisterBookie(conf);
-        Stat bkRegNode1 = zkc.exists(bkRegPath, false);
-        Assert.assertNotNull("Bookie registration node doesn't exists!", bkRegNode1);
+        driver.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+        try (StateManager manager = new BookieStateManager(conf, driver)) {
+            manager.registerBookie(true).get();
+            assertTrue("Bookie registration node doesn't exists!",
+                driver.getRegistrationManager().isBookieRegistered(bookieId));
+        }
     }
 
     @Test(timeout = 20000)
     public void testBookieRegistrationWithShortHostNameAsBookieID() throws Exception {
-        File tmpDir = createTempDir("bookie", "test");
-
-        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration().setZkServers(null)
-                .setJournalDirName(tmpDir.getPath()).setLedgerDirNames(new String[] { tmpDir.getPath() })
-                .setUseHostNameAsBookieID(true).setUseShortHostName(true).setListeningInterface(null);
-
-        final String bkRegPath = conf.getZkAvailableBookiesPath() + "/"
-                + (InetAddress.getLocalHost().getCanonicalHostName().split("\\.", 2)[0]) + ":" + conf.getBookiePort();
-        conf.setZkServers(zkUtil.getZooKeeperConnectString());
+        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
+            .setZkServers(zkUtil.getZooKeeperConnectString())
+            .setUseHostNameAsBookieID(true)
+            .setUseShortHostName(true)
+            .setListeningInterface(null);
 
-        MockBookie bWithShortHostname = new MockBookie(conf);
-        rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
-        bWithShortHostname.registrationManager = rm;
+        final String bookieId = InetAddress.getLocalHost().getCanonicalHostName().split("\\.", 2)[0]
+            + ":" + conf.getBookiePort();
 
-        bWithShortHostname.testRegisterBookie(conf);
-        Stat bkRegNode1 = zkc.exists(bkRegPath, false);
-        Assert.assertNotNull("Bookie registration node doesn't exists!", bkRegNode1);
+        driver.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+        try (StateManager manager = new BookieStateManager(conf, driver)) {
+            manager.registerBookie(true).get();
+            assertTrue("Bookie registration node doesn't exists!",
+                driver.getRegistrationManager().isBookieRegistered(bookieId));
+        }
     }
 
     /**
@@ -293,62 +274,56 @@ public void testBookieRegistrationWithShortHostNameAsBookieID() throws Exception
      */
     @Test
     public void testRegNodeExistsAfterSessionTimeOut() throws Exception {
-        File tmpDir = createTempDir("bookie", "test");
-
-        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
-        conf.setJournalDirName(tmpDir.getPath())
-            .setLedgerDirNames(new String[] { tmpDir.getPath() })
+        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
             .setZkServers(zkUtil.getZooKeeperConnectString())
             .setListeningInterface(null);
 
-        String bkRegPath = conf.getZkAvailableBookiesPath() + "/"
-                + InetAddress.getLocalHost().getHostAddress() + ":"
+        String bookieId = InetAddress.getLocalHost().getHostAddress() + ":"
                 + conf.getBookiePort();
+        String bkRegPath = conf.getZkAvailableBookiesPath() + "/" + bookieId;
 
-        MockBookie b = new MockBookie(conf);
-
-        rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
-        b.setRegistrationManager(rm);
-        b.testRegisterBookie(conf);
+        driver.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+        try (StateManager manager = new BookieStateManager(conf, driver)) {
+            manager.registerBookie(true).get();
+            assertTrue("Bookie registration node doesn't exists!",
+                driver.getRegistrationManager().isBookieRegistered(bookieId));
+        }
         Stat bkRegNode1 = zkc.exists(bkRegPath, false);
-        assertNotNull("Bookie registration node doesn't exists!",
-                bkRegNode1);
+        assertNotNull("Bookie registration has been failed",
+            bkRegNode1);
 
         // simulating bookie restart, on restart bookie will create new
         // zkclient and doing the registration.
-        ZooKeeperClient newzk = createNewZKClient();
-        RegistrationManager newRm = new ZKRegistrationManager();
-        newRm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
-        b.setRegistrationManager(newRm);
-        try {
-            b.testRegisterBookie(conf);
-            fail("Should throw NodeExistsException as the znode is not getting expired");
-        } catch (IOException e) {
-            Throwable t1 = e.getCause(); // BookieException.MetadataStoreException
-            Throwable t2 = t1.getCause(); // IOException
-            Throwable t3 = t2.getCause(); // KeeperException.NodeExistsException
-
-            if (t3 instanceof KeeperException) {
-                KeeperException ke = (KeeperException) t3;
-                assertTrue("ErrorCode:" + ke.code()
-                        + ", Registration node doesn't exists",
+        try (MetadataBookieDriver newDriver = new ZKMetadataBookieDriver()) {
+            newDriver.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+            try (StateManager newMgr = new BookieStateManager(conf, newDriver)) {
+                newMgr.registerBookie(true).get();
+                fail("Should throw NodeExistsException as the znode is not getting expired");
+            } catch (ExecutionException ee) {
+                Throwable e = ee.getCause(); // IOException
+                Throwable t1 = e.getCause(); // BookieException.MetadataStoreException
+                Throwable t2 = t1.getCause(); // IOException
+                Throwable t3 = t2.getCause(); // KeeperException.NodeExistsException
+
+                if (t3 instanceof KeeperException) {
+                    KeeperException ke = (KeeperException) t3;
+                    assertTrue("ErrorCode:" + ke.code()
+                            + ", Registration node doesn't exists",
                         ke.code() == KeeperException.Code.NODEEXISTS);
 
-                // verify ephemeral owner of the bkReg znode
-                Stat bkRegNode2 = newzk.exists(bkRegPath, false);
-                assertNotNull("Bookie registration has been failed",
+                    // verify ephemeral owner of the bkReg znode
+                    Stat bkRegNode2 = zkc.exists(bkRegPath, false);
+                    assertNotNull("Bookie registration has been failed",
                         bkRegNode2);
-                assertTrue(
+                    assertTrue(
                         "Bookie wrongly registered. Old registration znode:"
-                                + bkRegNode1 + ", New znode:" + bkRegNode2,
+                            + bkRegNode1 + ", New znode:" + bkRegNode2,
                         bkRegNode1.getEphemeralOwner() == bkRegNode2
-                                .getEphemeralOwner());
-                return;
+                            .getEphemeralOwner());
+                    return;
+                }
+                throw ee;
             }
-            throw e;
-        } finally {
-            newzk.close();
-            newRm.close();
         }
     }
 
@@ -464,19 +439,13 @@ public void testDuplicateBookieServerStartup() throws Exception {
         conf.setBookiePort(port)
             .setJournalDirName(tmpDir.getPath())
             .setLedgerDirNames(new String[] { tmpDir.getPath() })
-            .setZkServers(null);
+            .setZkServers(zkUtil.getZooKeeperConnectString());
         BookieServer bs1 = new BookieServer(conf);
-        conf.setZkServers(zkUtil.getZooKeeperConnectString());
-        rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
-        bs1.getBookie().setRegistrationManager(rm);
         bs1.start();
         BookieServer bs2 = null;
         // starting bk server with same conf
         try {
             bs2 = new BookieServer(conf);
-            RegistrationManager newRm = new ZKRegistrationManager();
-            newRm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
-            bs2.getBookie().registrationManager = newRm;
             bs2.start();
             fail("Should throw BindException, as the bk server is already running!");
         } catch (BindException e) {
@@ -505,12 +474,9 @@ public void testBookieServerStartupOnEphemeralPorts() throws Exception {
             .setJournalDirName(tmpDir1.getPath())
             .setLedgerDirNames(
                 new String[] { tmpDir1.getPath() })
-            .setZkServers(null);
+            .setZkServers(zkUtil.getZooKeeperConnectString());
         assertEquals(0, conf1.getBookiePort());
         BookieServer bs1 = new BookieServer(conf1);
-        conf1.setZkServers(zkUtil.getZooKeeperConnectString());
-        rm.initialize(conf1, () -> {}, NullStatsLogger.INSTANCE);
-        bs1.getBookie().registrationManager = rm;
         bs1.start();
         assertFalse(0 == conf1.getBookiePort());
 
@@ -522,9 +488,6 @@ public void testBookieServerStartupOnEphemeralPorts() throws Exception {
                 new String[] { tmpDir2.getPath() })
             .setZkServers(null);
         BookieServer bs2 = new BookieServer(conf2);
-        RegistrationManager newRm = new ZKRegistrationManager();
-        newRm.initialize(conf2, () -> {}, NullStatsLogger.INSTANCE);
-        bs2.getBookie().registrationManager = newRm;
         bs2.start();
         assertFalse(0 == conf2.getBookiePort());
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieShellTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieShellTest.java
index b77b5e9dd..57fd10131 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieShellTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieShellTest.java
@@ -39,15 +39,16 @@
 import com.google.common.collect.Maps;
 import java.util.Set;
 import java.util.SortedMap;
+import java.util.function.Function;
 import org.apache.bookkeeper.bookie.BookieShell.MyCommand;
 import org.apache.bookkeeper.bookie.BookieShell.RecoverCmd;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
 import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.discover.RegistrationManager.RegistrationListener;
-import org.apache.bookkeeper.discover.ZKRegistrationManager;
-import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.meta.MetadataBookieDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
 import org.apache.bookkeeper.tools.cli.commands.bookie.LastMarkCommand;
 import org.apache.bookkeeper.tools.cli.commands.client.SimpleTestCommand;
 import org.apache.bookkeeper.tools.cli.commands.cluster.ListBookiesCommand;
@@ -71,13 +72,14 @@
  * Unit test for {@link BookieShell}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest(BookieShell.class)
+@PrepareForTest({ BookieShell.class, MetadataDrivers.class })
 public class BookieShellTest {
 
     private ClientConfiguration clientConf;
     private BookieShell shell;
     private BookKeeperAdmin admin;
-    private ZKRegistrationManager rm;
+    private RegistrationManager rm;
+    private MetadataBookieDriver driver;
     private Cookie cookie;
     private Version version;
 
@@ -112,8 +114,9 @@ public void setup() throws Exception {
             .withArguments(any(ClientConfiguration.class))
             .thenReturn(admin);
         this.clientConf = new ClientConfiguration();
+        this.clientConf.setMetadataServiceUri("zk://127.0.0.1/path/to/ledgers");
         when(admin.getConf()).thenReturn(this.clientConf);
-        this.rm = PowerMockito.mock(ZKRegistrationManager.class);
+        this.rm = PowerMockito.mock(RegistrationManager.class);
         this.cookie = Cookie.newBuilder()
             .setBookieHost("127.0.0.1:3181")
             .setInstanceId("xyz")
@@ -124,9 +127,22 @@ public void setup() throws Exception {
         this.version = new LongVersion(1L);
         when(rm.readCookie(anyString()))
             .thenReturn(new Versioned<>(cookie.toString().getBytes(UTF_8), version));
-        whenNew(ZKRegistrationManager.class)
-            .withNoArguments()
+
+        this.driver = mock(MetadataBookieDriver.class);
+        when(driver.getRegistrationManager())
             .thenReturn(rm);
+
+        PowerMockito.mockStatic(MetadataDrivers.class);
+        PowerMockito.doAnswer(invocationOnMock -> {
+            Function<RegistrationManager, Object> function = invocationOnMock.getArgument(1);
+            function.apply(rm);
+            return null;
+        }).when(
+            MetadataDrivers.class,
+            "runFunctionWithRegistrationManager",
+            any(ServerConfiguration.class),
+            any(Function.class)
+        );
     }
 
     private static CommandLine parseCommandLine(MyCommand cmd, String... args) throws ParseException {
@@ -228,17 +244,13 @@ void testRecoverCmdRecoverLedger(long ledgerId,
             .recoverBookieData(eq(ledgerId), any(Set.class), eq(dryrun), eq(skipOpenLedgers));
         verify(admin, times(1)).close();
         if (removeCookies) {
-            PowerMockito
-                .verifyNew(ZKRegistrationManager.class, times(1))
-                .withNoArguments();
-            verify(rm, times(1)).initialize(
-                any(ServerConfiguration.class), any(RegistrationListener.class), eq(NullStatsLogger.INSTANCE));
+            PowerMockito.verifyStatic(MetadataDrivers.class);
+            MetadataDrivers.runFunctionWithRegistrationManager(any(ServerConfiguration.class), any(Function.class));
             verify(rm, times(1)).readCookie(anyString());
             verify(rm, times(1)).removeCookie(anyString(), eq(version));
         } else {
-            PowerMockito
-                .verifyNew(ZKRegistrationManager.class, never())
-                .withNoArguments();
+            verify(rm, times(0)).readCookie(anyString());
+            verify(rm, times(0)).removeCookie(anyString(), eq(version));
         }
     }
 
@@ -297,17 +309,13 @@ void testRecoverCmdRecover(boolean dryrun,
             .recoverBookieData(any(Set.class), eq(dryrun), eq(skipOpenLedgers));
         verify(admin, times(1)).close();
         if (removeCookies) {
-            PowerMockito
-                .verifyNew(ZKRegistrationManager.class, times(1))
-                .withNoArguments();
-            verify(rm, times(1)).initialize(
-                any(ServerConfiguration.class), any(RegistrationListener.class), eq(NullStatsLogger.INSTANCE));
+            PowerMockito.verifyStatic(MetadataDrivers.class);
+            MetadataDrivers.runFunctionWithRegistrationManager(any(ServerConfiguration.class), any(Function.class));
             verify(rm, times(1)).readCookie(anyString());
             verify(rm, times(1)).removeCookie(anyString(), eq(version));
         } else {
-            PowerMockito
-                .verifyNew(ZKRegistrationManager.class, never())
-                .withNoArguments();
+            verify(rm, times(0)).readCookie(anyString());
+            verify(rm, times(0)).removeCookie(anyString(), eq(version));
         }
     }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index a950ca715..7c5ac7b80 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -28,10 +28,13 @@
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.RECLAIMED_DELETION_SPACE_BYTES;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.THREAD_RUNTIME;
 import static org.apache.bookkeeper.bookie.TransactionalEntryLogCompactor.COMPACTED_SUFFIX;
+import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithLedgerManagerFactory;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+
+import com.google.common.util.concurrent.UncheckedExecutionException;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.io.File;
@@ -54,8 +57,6 @@
 import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
-import org.apache.bookkeeper.discover.RegistrationManager;
-import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -249,31 +250,34 @@ public void checkpointComplete(Checkpoint checkPoint, boolean compact)
         for (File dir : dirManager.getAllLedgerDirs()) {
             Bookie.checkDirectoryStructure(dir);
         }
-        InterleavedLedgerStorage storage = new InterleavedLedgerStorage();
-        storage.initialize(
-            conf,
-            AbstractZkLedgerManagerFactory
-                .newLedgerManagerFactory(
+        runFunctionWithLedgerManagerFactory(conf, lmf -> {
+            try (LedgerManager lm = lmf.newLedgerManager()) {
+                InterleavedLedgerStorage storage = new InterleavedLedgerStorage();
+                storage.initialize(
                     conf,
-                    RegistrationManager.instantiateRegistrationManager(conf).getLayoutManager())
-                .newLedgerManager(),
-            dirManager,
-            dirManager,
-            null,
-            cp,
-            Checkpointer.NULL,
-            NullStatsLogger.INSTANCE);
-        storage.start();
-        long startTime = MathUtils.now();
-        storage.gcThread.enableForceGC();
-        storage.gcThread.triggerGC().get(); //major
-        storage.gcThread.triggerGC().get(); //minor
-        // Minor and Major compaction times should be larger than when we started
-        // this test.
-        assertTrue("Minor or major compaction did not trigger even on forcing.",
-                storage.gcThread.lastMajorCompactionTime > startTime
-                && storage.gcThread.lastMinorCompactionTime > startTime);
-        storage.shutdown();
+                    lm,
+                    dirManager,
+                    dirManager,
+                    null,
+                    cp,
+                    Checkpointer.NULL,
+                    NullStatsLogger.INSTANCE);
+                storage.start();
+                long startTime = MathUtils.now();
+                storage.gcThread.enableForceGC();
+                storage.gcThread.triggerGC().get(); //major
+                storage.gcThread.triggerGC().get(); //minor
+                // Minor and Major compaction times should be larger than when we started
+                // this test.
+                assertTrue("Minor or major compaction did not trigger even on forcing.",
+                    storage.gcThread.lastMajorCompactionTime > startTime
+                        && storage.gcThread.lastMinorCompactionTime > startTime);
+                storage.shutdown();
+            } catch (Exception e) {
+                throw new UncheckedExecutionException(e.getMessage(), e);
+            }
+            return null;
+        });
     }
 
     @Test
@@ -924,6 +928,18 @@ public void testSuspendGarbageCollection() throws Exception {
         conf.setGcWaitTime(500);
         conf.setMinorCompactionInterval(1);
         conf.setMajorCompactionInterval(2);
+        runFunctionWithLedgerManagerFactory(conf, lmf -> {
+            try (LedgerManager lm = lmf.newLedgerManager()) {
+                testSuspendGarbageCollection(conf, lm);
+            } catch (Exception e) {
+                throw new UncheckedExecutionException(e.getMessage(), e);
+            }
+            return null;
+        });
+    }
+
+    private void testSuspendGarbageCollection(ServerConfiguration conf,
+                                              LedgerManager lm) throws Exception {
         LedgerDirsManager dirManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
         CheckpointSource cp = new CheckpointSource() {
@@ -949,11 +965,7 @@ public void checkpointComplete(Checkpoint checkPoint, boolean compact)
         TestStatsProvider stats = new TestStatsProvider();
         storage.initialize(
             conf,
-            AbstractZkLedgerManagerFactory
-                .newLedgerManagerFactory(
-                    conf,
-                    RegistrationManager.instantiateRegistrationManager(conf).getLayoutManager())
-                .newLedgerManager(),
+            lm,
             dirManager,
             dirManager,
             null,
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java
index 5a518e149..6d4567816 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java
@@ -32,6 +32,7 @@
 import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -40,7 +41,8 @@
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.discover.RegistrationManager;
-import org.apache.bookkeeper.discover.ZKRegistrationManager;
+import org.apache.bookkeeper.meta.MetadataBookieDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.test.PortManager;
@@ -75,21 +77,24 @@ private String newDirectory(boolean createCurDir) throws IOException {
         return d.getPath();
     }
 
+    MetadataBookieDriver metadataBookieDriver;
     RegistrationManager rm;
 
     @Override
     public void setUp() throws Exception {
         super.setUp();
-        rm = new ZKRegistrationManager();
         baseConf.setZkServers(zkUtil.getZooKeeperConnectString());
-        rm.initialize(baseConf, () -> {}, NullStatsLogger.INSTANCE);
+        this.metadataBookieDriver = MetadataDrivers.getBookieDriver(
+            URI.create(baseConf.getMetadataServiceUri()));
+        this.metadataBookieDriver.initialize(baseConf, () -> {}, NullStatsLogger.INSTANCE);
+        this.rm = metadataBookieDriver.getRegistrationManager();
     }
 
     @Override
     public void tearDown() throws Exception {
         super.tearDown();
-        if (rm != null) {
-            rm.close();
+        if (metadataBookieDriver != null) {
+            metadataBookieDriver.close();
         }
     }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index 663f0f8cf..6fe5862de 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -41,9 +41,7 @@
 import org.apache.bookkeeper.bookie.FileInfoBackingCache.CachedFileInfo;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
-import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerManager;
-import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -65,7 +63,6 @@
     private static final Logger LOG = LoggerFactory.getLogger(LedgerCacheTest.class);
 
     SnapshotMap<Long, Boolean> activeLedgers;
-    LedgerManagerFactory ledgerManagerFactory;
     LedgerCache ledgerCache;
     Thread flushThread;
     ServerConfiguration conf;
@@ -88,8 +85,6 @@ public void setUp() throws Exception {
         conf.setLedgerDirNames(new String[] { ledgerDir.getPath() });
         bookie = new Bookie(conf);
 
-        ledgerManagerFactory =
-            AbstractZkLedgerManagerFactory.newLedgerManagerFactory(conf, null);
         activeLedgers = new SnapshotMap<Long, Boolean>();
         ledgerCache = ((InterleavedLedgerStorage) bookie.ledgerStorage).ledgerCache;
     }
@@ -101,7 +96,6 @@ public void tearDown() throws Exception {
             flushThread.join();
         }
         bookie.ledgerStorage.shutdown();
-        ledgerManagerFactory.close();
         FileUtils.deleteDirectory(txnDir);
         FileUtils.deleteDirectory(ledgerDir);
         for (File dir : tempDirs) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/StateManagerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/StateManagerTest.java
index 8c8b63689..b7372f1c7 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/StateManagerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/StateManagerTest.java
@@ -20,35 +20,28 @@
  */
 package org.apache.bookkeeper.bookie;
 
-import static org.apache.bookkeeper.bookie.BookieException.Code.MetadataStoreException;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import java.io.File;
-import java.io.IOException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
-import org.apache.bookkeeper.discover.ZKRegistrationManager;
-import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.meta.MetadataBookieDriver;
+import org.apache.bookkeeper.meta.zk.ZKMetadataBookieDriver;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-import org.apache.zookeeper.KeeperException;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Testing StateManager cases.
  */
 public class StateManagerTest extends BookKeeperClusterTestCase {
-    private static final Logger LOG = LoggerFactory
-            .getLogger(StateManagerTest.class);
 
     @Rule
     public final TestName runtime = new TestName();
     final ServerConfiguration conf;
-    MockZKRegistrationManager rm;
+    MetadataBookieDriver driver;
 
     public StateManagerTest(){
         super(0);
@@ -56,6 +49,7 @@ public StateManagerTest(){
         baseClientConf.setZkLedgersRootPath(ledgersPath);
         baseConf.setZkLedgersRootPath(ledgersPath);
         conf = TestBKConfiguration.newServerConfiguration();
+        driver = new ZKMetadataBookieDriver();
 
     }
 
@@ -63,7 +57,6 @@ public StateManagerTest(){
     public void setUp() throws Exception {
         super.setUp();
         zkUtil.createBKEnsemble("/" + runtime.getMethodName());
-        rm = new MockZKRegistrationManager();
         File tmpDir = createTempDir("stateManger", "test");
         conf.setJournalDirName(tmpDir.getPath())
                 .setLedgerDirNames(new String[] { tmpDir.getPath() })
@@ -74,64 +67,18 @@ public void setUp() throws Exception {
     @Override
     public void tearDown() throws Exception {
         super.tearDown();
-        if (rm != null) {
-            rm.close();
+        if (driver != null) {
+            driver.close();
         }
     }
 
-    private static class MockZKRegistrationManager extends ZKRegistrationManager {
-        boolean registerFailed = false;
-        void setRegisterFail(boolean failOrNot){
-            registerFailed = failOrNot;
-        }
-        @Override
-        public void registerBookie(String bookieId, boolean readOnly) throws BookieException {
-            if (registerFailed) {
-                throw BookieException.create(MetadataStoreException);
-            }
-            super.registerBookie(bookieId, readOnly);
-        }
-
-    }
-
-    /**
-     * Bookie should shutdown when it register to Registration service fail.
-     * On ZooKeeper exception, should return exit code ZK_REG_FAIL = 4
-     */
-    @Test
-    public void testShutdown() throws Exception {
-        File tmpDir = createTempDir("stateManger", "test");
-
-        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
-        conf.setJournalDirName(tmpDir.getPath())
-            .setLedgerDirNames(new String[] { tmpDir.getPath() })
-            .setJournalDirName(tmpDir.toString())
-            .setZkServers(zkUtil.getZooKeeperConnectString());
-
-        BookieServer bkServer = new BookieServer(conf) {
-            protected Bookie newBookie(ServerConfiguration conf)
-                    throws IOException, KeeperException, InterruptedException,
-                    BookieException {
-                Bookie bookie = new Bookie(conf);
-                rm.setRegisterFail(true);
-                rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
-                bookie.setRegistrationManager(rm);
-                return bookie;
-            }
-        };
-        bkServer.start();
-        bkServer.join();
-        assertTrue("Failed to return failCode ZK_REG_FAIL",
-                ExitCode.ZK_REG_FAIL == bkServer.getExitCode());
-    }
-
     /**
      * StateManager can transition between writable mode and readOnly mode if it was not created with readOnly mode.
      */
     @Test
     public void testNormalBookieTransitions() throws Exception {
-        BookieStateManager stateManager = new BookieStateManager(conf, rm);
-        rm.initialize(conf, () -> {
+        BookieStateManager stateManager = new BookieStateManager(conf, driver);
+        driver.initialize(conf, () -> {
             stateManager.forceToUnregistered();
             // schedule a re-register operation
             stateManager.registerBookie(false);
@@ -158,7 +105,7 @@ public void testNormalBookieTransitions() throws Exception {
     public void testReadOnlyDisableBookieTransitions() throws Exception {
         conf.setReadOnlyModeEnabled(false);
         // readOnly disabled bk stateManager
-        BookieStateManager stateManager = new BookieStateManager(conf, rm);
+        BookieStateManager stateManager = new BookieStateManager(conf, driver);
         // simulate sync shutdown logic in bookie
         stateManager.setShutdownHandler(new StateManager.ShutdownHandler() {
             @Override
@@ -174,11 +121,14 @@ public void shutdown(int code) {
                 }
             }
         });
-        rm.initialize(conf, () -> {
-            stateManager.forceToUnregistered();
-            // schedule a re-register operation
-            stateManager.registerBookie(false);
-        }, NullStatsLogger.INSTANCE);
+        driver.initialize(
+            conf,
+            () -> {
+                stateManager.forceToUnregistered();
+                // schedule a re-register operation
+                stateManager.registerBookie(false);
+            },
+            NullStatsLogger.INSTANCE);
 
         stateManager.initState();
         stateManager.registerBookie(true).get();
@@ -220,12 +170,15 @@ public void testReadOnlyBookieTransitions() throws Exception{
      */
     @Test
     public void testRegistration() throws Exception {
-        BookieStateManager stateManager = new BookieStateManager(conf, rm);
-        rm.initialize(conf, () -> {
-            stateManager.forceToUnregistered();
-            // schedule a re-register operation
-            stateManager.registerBookie(false);
-        }, NullStatsLogger.INSTANCE);
+        BookieStateManager stateManager = new BookieStateManager(conf, driver);
+        driver.initialize(
+            conf,
+            () -> {
+                stateManager.forceToUnregistered();
+                // schedule a re-register operation
+                stateManager.registerBookie(false);
+            },
+            NullStatsLogger.INSTANCE);
         // simulate sync shutdown logic in bookie
         stateManager.setShutdownHandler(new StateManager.ShutdownHandler() {
             @Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpdateCookieCmdTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpdateCookieCmdTest.java
index 0d9f7ebfa..2ce8d86d3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpdateCookieCmdTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpdateCookieCmdTest.java
@@ -24,11 +24,13 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.net.UnknownHostException;
 import java.util.List;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.discover.RegistrationManager;
-import org.apache.bookkeeper.discover.ZKRegistrationManager;
+import org.apache.bookkeeper.meta.MetadataBookieDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
@@ -46,6 +48,7 @@
 
     private static final Logger LOG = LoggerFactory.getLogger(UpdateCookieCmdTest.class);
 
+    MetadataBookieDriver driver;
     RegistrationManager rm;
 
     public UpdateCookieCmdTest() {
@@ -56,16 +59,18 @@ public UpdateCookieCmdTest() {
     public void setUp() throws Exception {
         super.setUp();
         LOG.info("setUp ZKRegistrationManager");
-        rm = new ZKRegistrationManager();
         baseConf.setZkServers(zkUtil.getZooKeeperConnectString());
-        rm.initialize(baseConf, () -> {}, NullStatsLogger.INSTANCE);
+        driver = MetadataDrivers.getBookieDriver(
+            URI.create(baseConf.getMetadataServiceUri()));
+        driver.initialize(baseConf, () -> {}, NullStatsLogger.INSTANCE);
+        rm = driver.getRegistrationManager();
     }
 
     @Override
     public void tearDown() throws Exception {
         super.tearDown();
-        if (rm != null) {
-            rm.close();
+        if (driver != null) {
+            driver.close();
         }
     }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
index b841baae1..75cdcd26e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
@@ -20,17 +20,19 @@
  */
 package org.apache.bookkeeper.client;
 
+import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithLedgerManagerFactory;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import com.google.common.util.concurrent.UncheckedExecutionException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
-import org.apache.bookkeeper.discover.RegistrationManager;
 import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerIdGenerator;
 import org.apache.bookkeeper.meta.LedgerManager;
@@ -41,7 +43,6 @@
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-import org.apache.bookkeeper.util.ReflectionUtils;
 import org.apache.bookkeeper.versioning.Version;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -110,14 +111,20 @@ public void testWatchEnsembleChange() throws Exception {
 
     @Test
     public void testWatchMetadataRemoval() throws Exception {
-        LedgerManagerFactory factory = ReflectionUtils.newInstance(lmFactoryCls);
         baseConf.setZkServers(zkUtil.getZooKeeperConnectString());
-        factory.initialize(baseConf,
-            RegistrationManager.instantiateRegistrationManager(baseConf).getLayoutManager(),
-            factory.getCurrentVersion());
+        runFunctionWithLedgerManagerFactory(baseConf, factory -> {
+            try {
+                testWatchMetadataRemoval(factory);
+            } catch (Exception e) {
+                throw new UncheckedExecutionException(e.getMessage(), e);
+            }
+            return null;
+        });
+    }
 
-        final LedgerManager manager = factory.newLedgerManager();
-        LedgerIdGenerator idGenerator = factory.newLedgerIdGenerator();
+    private void testWatchMetadataRemoval(LedgerManagerFactory factory) throws Exception {
+        @Cleanup final LedgerManager manager = factory.newLedgerManager();
+        @Cleanup LedgerIdGenerator idGenerator = factory.newLedgerIdGenerator();
 
         final ByteBuffer bbLedgerId = ByteBuffer.allocate(8);
         final CountDownLatch createLatch = new CountDownLatch(1);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/AbstractConfigurationTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/AbstractConfigurationTest.java
index 0d25ccce7..094c57047 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/AbstractConfigurationTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/AbstractConfigurationTest.java
@@ -23,6 +23,7 @@
 import static org.mockito.Mockito.mock;
 
 import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
+import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory;
 import org.junit.Before;
@@ -34,6 +35,8 @@
 public class AbstractConfigurationTest {
 
     private static final String DEFAULT_METADATA_SERVICE_URI =
+        "zk+null://127.0.0.1/path/to/ledgers";
+    private static final String HIERARCHICAL_METADATA_SERVICE_URI =
         "zk+hierarchical://127.0.0.1/path/to/ledgers";
     private static final String FLAT_METADATA_SERVICE_URI =
         "zk+flat://127.0.0.1/path/to/ledgers";
@@ -88,6 +91,15 @@ public void testFlatLedgerManagerUri() throws Exception {
             conf.getMetadataServiceUri());
     }
 
+    @SuppressWarnings({ "unchecked" })
+    @Test
+    public void testHierarchicalLedgerManagerUri() throws Exception {
+        conf.setLedgerManagerFactoryClass(HierarchicalLedgerManagerFactory.class);
+        assertEquals(
+            HIERARCHICAL_METADATA_SERVICE_URI,
+            conf.getMetadataServiceUri());
+    }
+
     @SuppressWarnings({ "unchecked" })
     @Test
     public void testLongHierarchicalLedgerManagerUri() throws Exception {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationManager.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationManager.java
index 284e4e9c6..ff1b7ea55 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationManager.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationManager.java
@@ -18,56 +18,8 @@
  */
 package org.apache.bookkeeper.discover;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.bookkeeper.meta.LayoutManager;
-import org.apache.bookkeeper.meta.LedgerLayout;
-import org.junit.Test;
-
 /**
  * Unit test of {@link RegistrationManager}.
  */
 public class TestZkRegistrationManager {
-    private final LedgerLayout ledgerLayout;
-    private final LayoutManager layoutManager;
-    private final ZKRegistrationManager zkRegistrationManager;
-
-    public TestZkRegistrationManager() {
-        this.ledgerLayout = mock(LedgerLayout.class);
-        this.layoutManager = mock(LayoutManager.class);
-        this.zkRegistrationManager = new ZKRegistrationManager();
-        zkRegistrationManager.setLayoutManager(layoutManager);
-    }
-
-    @Test
-    public void testGetLayoutManager() throws Exception {
-        assertEquals(layoutManager, zkRegistrationManager.getLayoutManager());
-    }
-
-    @Test
-    public void testReadLedgerLayout() throws Exception {
-        when(layoutManager.readLedgerLayout()).thenReturn(ledgerLayout);
-        assertEquals(ledgerLayout, zkRegistrationManager.getLayoutManager().readLedgerLayout());
-    }
-
-    @Test
-    public void testStoreLedgerLayout() throws Exception {
-        zkRegistrationManager.getLayoutManager().storeLedgerLayout(ledgerLayout);
-
-        verify(layoutManager, times(1))
-            .storeLedgerLayout(eq(ledgerLayout));
-    }
-
-    @Test
-    public void testDeleteLedgerLayout() throws Exception {
-        zkRegistrationManager.getLayoutManager().deleteLedgerLayout();
-
-        verify(layoutManager, times(1))
-            .deleteLedgerLayout();
-    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index 7a28079b8..1cfbaada3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -23,10 +23,12 @@
 
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
+import java.net.URI;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Optional;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.CheckpointSource;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
@@ -37,9 +39,10 @@
 import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
 import org.apache.bookkeeper.bookie.StateManager;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.SnapshotMap;
@@ -55,10 +58,12 @@
 @RunWith(Parameterized.class)
 public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
 
+    protected MetadataClientDriver clientDriver;
     protected LedgerManagerFactory ledgerManagerFactory;
     protected LedgerManager ledgerManager = null;
     protected LedgerIdGenerator ledgerIdGenerator = null;
     protected SnapshotMap<Long, Boolean> activeLedgers = null;
+    protected OrderedScheduler scheduler;
 
     public LedgerManagerTestCase(Class<? extends LedgerManagerFactory> lmFactoryCls) {
         this(lmFactoryCls, 0);
@@ -105,10 +110,20 @@ public LedgerIdGenerator getLedgerIdGenerator() throws IOException {
     public void setUp() throws Exception {
         super.setUp();
         baseConf.setZkServers(zkUtil.getZooKeeperConnectString());
-        ledgerManagerFactory = AbstractZkLedgerManagerFactory.newLedgerManagerFactory(
-            baseConf,
-            RegistrationManager
-                .instantiateRegistrationManager(baseConf).getLayoutManager());
+
+        scheduler = OrderedScheduler.newSchedulerBuilder()
+            .name("test-scheduler")
+            .numThreads(1)
+            .build();
+
+        clientDriver = MetadataDrivers.getClientDriver(
+            URI.create(baseClientConf.getMetadataServiceUri()));
+        clientDriver.initialize(
+            baseClientConf,
+            scheduler,
+            NullStatsLogger.INSTANCE,
+            Optional.empty());
+        ledgerManagerFactory = clientDriver.getLedgerManagerFactory();
     }
 
     @After
@@ -117,7 +132,12 @@ public void tearDown() throws Exception {
         if (null != ledgerManager) {
             ledgerManager.close();
         }
-        ledgerManagerFactory.close();
+        if (null != clientDriver) {
+            clientDriver.close();
+        }
+        if (null != scheduler) {
+            scheduler.shutdown();
+        }
         super.tearDown();
     }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MetadataDriversTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MetadataDriversTest.java
index 90207d88c..b5868903d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MetadataDriversTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MetadataDriversTest.java
@@ -136,7 +136,7 @@ public String getScheme() {
         }
 
         @Override
-        public RegistrationManager getRegistrationManager() throws MetadataException {
+        public RegistrationManager getRegistrationManager() {
             return mock(RegistrationManager.class);
         }
 
@@ -165,7 +165,7 @@ public String getScheme() {
         }
 
         @Override
-        public RegistrationManager getRegistrationManager() throws MetadataException {
+        public RegistrationManager getRegistrationManager() {
             return mock(RegistrationManager.class);
         }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerManager.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerManager.java
index 8a0ac6c12..2bd4e36b2 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerManager.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerManager.java
@@ -20,6 +20,7 @@
  */
 package org.apache.bookkeeper.meta;
 
+import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithLedgerManagerFactory;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -30,7 +31,6 @@
 import java.util.concurrent.CyclicBarrier;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.discover.RegistrationManager;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
@@ -115,7 +115,7 @@ public void testBadConf() throws Exception {
         } catch (Exception e) {
             LOG.error("Received exception", e);
             assertTrue("Invalid exception",
-                    e.getMessage().contains("Failed to instantiate ledger manager factory DoesNotExist"));
+                    e.getMessage().contains("Failed to retrieve metadata service uri from configuration"));
         }
     }
 
@@ -243,12 +243,7 @@ public void run() {
 
             try {
                 barrier.await();
-                LedgerManagerFactory factory = AbstractZkLedgerManagerFactory.newLedgerManagerFactory(
-                    conf,
-                    RegistrationManager
-                        .instantiateRegistrationManager(new ServerConfiguration(conf)).getLayoutManager());
-                factory.close();
-
+                runFunctionWithLedgerManagerFactory(new ServerConfiguration(conf), factory -> null);
                 success = true;
             } catch (Exception e) {
                 LOG.error("Failed to create ledger manager", e);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataBookieDriverTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataBookieDriverTest.java
index 99504a0d6..ca3279de6 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataBookieDriverTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataBookieDriverTest.java
@@ -20,6 +20,7 @@
 
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.same;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -31,6 +32,7 @@
 import org.apache.bookkeeper.discover.ZKRegistrationManager;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.apache.zookeeper.ZooKeeper;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -68,7 +70,14 @@ public void testGetRegManager() throws Exception {
         ZKRegistrationManager mockRegManager = PowerMockito.mock(ZKRegistrationManager.class);
 
         PowerMockito.whenNew(ZKRegistrationManager.class)
-            .withNoArguments()
+            .withParameterTypes(
+                ServerConfiguration.class,
+                ZooKeeper.class,
+                RegistrationListener.class)
+            .withArguments(
+                any(ServerConfiguration.class),
+                any(ZooKeeper.class),
+                any(RegistrationListener.class))
             .thenReturn(mockRegManager);
 
         RegistrationManager manager = driver.getRegistrationManager();
@@ -76,12 +85,10 @@ public void testGetRegManager() throws Exception {
         assertSame(mockRegManager, driver.regManager);
 
         PowerMockito.verifyNew(ZKRegistrationManager.class, times(1))
-            .withNoArguments();
-        verify(mockRegManager, times(1))
-            .initialize(
+            .withArguments(
                 same(conf),
-                same(listener),
-                same(NullStatsLogger.INSTANCE));
+                same(mockZkc),
+                same(listener));
 
         driver.close();
         verify(mockRegManager, times(1)).close();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBaseTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBaseTest.java
index 7a477c0ac..2386a7fe0 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBaseTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverBaseTest.java
@@ -35,13 +35,12 @@
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
 import static org.powermock.api.mockito.PowerMockito.verifyStatic;
 
-import java.net.URI;
 import java.util.Optional;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
-import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.zookeeper.RetryPolicy;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.junit.Before;
 import org.junit.Test;
@@ -58,21 +57,20 @@
 public class ZKMetadataDriverBaseTest extends ZKMetadataDriverTestBase {
 
     private ZKMetadataDriverBase driver;
+    private RetryPolicy retryPolicy;
 
     @Before
     public void setup() throws Exception {
         super.setup(new ClientConfiguration());
         driver = mock(ZKMetadataDriverBase.class, CALLS_REAL_METHODS);
+        retryPolicy = mock(RetryPolicy.class);
     }
 
     @Test
     public void testInitialize() throws Exception {
-        driver.initialize(conf, NullStatsLogger.INSTANCE, Optional.empty());
+        driver.initialize(
+            conf, NullStatsLogger.INSTANCE, retryPolicy, Optional.empty());
 
-        assertEquals(URI.create(metadataServiceUri), driver.metadataServiceUri);
-        assertEquals(
-            HierarchicalLedgerManagerFactory.class,
-            driver.ledgerManagerFactoryClass);
         assertEquals(
             "/path/to/ledgers",
             driver.ledgersRootPath);
@@ -98,14 +96,11 @@ public void testInitialize() throws Exception {
     public void testInitializeExternalZooKeeper() throws Exception {
         ZooKeeperClient anotherZk = mock(ZooKeeperClient.class);
 
-        driver.initialize(conf, NullStatsLogger.INSTANCE, Optional.of(anotherZk));
+        driver.initialize(
+            conf, NullStatsLogger.INSTANCE, retryPolicy, Optional.of(anotherZk));
 
-        assertEquals(URI.create(metadataServiceUri), driver.metadataServiceUri);
-        assertEquals(
-            HierarchicalLedgerManagerFactory.class,
-            driver.ledgerManagerFactoryClass);
         assertEquals(
-            "/path/to/ledgers",
+            "/ledgers",
             driver.ledgersRootPath);
         assertFalse(driver.ownZKHandle);
 
@@ -127,7 +122,8 @@ public void testInitializeExternalZooKeeper() throws Exception {
 
     @Test
     public void testGetLedgerManagerFactory() throws Exception {
-        driver.initialize(conf, NullStatsLogger.INSTANCE, Optional.empty());
+        driver.initialize(
+            conf, NullStatsLogger.INSTANCE, retryPolicy, Optional.empty());
 
         mockStatic(AbstractZkLedgerManagerFactory.class);
         LedgerManagerFactory factory = mock(LedgerManagerFactory.class);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverTestBase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverTestBase.java
index 22d5f1b4f..c0f338351 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverTestBase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/zk/ZKMetadataDriverTestBase.java
@@ -19,6 +19,7 @@
 package org.apache.bookkeeper.meta.zk;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyDouble;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
@@ -54,6 +55,7 @@ public void setup(AbstractConfiguration<?> conf) throws Exception {
         when(mockZkBuilder.sessionTimeoutMs(anyInt())).thenReturn(mockZkBuilder);
         when(mockZkBuilder.operationRetryPolicy(any(RetryPolicy.class)))
             .thenReturn(mockZkBuilder);
+        when(mockZkBuilder.requestRateLimit(anyDouble())).thenReturn(mockZkBuilder);
         when(mockZkBuilder.statsLogger(any(StatsLogger.class))).thenReturn(mockZkBuilder);
 
         this.mockZkc = mock(ZooKeeperClient.class);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
index e0f2f9780..5e2d270bc 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
@@ -25,7 +25,9 @@
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+
 import java.io.IOException;
+import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -34,6 +36,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -42,23 +45,25 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Cleanup;
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.discover.RegistrationManager;
-import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.MetadataClientDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
 import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieServer;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
 import org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat;
 import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
 import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.zookeeper.KeeperException;
@@ -70,7 +75,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Tests publishing of under replicated ledgers by the Auditor bookie node when
  * corresponding bookies identifes as not running.
@@ -128,6 +132,7 @@ public void setUp() throws Exception {
         urLedgerList = new HashSet<Long>();
         ledgerList = new ArrayList<Long>(2);
         baseClientConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        baseConf.setZkServers(zkUtil.getZooKeeperConnectString());
     }
 
     @Override
@@ -521,6 +526,14 @@ public void testTriggerAuditorWithNoPendingAuditTask() throws Exception {
                 "lostBookieRecoveryDelayBeforeChange of Auditor should be equal to BaseConf's lostBookieRecoveryDelay",
                 lostBookieRecoveryDelayConfValue, lostBookieRecoveryDelayBeforeChange);
 
+        @Cleanup("shutdown") OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder()
+            .name("test-scheduler")
+            .numThreads(1)
+            .build();
+        @Cleanup MetadataClientDriver driver =
+            MetadataDrivers.getClientDriver(URI.create(baseClientConf.getMetadataServiceUri()));
+        driver.initialize(baseClientConf, scheduler, NullStatsLogger.INSTANCE, Optional.of(zkc));
+
         // there is no easy way to validate if the Auditor has executed Audit process (Auditor.startAudit),
         // without shuttingdown Bookie. To test if by resetting LostBookieRecoveryDelay it does Auditing
         // even when there is no pending AuditTask, following approach is needed.
@@ -537,23 +550,19 @@ public void testTriggerAuditorWithNoPendingAuditTask() throws Exception {
             ensemble.add(new BookieSocketAddress("11.11.11.11:1111"));
             ensemble.add(new BookieSocketAddress("88.88.88.88:8888"));
             metadata.addEnsemble(0, ensemble);
-            LedgerManager ledgerManager = AbstractZkLedgerManagerFactory.newLedgerManagerFactory(
-                baseClientConf,
-                RegistrationManager
-                    .instantiateRegistrationManager(new ServerConfiguration(baseClientConf))
-                    .getLayoutManager()).newLedgerManager();
 
             MutableInt ledgerCreateRC = new MutableInt(-1);
             CountDownLatch latch = new CountDownLatch(1);
             long ledgerId = (Math.abs(rand.nextLong())) % 100000000;
-            ledgerManager.createLedgerMetadata(ledgerId, metadata,
-                    new BookkeeperInternalCallbacks.GenericCallback<Void>() {
-                        @Override
-                        public void operationComplete(int rc, Void result) {
-                            ledgerCreateRC.setValue(rc);
-                            latch.countDown();
-                        }
+
+            try (LedgerManager lm = driver.getLedgerManagerFactory().newLedgerManager()) {
+                lm.createLedgerMetadata(ledgerId, metadata,
+                    (rc, result) -> {
+                        ledgerCreateRC.setValue(rc);
+                        latch.countDown();
                     });
+            }
+
             Assert.assertTrue("Ledger creation should complete within 2 secs",
                     latch.await(2000, TimeUnit.MILLISECONDS));
             Assert.assertEquals("LedgerCreate should succeed and return OK rc value", BKException.Code.OK,
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java
index 919108432..376427b17 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java
@@ -20,19 +20,19 @@
  */
 package org.apache.bookkeeper.replication;
 
+import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithLedgerManagerFactory;
 import static org.junit.Assert.assertEquals;
 
+import com.google.common.util.concurrent.UncheckedExecutionException;
 import java.util.List;
+import lombok.Cleanup;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerHandleAdapter;
 import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
-import org.apache.bookkeeper.discover.RegistrationManager;
-import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerManager;
-import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
@@ -97,31 +97,33 @@ public void tearDown() throws Exception {
     @Test
     public void testPeriodicBookieCheckInterval() throws Exception {
         bsConfs.get(0).setZkServers(zkUtil.getZooKeeperConnectString());
-        LedgerManagerFactory mFactory = AbstractZkLedgerManagerFactory.newLedgerManagerFactory(
-            bsConfs.get(0),
-            RegistrationManager.instantiateRegistrationManager(bsConfs.get(0)).getLayoutManager());
-
-        LedgerManager ledgerManager = mFactory.newLedgerManager();
-        final LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();
-        final int numLedgers = 1;
-
-        LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes());
-        LedgerMetadata md = LedgerHandleAdapter.getLedgerMetadata(lh);
-        List<BookieSocketAddress> ensemble = md.getEnsembles().get(0L);
-        ensemble.set(0, new BookieSocketAddress("1.1.1.1", 1000));
-
-        TestCallbacks.GenericCallbackFuture<Void> cb = new TestCallbacks.GenericCallbackFuture<Void>();
-        ledgerManager.writeLedgerMetadata(lh.getId(), md, cb);
-        cb.get();
-
-        long underReplicatedLedger = -1;
-        for (int i = 0; i < 10; i++) {
-            underReplicatedLedger = underReplicationManager.pollLedgerToRereplicate();
-            if (underReplicatedLedger != -1) {
-                break;
+        runFunctionWithLedgerManagerFactory(bsConfs.get(0), mFactory -> {
+            try (LedgerManager ledgerManager = mFactory.newLedgerManager()) {
+                @Cleanup final LedgerUnderreplicationManager underReplicationManager =
+                    mFactory.newLedgerUnderreplicationManager();
+
+                LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes());
+                LedgerMetadata md = LedgerHandleAdapter.getLedgerMetadata(lh);
+                List<BookieSocketAddress> ensemble = md.getEnsembles().get(0L);
+                ensemble.set(0, new BookieSocketAddress("1.1.1.1", 1000));
+
+                TestCallbacks.GenericCallbackFuture<Void> cb = new TestCallbacks.GenericCallbackFuture<Void>();
+                ledgerManager.writeLedgerMetadata(lh.getId(), md, cb);
+                cb.get();
+
+                long underReplicatedLedger = -1;
+                for (int i = 0; i < 10; i++) {
+                    underReplicatedLedger = underReplicationManager.pollLedgerToRereplicate();
+                    if (underReplicatedLedger != -1) {
+                        break;
+                    }
+                    Thread.sleep(CHECK_INTERVAL * 1000);
+                }
+                assertEquals("Ledger should be under replicated", lh.getId(), underReplicatedLedger);
+            } catch (Exception e) {
+                throw new UncheckedExecutionException(e.getMessage(), e);
             }
-            Thread.sleep(CHECK_INTERVAL * 1000);
-        }
-        assertEquals("Ledger should be under replicated", lh.getId(), underReplicatedLedger);
+            return null;
+        });
     }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
index ddb163fbe..e706d1ef0 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
@@ -30,6 +30,7 @@
 import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
+import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -49,10 +50,10 @@
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerHandleAdapter;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.discover.RegistrationManager;
-import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.MetadataBookieDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -73,6 +74,7 @@
     private static final Logger LOG = LoggerFactory
             .getLogger(AuditorPeriodicCheckTest.class);
 
+    private MetadataBookieDriver driver;
     private HashMap<String, AuditorElector> auditorElectors = new HashMap<String, AuditorElector>();
     private List<ZooKeeper> zkClients = new LinkedList<ZooKeeper>();
 
@@ -106,11 +108,22 @@ public void setUp() throws Exception {
             auditorElector.start();
             LOG.debug("Starting Auditor Elector");
         }
+
+        driver = MetadataDrivers.getBookieDriver(
+            URI.create(bsConfs.get(0).getMetadataServiceUri()));
+        driver.initialize(
+            bsConfs.get(0),
+            () -> {},
+            NullStatsLogger.INSTANCE);
     }
 
     @After
     @Override
     public void tearDown() throws Exception {
+        if (null != driver) {
+            driver.close();
+        }
+
         for (AuditorElector e : auditorElectors.values()) {
             e.shutdown();
         }
@@ -127,9 +140,7 @@ public void tearDown() throws Exception {
      */
     @Test
     public void testEntryLogCorruption() throws Exception {
-        LedgerManagerFactory mFactory = AbstractZkLedgerManagerFactory.newLedgerManagerFactory(
-            bsConfs.get(0),
-            RegistrationManager.instantiateRegistrationManager(bsConfs.get(0)).getLayoutManager());
+        LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
         LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();
         underReplicationManager.disableLedgerReplication();
 
@@ -178,9 +189,7 @@ public boolean accept(File dir, String name) {
      */
     @Test
     public void testIndexCorruption() throws Exception {
-        LedgerManagerFactory mFactory = AbstractZkLedgerManagerFactory.newLedgerManagerFactory(
-            bsConfs.get(0),
-            RegistrationManager.instantiateRegistrationManager(bsConfs.get(0)).getLayoutManager());
+        LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
 
         LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();
 
@@ -228,9 +237,7 @@ public void testIndexCorruption() throws Exception {
      */
     @Test
     public void testPeriodicCheckWhenDisabled() throws Exception {
-        LedgerManagerFactory mFactory = AbstractZkLedgerManagerFactory.newLedgerManagerFactory(
-            bsConfs.get(0),
-            RegistrationManager.instantiateRegistrationManager(bsConfs.get(0)).getLayoutManager());
+        LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
         final LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();
         final int numLedgers = 10;
         final int numMsgs = 2;
@@ -405,9 +412,7 @@ public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback cb,
      */
     @Test
     public void testFailedWriteRecovery() throws Exception {
-        LedgerManagerFactory mFactory = AbstractZkLedgerManagerFactory.newLedgerManagerFactory(
-                bsConfs.get(0),
-                RegistrationManager.instantiateRegistrationManager(bsConfs.get(0)).getLayoutManager());
+        LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
         LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();
         underReplicationManager.disableLedgerReplication();
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java
index d4c918f2e..632a31b43 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java
@@ -20,13 +20,13 @@
  */
 package org.apache.bookkeeper.replication;
 
+import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithLedgerManagerFactory;
 import static org.junit.Assert.assertEquals;
 
+import com.google.common.util.concurrent.UncheckedExecutionException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.discover.RegistrationManager;
-import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -50,10 +50,20 @@ public AuditorRollingRestartTest() {
      */
     @Test
     public void testAuditingDuringRollingRestart() throws Exception {
-        LedgerManagerFactory mFactory = AbstractZkLedgerManagerFactory.newLedgerManagerFactory(
+        runFunctionWithLedgerManagerFactory(
             bsConfs.get(0),
-            RegistrationManager.instantiateRegistrationManager(bsConfs.get(0)).getLayoutManager());
+            mFactory -> {
+                try {
+                    testAuditingDuringRollingRestart(mFactory);
+                } catch (Exception e) {
+                    throw new UncheckedExecutionException(e.getMessage(), e);
+                }
+                return null;
+            }
+        );
+    }
 
+    private void testAuditingDuringRollingRestart(LedgerManagerFactory mFactory) throws Exception {
         final LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();
 
         LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
index 281c3c79f..f63c1db14 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
@@ -25,8 +25,10 @@
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.SortedMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -35,18 +37,19 @@
 import org.apache.bookkeeper.client.BookKeeperTestClient;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerHandleAdapter;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.discover.RegistrationManager;
-import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
-import org.apache.bookkeeper.meta.LayoutManager;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.MetadataClientDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
 import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
 import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -70,9 +73,11 @@
     private static final String openLedgerRereplicationGracePeriod = "3000"; // milliseconds
 
     private DigestType digestType;
+    private MetadataClientDriver metadataClientDriver;
     private LedgerManagerFactory mFactory;
     private LedgerUnderreplicationManager underReplicationManager;
     private LedgerManager ledgerManager;
+    private OrderedScheduler scheduler;
 
     private final String underreplicatedPath = baseClientConf
             .getZkLedgersRootPath() + "/underreplication/ledgers";
@@ -96,29 +101,30 @@ public void setUp() throws Exception {
         super.setUp();
         baseConf.setZkServers(zkUtil.getZooKeeperConnectString());
         baseClientConf.setZkServers(zkUtil.getZooKeeperConnectString());
-        // initialize urReplicationManager
-        LayoutManager layoutManager = RegistrationManager
-            .instantiateRegistrationManager(new ServerConfiguration(baseClientConf)).getLayoutManager();
 
-        mFactory = AbstractZkLedgerManagerFactory
-            .newLedgerManagerFactory(
-                baseClientConf,
-                layoutManager);
+        scheduler = OrderedScheduler.newSchedulerBuilder()
+            .name("test-scheduler")
+            .numThreads(1)
+            .build();
+
+        metadataClientDriver = MetadataDrivers.getClientDriver(
+            URI.create(baseClientConf.getMetadataServiceUri()));
+        metadataClientDriver.initialize(
+            baseClientConf,
+            scheduler,
+            NullStatsLogger.INSTANCE,
+            Optional.empty());
+
+        // initialize urReplicationManager
+        mFactory = metadataClientDriver.getLedgerManagerFactory();
         underReplicationManager = mFactory.newLedgerUnderreplicationManager();
-        LedgerManagerFactory newLedgerManagerFactory = AbstractZkLedgerManagerFactory
-            .newLedgerManagerFactory(
-                baseClientConf,
-                layoutManager);
-        ledgerManager = newLedgerManagerFactory.newLedgerManager();
+        ledgerManager = mFactory.newLedgerManager();
     }
 
     @Override
     public void tearDown() throws Exception {
         super.tearDown();
-        if (null != mFactory) {
-            mFactory.close();
-            mFactory = null;
-        }
+
         if (null != underReplicationManager) {
             underReplicationManager.close();
             underReplicationManager = null;
@@ -127,6 +133,13 @@ public void tearDown() throws Exception {
             ledgerManager.close();
             ledgerManager = null;
         }
+        if (null != metadataClientDriver) {
+            metadataClientDriver.close();
+            metadataClientDriver = null;
+        }
+        if (null != scheduler) {
+            scheduler.shutdown();
+        }
     }
 
     /**
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index d77bce939..9445fe22d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -23,22 +23,28 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
+import lombok.Cleanup;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.ClientUtil;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerHandleAdapter;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.discover.RegistrationManager;
-import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.MetadataBookieDriver;
+import org.apache.bookkeeper.meta.MetadataClientDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
@@ -58,9 +64,11 @@
             .getLogger(TestReplicationWorker.class);
     private String basePath = "";
     private String baseLockPath = "";
+    private MetadataBookieDriver driver;
     private LedgerManagerFactory mFactory;
     private LedgerUnderreplicationManager underReplicationManager;
     private static byte[] data = "TestReplicationWorker".getBytes();
+    private OrderedScheduler scheduler;
 
     public TestReplicationWorker() {
         this("org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
@@ -86,25 +94,33 @@ public TestReplicationWorker() {
     public void setUp() throws Exception {
         super.setUp();
         baseConf.setZkServers(zkUtil.getZooKeeperConnectString());
+
+        this.scheduler = OrderedScheduler.newSchedulerBuilder()
+            .name("test-scheduler")
+            .numThreads(1)
+            .build();
+
+        this.driver = MetadataDrivers.getBookieDriver(
+            URI.create(baseConf.getMetadataServiceUri()));
+        this.driver.initialize(
+            baseConf,
+            () -> {},
+            NullStatsLogger.INSTANCE);
         // initialize urReplicationManager
-        mFactory = AbstractZkLedgerManagerFactory.newLedgerManagerFactory(
-            baseClientConf,
-            RegistrationManager
-                .instantiateRegistrationManager(new ServerConfiguration(baseClientConf)).getLayoutManager());
+        mFactory = driver.getLedgerManagerFactory();
         underReplicationManager = mFactory.newLedgerUnderreplicationManager();
     }
 
     @Override
     public void tearDown() throws Exception {
         super.tearDown();
-        if (null != mFactory){
-            mFactory.close();
-            mFactory = null;
-        }
         if (null != underReplicationManager){
             underReplicationManager.close();
             underReplicationManager = null;
         }
+        if (null != driver) {
+            driver.close();
+        }
     }
 
     /**
@@ -400,12 +416,11 @@ public void testRWShouldReplicateTheLedgersAfterTimeoutIfLastFragmentIsUR()
         baseConf.setOpenLedgerRereplicationGracePeriod("3000");
         ReplicationWorker rw = new ReplicationWorker(zkc, baseConf);
 
-        LedgerManagerFactory mFactory = AbstractZkLedgerManagerFactory
-            .newLedgerManagerFactory(
-                baseClientConf,
-                RegistrationManager
-                    .instantiateRegistrationManager(new ServerConfiguration(baseClientConf))
-                    .getLayoutManager());
+        @Cleanup MetadataClientDriver clientDriver = MetadataDrivers.getClientDriver(
+            URI.create(baseClientConf.getMetadataServiceUri()));
+        clientDriver.initialize(baseClientConf, scheduler, NullStatsLogger.INSTANCE, Optional.empty());
+
+        LedgerManagerFactory mFactory = clientDriver.getLedgerManagerFactory();
 
         LedgerUnderreplicationManager underReplicationManager = mFactory
                 .newLedgerUnderreplicationManager();
@@ -464,12 +479,12 @@ public void testRWShouldReplicateTheLedgersAfterTimeoutIfLastFragmentIsNotUR()
         ReplicationWorker rw = new ReplicationWorker(zkc, baseConf);
 
         baseClientConf.setZkServers(zkUtil.getZooKeeperConnectString());
-        LedgerManagerFactory mFactory = AbstractZkLedgerManagerFactory
-            .newLedgerManagerFactory(
-                baseClientConf,
-                RegistrationManager
-                    .instantiateRegistrationManager(new ServerConfiguration(baseClientConf))
-                    .getLayoutManager());
+
+        @Cleanup MetadataClientDriver driver = MetadataDrivers.getClientDriver(
+            URI.create(baseClientConf.getMetadataServiceUri()));
+        driver.initialize(baseClientConf, scheduler, NullStatsLogger.INSTANCE, Optional.empty());
+
+        LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
 
         LedgerUnderreplicationManager underReplicationManager = mFactory
                 .newLedgerUnderreplicationManager();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java
index bc95fc225..7612a8602 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java
@@ -18,28 +18,29 @@
  */
 package org.apache.bookkeeper.server.http;
 
+import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithLedgerManagerFactory;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.UncheckedExecutionException;
 import java.io.File;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import lombok.Cleanup;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerHandleAdapter;
 import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
-import org.apache.bookkeeper.discover.RegistrationManager;
 import org.apache.bookkeeper.http.HttpServer;
 import org.apache.bookkeeper.http.service.HttpEndpointService;
 import org.apache.bookkeeper.http.service.HttpServiceRequest;
 import org.apache.bookkeeper.http.service.HttpServiceResponse;
-import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
@@ -651,6 +652,17 @@ public void testWhoIsAuditorService() throws Exception {
     @Test
     public void testListUnderReplicatedLedgerService() throws Exception {
         baseConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        runFunctionWithLedgerManagerFactory(baseConf, mFactory -> {
+            try {
+                testListUnderReplicatedLedgerService(mFactory);
+            } catch (Exception e) {
+                throw new UncheckedExecutionException(e.getMessage(), e.getCause());
+            }
+            return null;
+        });
+    }
+
+    private void testListUnderReplicatedLedgerService(LedgerManagerFactory mFactory) throws Exception {
         startAuditorElector();
 
         HttpEndpointService listUnderReplicatedLedgerService = bkHttpServiceProvider
@@ -661,15 +673,11 @@ public void testListUnderReplicatedLedgerService() throws Exception {
         HttpServiceResponse response1 = listUnderReplicatedLedgerService.handle(request1);
         assertEquals(HttpServer.StatusCode.NOT_FOUND.getValue(), response1.getStatusCode());
 
-
         //2,  GET, should return success.
         // first put ledger into rereplicate. then use api to list ur ledger.
-        LedgerManagerFactory mFactory = AbstractZkLedgerManagerFactory.newLedgerManagerFactory(
-            bsConfs.get(0),
-            RegistrationManager.instantiateRegistrationManager(bsConfs.get(0)).getLayoutManager());
-
-        LedgerManager ledgerManager = mFactory.newLedgerManager();
-        final LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();
+        @Cleanup LedgerManager ledgerManager = mFactory.newLedgerManager();
+        @Cleanup final LedgerUnderreplicationManager underReplicationManager =
+            mFactory.newLedgerUnderreplicationManager();
 
         LedgerHandle lh = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, "passwd".getBytes());
         LedgerMetadata md = LedgerHandleAdapter.getLedgerMetadata(lh);
diff --git a/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/ClientCommandTest.java b/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/ClientCommandTest.java
index 07f3cb8dd..d541db987 100644
--- a/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/ClientCommandTest.java
+++ b/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/ClientCommandTest.java
@@ -55,6 +55,7 @@ public void setup() throws Exception {
         this.cmd = mock(ClientCommand.class, CALLS_REAL_METHODS);
 
         this.serverConf = new ServerConfiguration();
+        this.serverConf.setMetadataServiceUri("zk://127.0.0.1/path/to/ledgers");
         this.clientConf = new ClientConfiguration(serverConf);
         PowerMockito.whenNew(ClientConfiguration.class)
             .withParameterTypes(AbstractConfiguration.class)
diff --git a/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/ClientCommandTestBase.java b/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/ClientCommandTestBase.java
index 7d551ed55..6b41e5763 100644
--- a/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/ClientCommandTestBase.java
+++ b/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/ClientCommandTestBase.java
@@ -54,6 +54,7 @@
     public void setup() throws Exception {
         mockBk = mock(BookKeeper.class);
         this.clientConf = spy(new ClientConfiguration(conf));
+        this.clientConf.setMetadataServiceUri("zk://127.0.0.1/path/to/ledgers");
         PowerMockito.whenNew(ClientConfiguration.class)
             .withParameterTypes(AbstractConfiguration.class)
             .withArguments(eq(conf))
diff --git a/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/CommandTestBase.java b/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/CommandTestBase.java
index 670942227..b93cf3854 100644
--- a/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/CommandTestBase.java
+++ b/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/CommandTestBase.java
@@ -50,6 +50,7 @@ protected CommandRunner createCommandRunner(Command command) {
 
     public CommandTestBase() {
         this.conf = new ServerConfiguration();
+        this.conf.setMetadataServiceUri("zk://127.0.0.1/path/to/ledgers");
     }
 
 }
diff --git a/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/DiscoveryCommandTest.java b/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/DiscoveryCommandTest.java
index f2c6dc6e7..004ba2423 100644
--- a/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/DiscoveryCommandTest.java
+++ b/bookkeeper-tools/src/test/java/org/apache/bookkeeper/tools/cli/helpers/DiscoveryCommandTest.java
@@ -66,6 +66,7 @@ public void setup() throws Exception {
         this.cmd = mock(DiscoveryCommand.class, CALLS_REAL_METHODS);
 
         this.serverConf = new ServerConfiguration();
+        this.serverConf.setMetadataServiceUri("zk://127.0.0.1/path/to/ledgers");
         this.clientConf = new ClientConfiguration(serverConf);
         PowerMockito.whenNew(ClientConfiguration.class)
             .withParameterTypes(AbstractConfiguration.class)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services