You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2017/10/27 18:58:26 UTC

[bookkeeper] branch master updated: ISSUE #662: Introduce Bookie Registration Manager for bookie server

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new e9b8570  ISSUE #662: Introduce Bookie Registration Manager for bookie server
e9b8570 is described below

commit e9b857092703719c9ea38a3a74107fe5114af94a
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Fri Oct 27 11:58:19 2017 -0700

    ISSUE #662: Introduce Bookie Registration Manager for bookie server
    
    Descriptions of the changes in this PR:
    As part of BOOKKEEPER-628, the bookie registration process is being improved and generalized.
    This PR covers the bookie side: it introduces a registration manager for the bookie server.
    
    Author: Jia Zhai <zh...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
    
    This closes #663 from zhaijack/bookie_registration, closes #662
---
 .../java/org/apache/bookkeeper/bookie/Bookie.java  | 354 +++++--------------
 .../apache/bookkeeper/bookie/BookieException.java  |  86 ++++-
 .../org/apache/bookkeeper/bookie/BookieShell.java  | 102 +++---
 .../java/org/apache/bookkeeper/bookie/Cookie.java  | 177 ++++------
 .../bookkeeper/bookie/FileSystemUpgrade.java       |  53 ++-
 .../bookkeeper/conf/ServerConfiguration.java       |  28 ++
 .../bookkeeper/discover/RegistrationClient.java    |  36 ++
 .../bookkeeper/discover/RegistrationManager.java   | 108 ++++++
 .../bookkeeper/discover/ZKRegistrationManager.java | 391 +++++++++++++++++++++
 .../apache/bookkeeper/discover/package-info.java   |  22 ++
 .../bookkeeper/http/ExpandStorageService.java      |   9 +-
 .../bookkeeper/http/RecoveryBookieService.java     |  12 +-
 .../bookie/BookieInitializationTest.java           |  79 ++++-
 .../org/apache/bookkeeper/bookie/CookieTest.java   |  36 +-
 .../bookkeeper/bookie/UpdateCookieCmdTest.java     |  39 +-
 15 files changed, 1040 insertions(+), 492 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 220aa4c..953fb17 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -21,13 +21,11 @@
 
 package org.apache.bookkeeper.bookie;
 
-import static com.google.common.base.Charsets.UTF_8;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_ADD_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_ADD_ENTRY_BYTES;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_READ_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_READ_ENTRY_BYTES;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_RECOVERY_ADD_ENTRY;
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.JOURNAL_SCOPE;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_INDEX_SCOPE;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_LEDGER_SCOPE;
@@ -54,25 +52,28 @@ import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Observable;
 import java.util.Observer;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
+import org.apache.bookkeeper.bookie.BookieException.BookieIllegalOpException;
+import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
 import org.apache.bookkeeper.bookie.BookieException.DiskPartitionDuplicationException;
+import org.apache.bookkeeper.bookie.BookieException.MetadataStoreException;
 import org.apache.bookkeeper.bookie.Journal.JournalScanner;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.discover.ZKRegistrationManager;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -87,24 +88,15 @@ import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.DiskChecker;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.util.ReflectionUtils;
 import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
-import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.mutable.MutableBoolean;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -129,18 +121,14 @@ public class Bookie extends BookieCriticalThread {
     static final long METAENTRY_ID_LEDGER_KEY = -0x1000;
     static final long METAENTRY_ID_FENCE_KEY  = -0x2000;
 
-    // ZK registration path for this bookie
-    protected final String bookieRegistrationPath;
-    protected final String bookieReadonlyRegistrationPath;
-
     private final LedgerDirsManager ledgerDirsManager;
     private LedgerDirsManager indexDirsManager;
 
     LedgerDirsMonitor ledgerMonitor;
     LedgerDirsMonitor idxMonitor;
 
-    // ZooKeeper client instance for the Bookie
-    ZooKeeper zk;
+    // Registration Manager for managing registration
+    RegistrationManager registrationManager;
 
     // Running flag
     private volatile boolean running = false;
@@ -151,11 +139,9 @@ public class Bookie extends BookieCriticalThread {
 
     private final ConcurrentLongHashMap<byte[]> masterKeyCache = new ConcurrentLongHashMap<>();
 
-    protected final String zkBookieRegPath;
-    protected final String zkBookieReadOnlyPath;
-    protected final List<ACL> zkAcls;
+    protected final String bookieId;
 
-    private final AtomicBoolean zkRegistered = new AtomicBoolean(false);
+    private final AtomicBoolean rmRegistered = new AtomicBoolean(false);
     protected final AtomicBoolean readOnly = new AtomicBoolean(false);
     // executor to manage the state changes for a bookie.
     final ExecutorService stateService = Executors.newSingleThreadExecutor(
@@ -254,19 +240,29 @@ public class Bookie extends BookieCriticalThread {
         }
     }
 
+    @VisibleForTesting
+    public void setRegistrationManager(RegistrationManager rm) {
+        this.registrationManager = rm;
+    }
+
+    @VisibleForTesting
+    public RegistrationManager getRegistrationManager() {
+        return this.registrationManager;
+    }
+
     /**
      * Check that the environment for the bookie is correct.
      * This means that the configuration has stayed the same as the
      * first run and the filesystem structure is up to date.
      */
-    private void checkEnvironment(ZooKeeper zk) throws BookieException, IOException {
+    private void checkEnvironment(RegistrationManager rm) throws BookieException, IOException {
         List<File> allLedgerDirs = new ArrayList<File>(ledgerDirsManager.getAllLedgerDirs().size()
                                                      + indexDirsManager.getAllLedgerDirs().size());
         allLedgerDirs.addAll(ledgerDirsManager.getAllLedgerDirs());
         if (indexDirsManager != ledgerDirsManager) {
             allLedgerDirs.addAll(indexDirsManager.getAllLedgerDirs());
         }
-        if (zk == null) { // exists only for testing, just make sure directories are correct
+        if (rm == null) { // exists only for testing, just make sure directories are correct
 
             for (File journalDirectory : journalDirectories) {
                 checkDirectoryStructure(journalDirectory);
@@ -279,7 +275,7 @@ public class Bookie extends BookieCriticalThread {
         }
 
         if (conf.getAllowStorageExpansion()) {
-            checkEnvironmentWithStorageExpansion(conf, zk, journalDirectories, allLedgerDirs);
+            checkEnvironmentWithStorageExpansion(conf, rm, journalDirectories, allLedgerDirs);
             return;
         }
 
@@ -303,20 +299,20 @@ public class Bookie extends BookieCriticalThread {
                 }
             }
 
-            String instanceId = getInstanceId(conf, zk);
+            String instanceId = rm.getClusterInstanceId();
             Cookie.Builder builder = Cookie.generateCookie(conf);
             if (null != instanceId) {
                 builder.setInstanceId(instanceId);
             }
             Cookie masterCookie = builder.build();
-            Versioned<Cookie> zkCookie = null;
+            Versioned<Cookie> rmCookie = null;
             try {
-                zkCookie = Cookie.readFromZooKeeper(zk, conf);
+                rmCookie = Cookie.readFromRegistrationManager(rm, conf);
                 // If allowStorageExpansion option is set, we should
                 // make sure that the new set of ledger/index dirs
                 // is a super set of the old; else, we fail the cookie check
-                masterCookie.verifyIsSuperSet(zkCookie.getValue());
-            } catch (KeeperException.NoNodeException nne) {
+                masterCookie.verifyIsSuperSet(rmCookie.getValue());
+            } catch (CookieNotFoundException e) {
                 // can occur in cases:
                 // 1) new environment or
                 // 2) done only metadata format and started bookie server.
@@ -381,7 +377,7 @@ public class Bookie extends BookieCriticalThread {
                 for (File dir : allLedgerDirs) {
                     masterCookie.writeToDirectory(dir);
                 }
-                masterCookie.writeToZooKeeper(zk, conf, zkCookie != null ? zkCookie.getVersion() : Version.NEW);
+                masterCookie.writeToRegistrationManager(rm, conf, rmCookie != null ? rmCookie.getVersion() : Version.NEW);
             }
 
             List<File> ledgerDirs = ledgerDirsManager.getAllLedgerDirs();
@@ -389,18 +385,10 @@ public class Bookie extends BookieCriticalThread {
             List<File> indexDirs = indexDirsManager.getAllLedgerDirs();
             checkIfDirsOnSameDiskPartition(indexDirs);
             checkIfDirsOnSameDiskPartition(journalDirectories);
-        } catch (KeeperException ke) {
-            LOG.error("Couldn't access cookie in zookeeper", ke);
-            throw new BookieException.InvalidCookieException(ke);
-        } catch (UnknownHostException uhe) {
-            LOG.error("Couldn't check cookies, networking is broken", uhe);
-            throw new BookieException.InvalidCookieException(uhe);
+
         } catch (IOException ioe) {
             LOG.error("Error accessing cookie on disks", ioe);
             throw new BookieException.InvalidCookieException(ioe);
-        } catch (InterruptedException ie) {
-            LOG.error("Thread interrupted while checking cookies, exiting", ie);
-            throw new BookieException.InvalidCookieException(ie);
         }
     }
 
@@ -450,8 +438,11 @@ public class Bookie extends BookieCriticalThread {
         }
     }
 
-    public static void checkEnvironmentWithStorageExpansion(ServerConfiguration conf,
-            ZooKeeper zk, List<File> journalDirectories, List<File> allLedgerDirs) throws BookieException, IOException {
+    public static void checkEnvironmentWithStorageExpansion(
+            ServerConfiguration conf,
+            RegistrationManager rm,
+            List<File> journalDirectories,
+            List<File> allLedgerDirs) throws BookieException {
         try {
             boolean newEnv = false;
             List<File> missedCookieDirs = new ArrayList<File>();
@@ -472,20 +463,20 @@ public class Bookie extends BookieCriticalThread {
                 }
             }
 
-            String instanceId = getInstanceId(conf, zk);
+            String instanceId = rm.getClusterInstanceId();
             Cookie.Builder builder = Cookie.generateCookie(conf);
             if (null != instanceId) {
                 builder.setInstanceId(instanceId);
             }
             Cookie masterCookie = builder.build();
-            Versioned<Cookie> zkCookie = null;
+            Versioned<Cookie> rmCookie = null;
             try {
-                zkCookie = Cookie.readFromZooKeeper(zk, conf);
+                rmCookie = Cookie.readFromRegistrationManager(rm, conf);
                 // If allowStorageExpansion option is set, we should
                 // make sure that the new set of ledger/index dirs
                 // is a super set of the old; else, we fail the cookie check
-                masterCookie.verifyIsSuperSet(zkCookie.getValue());
-            } catch (KeeperException.NoNodeException nne) {
+                masterCookie.verifyIsSuperSet(rmCookie.getValue());
+            } catch (CookieNotFoundException e) {
                 // can occur in cases:
                 // 1) new environment or
                 // 2) done only metadata format and started bookie server.
@@ -550,20 +541,11 @@ public class Bookie extends BookieCriticalThread {
                 for (File dir : allLedgerDirs) {
                     masterCookie.writeToDirectory(dir);
                 }
-                masterCookie.writeToZooKeeper(zk, conf, zkCookie != null ? zkCookie.getVersion() : Version.NEW);
+                masterCookie.writeToRegistrationManager(rm, conf, rmCookie != null ? rmCookie.getVersion() : Version.NEW);
             }
-        } catch (KeeperException ke) {
-            LOG.error("Couldn't access cookie in zookeeper", ke);
-            throw new BookieException.InvalidCookieException(ke);
-        } catch (UnknownHostException uhe) {
-            LOG.error("Couldn't check cookies, networking is broken", uhe);
-            throw new BookieException.InvalidCookieException(uhe);
         } catch (IOException ioe) {
             LOG.error("Error accessing cookie on disks", ioe);
             throw new BookieException.InvalidCookieException(ioe);
-        } catch (InterruptedException ie) {
-            LOG.error("Thread interrupted while checking cookies, exiting", ie);
-            throw new BookieException.InvalidCookieException(ie);
         }
     }
 
@@ -605,25 +587,6 @@ public class Bookie extends BookieCriticalThread {
         return addr;
     }
 
-    private static String getInstanceId(ServerConfiguration conf, ZooKeeper zk) throws KeeperException,
-            InterruptedException {
-        String instanceId = null;
-        if (zk.exists(conf.getZkLedgersRootPath(), null) == null) {
-            LOG.error("BookKeeper metadata doesn't exist in zookeeper. "
-                      + "Has the cluster been initialized? "
-                      + "Try running bin/bookkeeper shell metaformat");
-            throw new KeeperException.NoNodeException("BookKeeper metadata");
-        }
-        try {
-            byte[] data = zk.getData(conf.getZkLedgersRootPath() + "/"
-                    + BookKeeperConstants.INSTANCEID, false, null);
-            instanceId = new String(data, UTF_8);
-        } catch (KeeperException.NoNodeException e) {
-            LOG.info("INSTANCEID not exists in zookeeper. Not considering it for data verification");
-        }
-        return instanceId;
-    }
-
     public LedgerDirsManager getLedgerDirsManager() {
         return ledgerDirsManager;
     }
@@ -653,18 +616,14 @@ public class Bookie extends BookieCriticalThread {
     }
 
     public Bookie(ServerConfiguration conf)
-            throws IOException, KeeperException, InterruptedException, BookieException {
+            throws IOException, InterruptedException, BookieException {
         this(conf, NullStatsLogger.INSTANCE);
     }
 
     public Bookie(ServerConfiguration conf, StatsLogger statsLogger)
-            throws IOException, KeeperException, InterruptedException, BookieException {
+            throws IOException, InterruptedException, BookieException {
         super("Bookie-" + conf.getBookiePort());
         this.statsLogger = statsLogger;
-        this.zkAcls = ZkUtils.getACLs(conf);
-        this.bookieRegistrationPath = conf.getZkAvailableBookiesPath() + "/";
-        this.bookieReadonlyRegistrationPath =
-            this.bookieRegistrationPath + BookKeeperConstants.READONLY;
         this.conf = conf;
         this.journalDirectories = Lists.newArrayList();
         for (File journalDirectory : conf.getJournalDirs()) {
@@ -683,9 +642,20 @@ public class Bookie extends BookieCriticalThread {
         }
 
         // instantiate zookeeper client to initialize ledger manager
-        this.zk = instantiateZookeeperClient(conf);
-        checkEnvironment(this.zk);
-        ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, this.zk);
+        this.registrationManager = instantiateRegistrationManager(conf);
+        checkEnvironment(this.registrationManager);
+        try {
+            ZooKeeper zooKeeper = null;  // ZooKeeper is null existing only for testing
+            if (registrationManager != null) {
+                zooKeeper = ((ZKRegistrationManager) this.registrationManager).getZk();
+            }
+            // current the registration manager is zookeeper only
+            ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(
+                conf,
+                zooKeeper);
+        } catch (KeeperException e) {
+            throw new MetadataStoreException("Failed to initialize ledger manager", e);
+        }
         LOG.info("instantiate ledger manager {}", ledgerManagerFactory.getClass().getName());
         ledgerManager = ledgerManagerFactory.newLedgerManager();
 
@@ -721,9 +691,7 @@ public class Bookie extends BookieCriticalThread {
         }
 
         // ZK ephemeral node for this Bookie.
-        String myID = getMyId();
-        zkBookieRegPath = this.bookieRegistrationPath + myID;
-        zkBookieReadOnlyPath = this.bookieReadonlyRegistrationPath + "/" + myID;
+        this.bookieId = getMyId();
 
         // instantiate the journals
         journals = Lists.newArrayList();
@@ -760,7 +728,7 @@ public class Bookie extends BookieCriticalThread {
 
             @Override
             public Number getSample() {
-                return zkRegistered.get() ? (readOnly.get() ? 0 : 1) : -1;
+                return rmRegistered.get() ? (readOnly.get() ? 0 : 1) : -1;
             }
         });
     }
@@ -932,65 +900,23 @@ public class Bookie extends BookieCriticalThread {
     }
 
     /**
-     * Instantiate the ZooKeeper client for the Bookie.
+     * Instantiate the registration manager for the Bookie.
      */
-    private ZooKeeper instantiateZookeeperClient(ServerConfiguration conf)
-            throws IOException, InterruptedException, KeeperException {
-        if (conf.getZkServers() == null) {
-            LOG.warn("No ZK servers passed to Bookie constructor so BookKeeper clients won't know about this server!");
-            return null;
-        }
-        // Create the ZooKeeper client instance
-        return newZookeeper(conf);
-    }
-
-    /**
-     * Check existence of <i>regPath</i> and wait it expired if possible.
-     *
-     * @param regPath reg node path.
-     * @return true if regPath exists, otherwise return false
-     * @throws IOException if can't create reg path
-     */
-    protected boolean checkRegNodeAndWaitExpired(String regPath) throws IOException {
-        final CountDownLatch prevNodeLatch = new CountDownLatch(1);
-        Watcher zkPrevRegNodewatcher = new Watcher() {
-            @Override
-            public void process(WatchedEvent event) {
-                // Check for prev znode deletion. Connection expiration is
-                // not handling, since bookie has logic to shutdown.
-                if (EventType.NodeDeleted == event.getType()) {
-                    prevNodeLatch.countDown();
-                }
-            }
-        };
+    private RegistrationManager instantiateRegistrationManager(ServerConfiguration conf) throws BookieException {
+        // Create the registration manager instance
+        Class<? extends RegistrationManager> managerCls;
         try {
-            Stat stat = zk.exists(regPath, zkPrevRegNodewatcher);
-            if (null != stat) {
-                // if the ephemeral owner isn't current zookeeper client
-                // wait for it to be expired.
-                if (stat.getEphemeralOwner() != zk.getSessionId()) {
-                    LOG.info("Previous bookie registration znode: {} exists, so waiting zk sessiontimeout:"
-                            + " {} ms for znode deletion", regPath, conf.getZkTimeout());
-                    // waiting for the previous bookie reg znode deletion
-                    if (!prevNodeLatch.await(conf.getZkTimeout(), TimeUnit.MILLISECONDS)) {
-                        throw new NodeExistsException(regPath);
-                    } else {
-                        return false;
-                    }
-                }
-                return true;
-            } else {
-                return false;
-            }
-        } catch (KeeperException ke) {
-            LOG.error("ZK exception checking and wait ephemeral znode {} expired : ", regPath, ke);
-            throw new IOException("ZK exception checking and wait ephemeral znode "
-                    + regPath + " expired", ke);
-        } catch (InterruptedException ie) {
-            LOG.error("Interrupted checking and wait ephemeral znode {} expired : ", regPath, ie);
-            throw new IOException("Interrupted checking and wait ephemeral znode "
-                    + regPath + " expired", ie);
+            managerCls = conf.getRegistrationManagerClass();
+        } catch (ConfigurationException e) {
+            throw new BookieIllegalOpException(e);
         }
+
+        RegistrationManager manager = ReflectionUtils.newInstance(managerCls);
+        return manager.initialize(conf, () -> {
+            rmRegistered.set(false);
+            // schedule a re-register operation
+            registerBookie(false);
+        }, statsLogger);
     }
 
     /**
@@ -1016,40 +942,28 @@ public class Bookie extends BookieCriticalThread {
     }
 
     protected void doRegisterBookie() throws IOException {
-        doRegisterBookie(readOnly.get() ? zkBookieReadOnlyPath : zkBookieRegPath);
+        doRegisterBookie(readOnly.get());
     }
 
-    private void doRegisterBookie(final String regPath) throws IOException {
-        if (null == zk) {
-            // zookeeper instance is null, means not register itself to zk
+    private void doRegisterBookie(boolean isReadOnly) throws IOException {
+        if (null == registrationManager ||
+            ((ZKRegistrationManager) this.registrationManager).getZk() == null) {
+            // registration manager is null, means not register itself to zk.
+            // ZooKeeper is null existing only for testing.
+            LOG.info("null zk while do register");
             return;
         }
 
-        zkRegistered.set(false);
-
-        // ZK ephemeral node for this Bookie.
+        rmRegistered.set(false);
         try {
-            if (!checkRegNodeAndWaitExpired(regPath)) {
-                // Create the ZK ephemeral node for this Bookie.
-                zk.create(regPath, new byte[0], zkAcls, CreateMode.EPHEMERAL);
-                LOG.info("Registered myself in ZooKeeper at {}.", regPath);
-            }
-            zkRegistered.set(true);
-        } catch (KeeperException ke) {
-            LOG.error("ZK exception registering ephemeral Znode for Bookie!", ke);
-            // Throw an IOException back up. This will cause the Bookie
-            // constructor to error out. Alternatively, we could do a System
-            // exit here as this is a fatal error.
-            throw new IOException(ke);
-        } catch (InterruptedException ie) {
-            LOG.error("Interrupted exception registering ephemeral Znode for Bookie!", ie);
-            // Throw an IOException back up. This will cause the Bookie
-            // constructor to error out. Alternatively, we could do a System
-            // exit here as this is a fatal error.
-            throw new IOException(ie);
+            registrationManager.registerBookie(bookieId, isReadOnly);
+            rmRegistered.set(true);
+        } catch (BookieException e) {
+            throw new IOException(e);
         }
     }
 
+
     /**
      * Transition the bookie from readOnly mode to writable.
      */
@@ -1073,11 +987,11 @@ public class Bookie extends BookieCriticalThread {
         }
         LOG.info("Transitioning Bookie to Writable mode and will serve read/write requests.");
         // change zookeeper state only when using zookeeper
-        if (null == zk) {
+        if (null == registrationManager) {
             return;
         }
         try {
-            doRegisterBookie(zkBookieRegPath);
+            doRegisterBookie(false);
         } catch (IOException e) {
             LOG.warn("Error in transitioning back to writable mode : ", e);
             transitionToReadOnlyMode();
@@ -1085,12 +999,8 @@ public class Bookie extends BookieCriticalThread {
         }
         // clear the readonly state
         try {
-            zk.delete(zkBookieReadOnlyPath, -1);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            LOG.warn("Interrupted clearing readonly state while transitioning to writable mode : ", e);
-            return;
-        } catch (KeeperException e) {
+            registrationManager.unregisterBookie(bookieId, true);
+        } catch (BookieException e) {
             // if we failed when deleting the readonly flag in zookeeper, it is OK since client would
             // already see the bookie in writable list. so just log the exception
             LOG.warn("Failed to delete bookie readonly state in zookeeper : ", e);
@@ -1130,40 +1040,16 @@ public class Bookie extends BookieCriticalThread {
         LOG.info("Transitioning Bookie to ReadOnly mode,"
                 + " and will serve only read requests from clients!");
         // change zookeeper state only when using zookeeper
-        if (null == zk) {
+        if (null == registrationManager) {
             return;
         }
         try {
-            if (null == zk.exists(this.bookieReadonlyRegistrationPath, false)) {
-                try {
-                    zk.create(this.bookieReadonlyRegistrationPath, new byte[0],
-                              zkAcls, CreateMode.PERSISTENT);
-                } catch (NodeExistsException e) {
-                    // this node is just now created by someone.
-                }
-            }
-            doRegisterBookie(zkBookieReadOnlyPath);
-            try {
-                // Clear the current registered node
-                zk.delete(zkBookieRegPath, -1);
-            } catch (KeeperException.NoNodeException nne) {
-                LOG.warn("No writable bookie registered node {} when transitioning to readonly",
-                        zkBookieRegPath, nne);
-            }
-        } catch (IOException e) {
-            LOG.error("Error in transition to ReadOnly Mode."
-                    + " Shutting down", e);
-            triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
-            return;
-        } catch (KeeperException e) {
+            registrationManager.registerBookie(bookieId, true);
+        } catch (BookieException e) {
             LOG.error("Error in transition to ReadOnly Mode."
                     + " Shutting down", e);
             triggerBookieShutdown(ExitCode.BOOKIE_EXCEPTION);
             return;
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            LOG.warn("Interrupted Exception while transitioning to ReadOnly Mode.");
-            return;
         }
     }
 
@@ -1174,54 +1060,6 @@ public class Bookie extends BookieCriticalThread {
         return readOnly.get();
     }
 
-    /**
-     * Create a new zookeeper client to zk cluster.
-     *
-     * <p>
-     * Bookie Server just used zk client when syncing ledgers for garbage collection.
-     * So when zk client is expired, it means this bookie server is not available in
-     * bookie server list. The bookie client will be notified for its expiration. No
-     * more bookie request will be sent to this server. So it's better to exit when zk
-     * expired.
-     * </p>
-     * <p>
-     * Since there are lots of bk operations cached in queue, so we wait for all the operations
-     * are processed and quit. It is done by calling <b>shutdown</b>.
-     * </p>
-     *
-     * @param conf server configuration
-     *
-     * @return zk client instance
-     */
-    private ZooKeeper newZookeeper(final ServerConfiguration conf)
-            throws IOException, InterruptedException, KeeperException {
-        Set<Watcher> watchers = new HashSet<Watcher>();
-        watchers.add(new Watcher() {
-            @Override
-            public void process(WatchedEvent event) {
-                if (!running) {
-                    // do nothing until first registration
-                    return;
-                }
-                // Check for expired connection.
-                if (event.getType().equals(EventType.None) && event.getState().equals(KeeperState.Expired)) {
-                    zkRegistered.set(false);
-                    // schedule a re-register operation
-                    registerBookie(false);
-                }
-            }
-        });
-        return ZooKeeperClient.newBuilder()
-                .connectString(conf.getZkServers())
-                .sessionTimeoutMs(conf.getZkTimeout())
-                .watchers(watchers)
-                .operationRetryPolicy(new BoundExponentialBackoffRetryPolicy(conf.getZkRetryBackoffStartMs(),
-                        conf.getZkRetryBackoffMaxMs(), Integer.MAX_VALUE))
-                .requestRateLimit(conf.getZkRequestRateLimit())
-                .statsLogger(this.statsLogger.scope(BOOKIE_SCOPE))
-                .build();
-    }
-
     public boolean isRunning() {
         return running;
     }
@@ -1327,8 +1165,8 @@ public class Bookie extends BookieCriticalThread {
                 stateService.shutdown();
             }
             // Shutdown the ZK client
-            if (zk != null) {
-                zk.close();
+            if (registrationManager != null) {
+                registrationManager.close();
             }
         } catch (InterruptedException ie) {
             LOG.error("Interrupted during shutting down bookie : ", ie);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
index 57bf708..99ae39e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
@@ -40,6 +40,10 @@ public abstract class BookieException extends Exception {
         super(reason);
     }
 
+    public BookieException(int code, String reason, Throwable t) {
+        super(reason, t);
+    }
+
     public static BookieException create(int code) {
         switch(code) {
         case Code.UnauthorizedAccessException:
@@ -52,6 +56,12 @@ public abstract class BookieException extends Exception {
             return new UpgradeException();
         case Code.DiskPartitionDuplicationException:
             return new DiskPartitionDuplicationException();
+        case Code.CookieNotFoundException:
+            return new CookieNotFoundException();
+        case Code.MetadataStoreException:
+            return new MetadataStoreException();
+        case Code.UnknownBookieIdException:
+            return new UnknownBookieIdException();
         default:
             return new BookieIllegalOpException();
         }
@@ -66,10 +76,12 @@ public abstract class BookieException extends Exception {
 
         int IllegalOpException = -100;
         int LedgerFencedException = -101;
-
         int InvalidCookieException = -102;
         int UpgradeException = -103;
         int DiskPartitionDuplicationException = -104;
+        int CookieNotFoundException = -105;
+        int MetadataStoreException = -106;
+        int UnknownBookieIdException = -107;
     }
 
     public void setCode(int code) {
@@ -101,6 +113,15 @@ public abstract class BookieException extends Exception {
         case Code.DiskPartitionDuplicationException:
             err = "Disk Partition Duplication is not allowed";
             break;
+        case Code.CookieNotFoundException:
+            err = "Cookie not found";
+            break;
+        case Code.MetadataStoreException:
+            err = "Error performing metadata operations";
+            break;
+        case Code.UnknownBookieIdException:
+            err = "Unknown bookie id";
+            break;
         default:
             err = "Invalid operation";
             break;
@@ -132,7 +153,15 @@ public abstract class BookieException extends Exception {
      */
     public static class BookieIllegalOpException extends BookieException {
         public BookieIllegalOpException() {
-            super(Code.UnauthorizedAccessException);
+            super(Code.IllegalOpException);
+        }
+
+        public BookieIllegalOpException(String reason) {
+            super(Code.IllegalOpException, reason);
+        }
+
+        public BookieIllegalOpException(Throwable cause) {
+            super(Code.IllegalOpException, cause);
         }
     }
 
@@ -165,6 +194,23 @@ public abstract class BookieException extends Exception {
     }
 
     /**
+     * Signal that no cookie is found when starting a bookie.
+     */
+    public static class CookieNotFoundException extends BookieException {
+        public CookieNotFoundException() {
+            this("");
+        }
+
+        public CookieNotFoundException(String reason) {
+            super(Code.CookieNotFoundException, reason);
+        }
+
+        public CookieNotFoundException(Throwable cause) {
+            super(Code.CookieNotFoundException, cause);
+        }
+    }
+
+    /**
      * Signals that an exception occurs on upgrading a bookie.
      */
     public static class UpgradeException extends BookieException {
@@ -197,4 +243,40 @@ public abstract class BookieException extends Exception {
             super(Code.DiskPartitionDuplicationException, reason);
         }
     }
+
+    /**
+     * Signals that the bookie has problems accessing the metadata store.
+     */
+    public static class MetadataStoreException extends BookieException {
+
+        public MetadataStoreException() {
+            this("");
+        }
+
+        public MetadataStoreException(String reason) {
+            super(Code.MetadataStoreException, reason);
+        }
+
+        public MetadataStoreException(Throwable cause) {
+            super(Code.MetadataStoreException, cause);
+        }
+
+        public MetadataStoreException(String reason, Throwable cause) {
+            super(Code.MetadataStoreException, reason, cause);
+        }
+    }
+
+    /**
+     * Signals that the bookie id is unknown, e.g. the bookie address cannot be resolved.
+     */
+    public static class UnknownBookieIdException extends BookieException {
+
+        public UnknownBookieIdException() {
+            super(Code.UnknownBookieIdException);
+        }
+
+        public UnknownBookieIdException(Throwable cause) {
+            super(Code.UnknownBookieIdException, cause);
+        }
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 1c08eb2..481c647 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -47,6 +47,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
+import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
 import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
 import org.apache.bookkeeper.bookie.Journal.JournalScanner;
@@ -61,6 +62,8 @@ import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.client.UpdateLedgerOp;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.discover.ZKRegistrationManager;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
@@ -69,6 +72,7 @@ import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.replication.AuditorElector;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.DiskChecker;
 import org.apache.bookkeeper.util.EntryFormatter;
 import org.apache.bookkeeper.util.IOUtils;
@@ -88,7 +92,6 @@ import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.mutable.MutableBoolean;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -261,18 +264,15 @@ public class BookieShell implements Tool {
             boolean result = Bookie.format(conf, interactive, force);
             // delete cookie
             if (cmdLine.hasOption("d")) {
-                ZooKeeperClient zkc =
-                        ZooKeeperClient.newBuilder()
-                                .connectString(conf.getZkServers())
-                                .sessionTimeoutMs(conf.getZkTimeout())
-                                .build();
+                RegistrationManager rm = new ZKRegistrationManager();
+                rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
                 try {
-                    Versioned<Cookie> cookie = Cookie.readFromZooKeeper(zkc, conf);
-                    cookie.getValue().deleteFromZooKeeper(zkc, conf, cookie.getVersion());
-                } catch (KeeperException.NoNodeException nne) {
+                    Versioned<Cookie> cookie = Cookie.readFromRegistrationManager(rm, conf);
+                    cookie.getValue().deleteFromRegistrationManager(rm, conf, cookie.getVersion());
+                } catch (CookieNotFoundException nne) {
                     LOG.warn("No cookie to remove : ", nne);
                 } finally {
-                    zkc.close();
+                    rm.close();
                 }
             }
             return (result) ? 0 : 1;
@@ -324,7 +324,7 @@ public class BookieShell implements Tool {
 
         private int bkRecovery(ClientConfiguration conf, BookKeeperAdmin bkAdmin,
                                String[] args, boolean deleteCookie)
-                throws InterruptedException, BKException, KeeperException, IOException {
+                throws InterruptedException, BKException, BookieException, IOException {
             final String bookieSrcString[] = args[0].split(":");
             if (bookieSrcString.length != 2) {
                 System.err.println("BookieSrc inputted has invalid format"
@@ -347,10 +347,14 @@ public class BookieShell implements Tool {
 
             bkAdmin.recoverBookieData(bookieSrc, bookieDest);
             if (deleteCookie) {
+                ServerConfiguration serverConf = new ServerConfiguration();
+                serverConf.addConfiguration(conf);
+                RegistrationManager rm = new ZKRegistrationManager();
                 try {
-                    Versioned<Cookie> cookie = Cookie.readFromZooKeeper(bkAdmin.getZooKeeper(), conf, bookieSrc);
-                    cookie.getValue().deleteFromZooKeeper(bkAdmin.getZooKeeper(), conf, bookieSrc, cookie.getVersion());
-                } catch (KeeperException.NoNodeException nne) {
+                    rm.initialize(serverConf, () -> {}, NullStatsLogger.INSTANCE);
+                    Versioned<Cookie> cookie = Cookie.readFromRegistrationManager(rm, bookieSrc);
+                    cookie.getValue().deleteFromRegistrationManager(rm, bookieSrc, cookie.getVersion());
+                } catch (CookieNotFoundException nne) {
                     LOG.warn("No cookie to remove for {} : ", bookieSrc, nne);
                 }
             }
@@ -1506,22 +1510,19 @@ public class BookieShell implements Tool {
             return updateBookieIdInCookie(bookieId, useHostName);
         }
 
-        private int updateBookieIdInCookie(final String bookieId, final boolean useHostname) throws IOException,
+        private int updateBookieIdInCookie(final String bookieId, final boolean useHostname) throws BookieException,
                 InterruptedException {
-            ZooKeeper zk = null;
+            RegistrationManager rm = new ZKRegistrationManager();
             try {
-                zk = ZooKeeperClient.newBuilder()
-                        .connectString(bkConf.getZkServers())
-                        .sessionTimeoutMs(bkConf.getZkTimeout())
-                        .build();
+                rm.initialize(bkConf, () -> {}, NullStatsLogger.INSTANCE);
                 ServerConfiguration conf = new ServerConfiguration(bkConf);
                 String newBookieId = Bookie.getBookieAddress(conf).toString();
                 // read oldcookie
                 Versioned<Cookie> oldCookie = null;
                 try {
                     conf.setUseHostNameAsBookieID(!useHostname);
-                    oldCookie = Cookie.readFromZooKeeper(zk, conf);
-                } catch (KeeperException.NoNodeException nne) {
+                    oldCookie = Cookie.readFromRegistrationManager(rm, conf);
+                } catch (CookieNotFoundException nne) {
                     LOG.error("Either cookie already updated with UseHostNameAsBookieID={} or no cookie exists!",
                             useHostname, nne);
                     return -1;
@@ -1540,12 +1541,12 @@ public class BookieShell implements Tool {
                 if (hasCookieUpdatedInDirs) {
                     try {
                         conf.setUseHostNameAsBookieID(useHostname);
-                        Cookie.readFromZooKeeper(zk, conf);
+                        Cookie.readFromRegistrationManager(rm, conf);
                         // since newcookie exists, just do cleanup of oldcookie and return
                         conf.setUseHostNameAsBookieID(!useHostname);
-                        oldCookie.getValue().deleteFromZooKeeper(zk, conf, oldCookie.getVersion());
+                        oldCookie.getValue().deleteFromRegistrationManager(rm, conf, oldCookie.getVersion());
                         return 0;
-                    } catch (KeeperException.NoNodeException nne) {
+                    } catch (CookieNotFoundException nne) {
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Ignoring, cookie will be written to zookeeper");
                         }
@@ -1569,20 +1570,17 @@ public class BookieShell implements Tool {
                 }
                 // writes newcookie to zookeeper
                 conf.setUseHostNameAsBookieID(useHostname);
-                newCookie.writeToZooKeeper(zk, conf, Version.NEW);
+                newCookie.writeToRegistrationManager(rm, conf, Version.NEW);
 
                 // delete oldcookie
                 conf.setUseHostNameAsBookieID(!useHostname);
-                oldCookie.getValue().deleteFromZooKeeper(zk, conf, oldCookie.getVersion());
-            } catch (KeeperException ke) {
-                LOG.error("KeeperException during cookie updation!", ke);
-                return -1;
+                oldCookie.getValue().deleteFromRegistrationManager(rm, conf, oldCookie.getVersion());
             } catch (IOException ioe) {
                 LOG.error("IOException during cookie updation!", ioe);
                 return -1;
             } finally {
-                if (zk != null) {
-                    zk.close();
+                if (rm != null) {
+                    rm.close();
                 }
             }
             return 0;
@@ -1628,31 +1626,33 @@ public class BookieShell implements Tool {
         @Override
         int runCmd(CommandLine cmdLine) {
             ServerConfiguration conf = new ServerConfiguration(bkConf);
-            ZooKeeper zk;
+            RegistrationManager rm = new ZKRegistrationManager();
             try {
-                zk = ZooKeeperClient.newBuilder()
-                        .connectString(bkConf.getZkServers())
-                        .sessionTimeoutMs(bkConf.getZkTimeout()).build();
-            } catch (KeeperException | InterruptedException | IOException e) {
-                LOG.error("Exception while establishing zookeeper connection.", e);
-                return -1;
-            }
+                try {
+                    rm.initialize(bkConf, () -> {}, NullStatsLogger.INSTANCE);
+                } catch (BookieException e) {
+                    LOG.error("Exception while establishing zookeeper connection.", e);
+                    return -1;
+                }
 
-            List<File> allLedgerDirs = Lists.newArrayList();
-            allLedgerDirs.addAll(Arrays.asList(ledgerDirectories));
-            if (indexDirectories != ledgerDirectories) {
-                allLedgerDirs.addAll(Arrays.asList(indexDirectories));
-            }
+                List<File> allLedgerDirs = Lists.newArrayList();
+                allLedgerDirs.addAll(Arrays.asList(ledgerDirectories));
+                if (indexDirectories != ledgerDirectories) {
+                    allLedgerDirs.addAll(Arrays.asList(indexDirectories));
+                }
 
-            try {
-                Bookie.checkEnvironmentWithStorageExpansion(conf, zk,
+                try {
+                    Bookie.checkEnvironmentWithStorageExpansion(conf, rm,
                         Lists.newArrayList(journalDirectories), allLedgerDirs);
-            } catch (BookieException | IOException e) {
-                LOG.error(
+                } catch (BookieException e) {
+                    LOG.error(
                         "Exception while updating cookie for storage expansion", e);
-                return -1;
+                    return -1;
+                }
+                return 0;
+            } finally {
+                rm.close();
             }
-            return 0;
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java
index ec7793c..e2d4f12 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java
@@ -37,28 +37,23 @@ import java.io.OutputStreamWriter;
 import java.io.StringReader;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
-import java.util.List;
 import java.util.Set;
-import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
+import org.apache.bookkeeper.bookie.BookieException.UnknownBookieIdException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.RegistrationManager;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.DataFormats.CookieFormat;
 import org.apache.bookkeeper.util.BookKeeperConstants;
-import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * When a bookie starts for the first time it generates  a cookie, and stores
- * the cookie in zookeeper as well as in the each of the local filesystem
+ * the cookie in the registration manager as well as in each of the local filesystem
  * directories it uses. This cookie is used to ensure that for the life of the
  * bookie, its configuration stays the same. If any of the bookie directories
  * becomes unavailable, the bookie becomes unavailable. If the bookie changes
@@ -233,79 +228,61 @@ public class Cookie {
     }
 
     /**
-     * Writes cookie details to ZooKeeper.
+     * Writes cookie details to registration manager.
      *
-     * @param zk ZooKeeper instance
+     * @param rm registration manager
      * @param conf configuration
      * @param version version
-     * @throws KeeperException
-     * @throws InterruptedException
-     * @throws UnknownHostException
+     * @throws BookieException when it fails to write the cookie.
      */
-    public void writeToZooKeeper(ZooKeeper zk, ServerConfiguration conf, Version version)
-            throws KeeperException, InterruptedException, UnknownHostException {
-        List<ACL> zkAcls = ZkUtils.getACLs(conf);
-        String bookieCookiePath = conf.getZkLedgersRootPath() + "/"
-                + BookKeeperConstants.COOKIE_NODE;
-        String zkPath = getZkPath(conf);
-        byte[] data = toString().getBytes(UTF_8);
-        if (Version.NEW == version) {
-            if (zk.exists(bookieCookiePath, false) == null) {
-                try {
-                    zk.create(bookieCookiePath, new byte[0],
-                            zkAcls, CreateMode.PERSISTENT);
-                } catch (KeeperException.NodeExistsException nne) {
-                    LOG.info("More than one bookie tried to create {} at once. Safe to ignore",
-                            bookieCookiePath);
-                }
-            }
-            zk.create(zkPath, data,
-                    zkAcls, CreateMode.PERSISTENT);
-        } else {
-            if (!(version instanceof LongVersion)) {
-                throw new IllegalArgumentException("Invalid version type, expected ZkVersion type");
-            }
-            zk.setData(zkPath, data, (int) ((LongVersion) version).getLongVersion());
+    public void writeToRegistrationManager(RegistrationManager rm, ServerConfiguration conf, Version version)
+            throws BookieException {
+        BookieSocketAddress address = null;
+        try {
+            address = Bookie.getBookieAddress(conf);
+        } catch (UnknownHostException e) {
+            throw new UnknownBookieIdException(e);
         }
+        byte[] data = toString().getBytes(UTF_8);
+        rm.writeCookie(address.toString(), new Versioned<>(data, version));
     }
 
     /**
-     * Deletes cookie from ZooKeeper and sets znode version to DEFAULT_COOKIE_ZNODE_VERSION.
+     * Deletes cookie from registration manager.
      *
-     * @param zk ZooKeeper instance
+     * @param rm registration manager
      * @param conf configuration
-     * @param version zookeeper version
-     * @throws KeeperException
-     * @throws InterruptedException
-     * @throws UnknownHostException
+     * @param version cookie version
+     * @throws BookieException when it fails to delete the cookie.
      */
-    public void deleteFromZooKeeper(ZooKeeper zk, ServerConfiguration conf, Version version) throws KeeperException,
-            InterruptedException, UnknownHostException {
-        BookieSocketAddress address = Bookie.getBookieAddress(conf);
-        deleteFromZooKeeper(zk, conf, address, version);
+    public void deleteFromRegistrationManager(RegistrationManager rm,
+                                              ServerConfiguration conf,
+                                              Version version) throws BookieException {
+        BookieSocketAddress address = null;
+        try {
+            address = Bookie.getBookieAddress(conf);
+        } catch (UnknownHostException e) {
+            throw new UnknownBookieIdException(e);
+        }
+        deleteFromRegistrationManager(rm, address, version);
     }
 
     /**
-     * Delete cookie from zookeeper.
+     * Delete cookie from registration manager.
      *
-     * @param zk zookeeper client
-     * @param conf configuration instance
+     * @param rm registration manager
      * @param address bookie address
      * @param version cookie version
-     * @throws KeeperException
-     * @throws InterruptedException
-     * @throws UnknownHostException
+     * @throws BookieException when it fails to delete the cookie.
      */
-    public void deleteFromZooKeeper(ZooKeeper zk, AbstractConfiguration conf,
-                                    BookieSocketAddress address, Version version)
-            throws KeeperException, InterruptedException, UnknownHostException {
+    public void deleteFromRegistrationManager(RegistrationManager rm,
+                                              BookieSocketAddress address,
+                                              Version version) throws BookieException {
         if (!(version instanceof LongVersion)) {
             throw new IllegalArgumentException("Invalid version type, expected ZkVersion type");
         }
 
-        String zkPath = getZkPath(conf, address);
-        zk.delete(zkPath, (int) ((LongVersion) version).getLongVersion());
-        LOG.info("Removed cookie from {} for bookie {}.", conf.getZkLedgersRootPath(), address);
+        rm.removeCookie(address.toString(), version);
     }
 
     /**
@@ -326,48 +303,44 @@ public class Cookie {
     }
 
     /**
-     * Read cookie from ZooKeeper.
+     * Read cookie from registration manager.
      *
-     * @param zk ZooKeeper instance
+     * @param rm registration manager
      * @param conf configuration
      * @return versioned cookie object
-     * @throws KeeperException
-     * @throws InterruptedException
-     * @throws IOException
-     * @throws UnknownHostException
+     * @throws BookieException when it fails to read the cookie.
      */
-    public static Versioned<Cookie> readFromZooKeeper(ZooKeeper zk, ServerConfiguration conf)
-            throws KeeperException, InterruptedException, IOException, UnknownHostException {
-        return readFromZooKeeper(zk, conf, Bookie.getBookieAddress(conf));
+    public static Versioned<Cookie> readFromRegistrationManager(RegistrationManager rm, ServerConfiguration conf)
+            throws BookieException {
+        try {
+            return readFromRegistrationManager(rm, Bookie.getBookieAddress(conf));
+        } catch (UnknownHostException e) {
+            throw new UnknownBookieIdException(e);
+        }
     }
 
     /**
-     * Read cookie from zookeeper for a given bookie <i>address</i>.
+     * Read cookie from registration manager for a given bookie <i>address</i>.
      *
-     * @param zk zookeeper client
-     * @param conf configuration instance
+     * @param rm registration manager
      * @param address bookie address
      * @return versioned cookie object
-     * @throws KeeperException
-     * @throws InterruptedException
-     * @throws IOException
-     * @throws UnknownHostException
+     * @throws BookieException when it fails to read the cookie.
      */
-    public static Versioned<Cookie> readFromZooKeeper(ZooKeeper zk, AbstractConfiguration conf, BookieSocketAddress address)
-            throws KeeperException, InterruptedException, IOException, UnknownHostException {
-        String zkPath = getZkPath(conf, address);
-
-        Stat stat = zk.exists(zkPath, false);
-        byte[] data = zk.getData(zkPath, false, stat);
-        BufferedReader reader = new BufferedReader(new StringReader(new String(data, UTF_8)));
+    public static Versioned<Cookie> readFromRegistrationManager(RegistrationManager rm,
+                                                         BookieSocketAddress address) throws BookieException {
+        Versioned<byte[]> cookieData = rm.readCookie(address.toString());
+        BufferedReader reader = new BufferedReader(new StringReader(new String(cookieData.getValue(), UTF_8)));
         try {
-            Builder builder = parse(reader);
-            Cookie cookie = builder.build();
-            // sets stat version from ZooKeeper
-            LongVersion version = new LongVersion(stat.getVersion());
-            return new Versioned<Cookie>(cookie, version);
-        } finally {
-            reader.close();
+            try {
+                Builder builder = parse(reader);
+                Cookie cookie = builder.build();
+                return new Versioned<Cookie>(cookie, cookieData.getVersion());
+            } finally {
+                reader.close();
+            }
+        } catch (IOException ioe) {
+            throw new InvalidCookieException(ioe);
         }
     }
 
@@ -390,30 +363,6 @@ public class Cookie {
     }
 
     /**
-     * Returns cookie path in zookeeper.
-     *
-     * @param conf configuration
-     * @return cookie zk path
-     * @throws UnknownHostException
-     */
-    static String getZkPath(ServerConfiguration conf)
-            throws UnknownHostException {
-        return getZkPath(conf, Bookie.getBookieAddress(conf));
-    }
-
-    /**
-     * Return cookie path for a given bookie <i>address</i>.
-     *
-     * @param conf configuration
-     * @param address bookie address
-     * @return cookie path for bookie
-     */
-    static String getZkPath(AbstractConfiguration conf, BookieSocketAddress address) {
-        String bookieCookiePath = conf.getZkLedgersRootPath() + "/" + BookKeeperConstants.COOKIE_NODE;
-        return bookieCookiePath + "/" + address;
-    }
-
-    /**
      * Check whether the 'bookieHost' was created using a hostname or an IP
      * address. Represent as 'hostname/IPaddress' if the InetSocketAddress was
      * created using hostname. Represent as '/IPaddress' if the
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java
index 045f8b2..4158abd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileSystemUpgrade.java
@@ -36,20 +36,19 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Scanner;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.HardLink;
+import org.apache.bookkeeper.util.ReflectionUtils;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
-import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.io.FileUtils;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -128,22 +127,15 @@ public class FileSystemUpgrade {
         }
     }
 
-    private static ZooKeeper newZookeeper(final ServerConfiguration conf)
+    private static RegistrationManager newRegistrationManager(final ServerConfiguration conf)
             throws BookieException.UpgradeException {
+
         try {
-            int zkTimeout = conf.getZkTimeout();
-            return ZooKeeperClient.newBuilder()
-                    .connectString(conf.getZkServers())
-                    .sessionTimeoutMs(zkTimeout)
-                    .operationRetryPolicy(
-                            new BoundExponentialBackoffRetryPolicy(zkTimeout, zkTimeout, Integer.MAX_VALUE))
-                    .build();
-        } catch (InterruptedException ie) {
-            throw new BookieException.UpgradeException(ie);
-        } catch (IOException ioe) {
-            throw new BookieException.UpgradeException(ioe);
-        } catch (KeeperException ke) {
-            throw new BookieException.UpgradeException(ke);
+            Class<? extends RegistrationManager> rmClass = conf.getRegistrationManagerClass();
+            RegistrationManager rm = ReflectionUtils.newInstance(rmClass);
+            return rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+        } catch (Exception e) {
+            throw new BookieException.UpgradeException(e);
         }
     }
 
@@ -177,7 +169,7 @@ public class FileSystemUpgrade {
             throws BookieException.UpgradeException, InterruptedException {
         LOG.info("Upgrading...");
 
-        ZooKeeper zk = newZookeeper(conf);
+        RegistrationManager rm = newRegistrationManager(conf);
         try {
             Map<File, File> deferredMoves = new HashMap<File, File>();
             Cookie.Builder cookieBuilder = Cookie.generateCookie(conf);
@@ -229,15 +221,15 @@ public class FileSystemUpgrade {
             }
 
             try {
-                c.writeToZooKeeper(zk, conf, Version.NEW);
-            } catch (KeeperException ke) {
-                LOG.error("Error writing cookie to zookeeper");
+                c.writeToRegistrationManager(rm, conf, Version.NEW);
+            } catch (BookieException ke) {
+                LOG.error("Error writing cookie to registration manager");
                 throw new BookieException.UpgradeException(ke);
             }
         } catch (IOException ioe) {
             throw new BookieException.UpgradeException(ioe);
         } finally {
-            zk.close();
+            rm.close();
         }
         LOG.info("Done");
     }
@@ -283,7 +275,7 @@ public class FileSystemUpgrade {
     public static void rollback(ServerConfiguration conf)
             throws BookieException.UpgradeException, InterruptedException {
         LOG.info("Rolling back upgrade...");
-        ZooKeeper zk = newZookeeper(conf);
+        RegistrationManager rm = newRegistrationManager(conf);
         try {
             for (File d : getAllDirectories(conf)) {
                 LOG.info("Rolling back {}", d);
@@ -305,17 +297,14 @@ public class FileSystemUpgrade {
                 }
             }
             try {
-                Versioned<Cookie> cookie = Cookie.readFromZooKeeper(zk, conf);
-                cookie.getValue().deleteFromZooKeeper(zk, conf, cookie.getVersion());
-            } catch (KeeperException ke) {
-                LOG.error("Error deleting cookie from ZooKeeper");
+                Versioned<Cookie> cookie = Cookie.readFromRegistrationManager(rm, conf);
+                cookie.getValue().deleteFromRegistrationManager(rm, conf, cookie.getVersion());
+            } catch (BookieException ke) {
+                LOG.error("Error deleting cookie from Registration Manager");
                 throw new BookieException.UpgradeException(ke);
-            } catch (IOException ioe) {
-                LOG.error("I/O Error deleting cookie from ZooKeeper");
-                throw new BookieException.UpgradeException(ioe);
             }
         } finally {
-            zk.close();
+            rm.close();
         }
         LOG.info("Done");
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index aae3e7d..10fb329 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -26,6 +26,8 @@ import com.google.common.annotations.Beta;
 import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
 import org.apache.bookkeeper.bookie.LedgerStorage;
 import org.apache.bookkeeper.bookie.SortedLedgerStorage;
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.discover.ZKRegistrationManager;
 import org.apache.bookkeeper.server.component.ServerLifecycleComponent;
 import org.apache.bookkeeper.stats.NullStatsProvider;
 import org.apache.bookkeeper.stats.StatsProvider;
@@ -173,6 +175,9 @@ public class ServerConfiguration extends AbstractConfiguration {
     // Lifecycle Components
     protected final static String EXTRA_SERVER_COMPONENTS = "extraServerComponents";
 
+    // Registration
+    protected final static String REGISTRATION_MANAGER_CLASS = "registrationManagerClass";
+
     /**
      * Construct a default configuration object
      */
@@ -2384,4 +2389,27 @@ public class ServerConfiguration extends AbstractConfiguration {
         return this;
     }
 
+    /**
+     * Set the registration manager class used by the bookie server.
+     *
+     * <p>The class is stored by its fully-qualified name rather than as a {@link Class} object.
+     * NOTE(review): this assumes the lookup done by {@link #getRegistrationManagerClass()} reads
+     * the setting back as a class-name string -- confirm against ReflectionUtils#getClass.
+     *
+     * @param regManagerClass
+     *            registration manager implementation class
+     */
+    public void setRegistrationManagerClass(
+            Class<? extends RegistrationManager> regManagerClass) {
+        // keep passing null through untouched, exactly as before
+        setProperty(REGISTRATION_MANAGER_CLASS,
+                null == regManagerClass ? null : regManagerClass.getName());
+    }
+
+    /**
+     * Look up the registration manager class configured for this server.
+     *
+     * <p>{@link ZKRegistrationManager} is handed to the lookup, presumably as the default used
+     * when the setting is absent.
+     *
+     * @return registration manager class.
+     * @throws ConfigurationException declared by the underlying class lookup
+     */
+    public Class<? extends RegistrationManager> getRegistrationManagerClass()
+            throws ConfigurationException {
+        final Class<? extends RegistrationManager> managerClass = ReflectionUtils.getClass(
+                this, REGISTRATION_MANAGER_CLASS,
+                ZKRegistrationManager.class, RegistrationManager.class,
+                defaultLoader);
+        return managerClass;
+    }
+
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java
new file mode 100644
index 0000000..7f6160a
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationClient.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.discover;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A registration client, which the bookkeeper client will use to interact with the registration service.
+ */
+public interface RegistrationClient {
+
+    /**
+     * Get the list of available bookie identifiers.
+     *
+     * @return a future that represents the list of available bookies
+     */
+    CompletableFuture<List<String>> getAvailableBookies();
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationManager.java
new file mode 100644
index 0000000..2b274d7
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/RegistrationManager.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.discover;
+
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate;
+import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
+
+/**
+ * Registration manager interface, which a bookie server will use to do the registration process.
+ *
+ * <p>Besides registering and unregistering a bookie, it also stores, reads and removes the
+ * cookie data of a bookie and exposes the cluster instance id.
+ */
+@LimitedPrivate
+@Evolving
+public interface RegistrationManager extends AutoCloseable {
+
+    /**
+     * Registration Listener on listening the registration state.
+     */
+    @FunctionalInterface
+    interface RegistrationListener {
+
+        /**
+         * Signal when registration is expired.
+         */
+        void onRegistrationExpired();
+
+    }
+
+    /**
+     * Initialize the registration manager.
+     *
+     * @param conf server configuration
+     * @param listener listener to be signalled when the registration is expired
+     * @param statsLogger stats logger
+     * @return the initialized registration manager. NOTE(review): the zookeeper based
+     *         implementation returns {@code null} when no zookeeper servers are configured,
+     *         so callers should be prepared for that.
+     * @throws BookieException when fail to initialize the registration manager
+     */
+    RegistrationManager initialize(ServerConfiguration conf,
+                                   RegistrationListener listener,
+                                   StatsLogger statsLogger) throws BookieException;
+
+    /**
+     * Close the registration manager. Narrows {@link AutoCloseable#close()} so that no
+     * checked exception is declared.
+     */
+    @Override
+    void close();
+
+    /**
+     * Return the cluster instance id.
+     *
+     * @return the cluster instance id.
+     * @throws BookieException when fail to get the cluster instance id
+     */
+    String getClusterInstanceId() throws BookieException;
+
+    /**
+     * Registering the bookie server as <i>bookieId</i>.
+     *
+     * @param bookieId bookie id
+     * @param readOnly whether to register it as writable or readonly
+     * @throws BookieException when fail to register a bookie.
+     */
+    void registerBookie(String bookieId, boolean readOnly) throws BookieException;
+
+    /**
+     * Unregistering the bookie server as <i>bookieId</i>.
+     *
+     * @param bookieId bookie id
+     * @param readOnly whether to remove the readonly registration (true) or the writable one (false)
+     * @throws BookieException when fail to unregister a bookie.
+     */
+    void unregisterBookie(String bookieId, boolean readOnly) throws BookieException;
+
+    /**
+     * Write the cookie data, which will be used for verifying the integrity of the bookie environment.
+     *
+     * @param bookieId bookie id
+     * @param cookieData cookie data
+     * @throws BookieException when fail to write cookie
+     */
+    void writeCookie(String bookieId, Versioned<byte[]> cookieData) throws BookieException;
+
+    /**
+     * Read the cookie data, which will be used for verifying the integrity of the bookie environment.
+     *
+     * @param bookieId bookie id
+     * @return versioned cookie data
+     * @throws BookieException when fail to read cookie
+     */
+    Versioned<byte[]> readCookie(String bookieId) throws BookieException;
+
+    /**
+     * Remove the cookie data.
+     *
+     * @param bookieId bookie id
+     * @param version version of the cookie data
+     * @throws BookieException when fail to remove cookie
+     */
+    void removeCookie(String bookieId, Version version) throws BookieException;
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
new file mode 100644
index 0000000..dbe924d
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationManager.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.discover;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
+import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE;
+import static org.apache.bookkeeper.util.BookKeeperConstants.INSTANCEID;
+import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.BookieException.BookieIllegalOpException;
+import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
+import org.apache.bookkeeper.bookie.BookieException.MetadataStoreException;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.versioning.LongVersion;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * ZooKeeper Based {@link RegistrationManager}.
+ */
+@Slf4j
+public class ZKRegistrationManager implements RegistrationManager {
+
+    private ServerConfiguration conf;
+    private ZooKeeper zk;
+    private List<ACL> zkAcls;
+    private volatile boolean running = false;
+
+    // cookie path
+    private String cookiePath;
+    // registration paths
+    protected String bookieRegistrationPath;
+    protected String bookieReadonlyRegistrationPath;
+
+    private StatsLogger statsLogger;
+
+    /**
+     * Set up this manager from <i>conf</i> and create its zookeeper client.
+     *
+     * @param conf server configuration providing the zookeeper servers, ACLs and paths
+     * @param listener listener handed to the session watcher of the zookeeper client
+     * @param statsLogger stats logger handed to the zookeeper client
+     * @return this manager, or {@code null} when no zookeeper servers are configured
+     * @throws BookieException when the zookeeper client can't be created
+     */
+    @Override
+    public RegistrationManager initialize(ServerConfiguration conf,
+                                          RegistrationListener listener,
+                                          StatsLogger statsLogger)
+            throws BookieException {
+        if (null == conf.getZkServers()) {
+            log.warn("No ZK servers passed to Bookie constructor so BookKeeper clients won't know about this server!");
+            return null;
+        }
+
+        this.conf = conf;
+        this.zkAcls = ZkUtils.getACLs(conf);
+        // must be assigned before newZookeeper(), which scopes the client stats from it
+        this.statsLogger = statsLogger;
+
+        this.cookiePath = conf.getZkLedgersRootPath() + "/" + COOKIE_NODE;
+        this.bookieRegistrationPath = conf.getZkAvailableBookiesPath();
+        this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + "/" + READONLY;
+
+        try {
+            this.zk = newZookeeper(conf, listener);
+        } catch (InterruptedException ie) {
+            // the exception is translated, so restore the interrupt status for the caller
+            Thread.currentThread().interrupt();
+            throw new MetadataStoreException(ie);
+        } catch (KeeperException | IOException e) {
+            throw new MetadataStoreException(e);
+        }
+        return this;
+    }
+
+    /**
+     * Replace the zookeeper client used by this manager. Intended for tests only.
+     *
+     * @param zk zookeeper client to use from now on
+     */
+    @VisibleForTesting
+    public void setZk(ZooKeeper zk) {
+        this.zk = zk;
+    }
+
+    /**
+     * Return the zookeeper client currently used by this manager. Intended for tests only.
+     *
+     * @return the zookeeper client, or {@code null} if none has been set or created yet
+     */
+    @VisibleForTesting
+    public ZooKeeper getZk() {
+        return this.zk;
+    }
+
+    /**
+     * Create a new zookeeper client to zk cluster.
+     *
+     * <p>
+     * Bookie Server just used zk client when syncing ledgers for garbage collection.
+     * So when zk client is expired, it means this bookie server is not available in
+     * bookie server list. The bookie client will be notified for its expiration. No
+     * more bookie request will be sent to this server. So it's better to exit when zk
+     * expired.
+     * </p>
+     * <p>
+     * Since there are lots of bk operations cached in queue, so we wait for all the operations
+     * are processed and quit. It is done by calling <b>shutdown</b>.
+     * </p>
+     *
+     * @param conf server configuration
+     *
+     * @return zk client instance
+     */
+    private ZooKeeper newZookeeper(final ServerConfiguration conf, RegistrationListener listener)
+        throws InterruptedException, KeeperException, IOException {
+        Set<Watcher> watchers = new HashSet<Watcher>();
+        watchers.add(event -> {
+            if (!running) {
+                // do nothing until first registration
+                return;
+            }
+            // Check for expired connection.
+            if (event.getType().equals(EventType.None) && event.getState().equals(KeeperState.Expired)) {
+                listener.onRegistrationExpired();
+            }
+        });
+        return ZooKeeperClient.newBuilder()
+                .connectString(conf.getZkServers())
+                .sessionTimeoutMs(conf.getZkTimeout())
+                .watchers(watchers)
+                .operationRetryPolicy(new BoundExponentialBackoffRetryPolicy(conf.getZkRetryBackoffStartMs(),
+                        conf.getZkRetryBackoffMaxMs(), Integer.MAX_VALUE))
+                .requestRateLimit(conf.getZkRequestRateLimit())
+                .statsLogger(this.statsLogger.scope(BOOKIE_SCOPE))
+                .build();
+    }
+
+    /**
+     * Close the underlying zookeeper client, if there is one.
+     *
+     * <p>An interruption while closing is logged and the interrupt status of the thread is
+     * restored, since this method can't propagate the exception.
+     */
+    @Override
+    public void close() {
+        if (null != zk) {
+            try {
+                zk.close();
+            } catch (InterruptedException e) {
+                // don't lose the interruption: callers can still observe it on the thread
+                Thread.currentThread().interrupt();
+                log.warn("Interrupted on closing zookeeper client", e);
+            }
+        }
+    }
+
+    /**
+     * Build the path of the cookie znode of <i>bookieId</i>, which lives directly under
+     * {@code cookiePath}.
+     *
+     * @param bookieId bookie id
+     * @return {@code cookiePath + "/" + bookieId}
+     */
+    private String getCookiePath(String bookieId) {
+        return this.cookiePath + "/" + bookieId;
+    }
+
+    //
+    // Registration Management
+    //
+
+    /**
+     * Check existence of <i>regPath</i> and wait it expired if possible.
+     *
+     * @param regPath reg node path.
+     * @return true if regPath exists, otherwise return false
+     * @throws IOException if can't create reg path
+     */
+    protected boolean checkRegNodeAndWaitExpired(String regPath) throws IOException {
+        final CountDownLatch prevNodeLatch = new CountDownLatch(1);
+        Watcher zkPrevRegNodewatcher = new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                // Check for prev znode deletion. Connection expiration is
+                // not handling, since bookie has logic to shutdown.
+                if (EventType.NodeDeleted == event.getType()) {
+                    prevNodeLatch.countDown();
+                }
+            }
+        };
+        try {
+            Stat stat = zk.exists(regPath, zkPrevRegNodewatcher);
+            if (null != stat) {
+                // if the ephemeral owner isn't current zookeeper client
+                // wait for it to be expired.
+                if (stat.getEphemeralOwner() != zk.getSessionId()) {
+                    log.info("Previous bookie registration znode: {} exists, so waiting zk sessiontimeout:"
+                            + " {} ms for znode deletion", regPath, conf.getZkTimeout());
+                    // waiting for the previous bookie reg znode deletion
+                    if (!prevNodeLatch.await(conf.getZkTimeout(), TimeUnit.MILLISECONDS)) {
+                        throw new NodeExistsException(regPath);
+                    } else {
+                        return false;
+                    }
+                }
+                return true;
+            } else {
+                return false;
+            }
+        } catch (KeeperException ke) {
+            log.error("ZK exception checking and wait ephemeral znode {} expired : ", regPath, ke);
+            throw new IOException("ZK exception checking and wait ephemeral znode "
+                    + regPath + " expired", ke);
+        } catch (InterruptedException ie) {
+            log.error("Interrupted checking and wait ephemeral znode {} expired : ", regPath, ie);
+            throw new IOException("Interrupted checking and wait ephemeral znode "
+                    + regPath + " expired", ie);
+        }
+    }
+
+    /**
+     * Register <i>bookieId</i> either under the writable registration path or, when
+     * <i>readOnly</i> is set, through the readonly registration.
+     */
+    @Override
+    public void registerBookie(String bookieId, boolean readOnly) throws BookieException {
+        if (readOnly) {
+            doRegisterReadOnlyBookie(bookieId);
+            return;
+        }
+        doRegisterBookie(bookieRegistrationPath + "/" + bookieId);
+    }
+
+    /**
+     * Make sure an ephemeral registration znode exists at <i>regPath</i> for the current
+     * zookeeper session, creating it when needed.
+     *
+     * <p>Once the registration is in place, {@code running} is switched on so that the
+     * session watcher starts reporting session expirations to the registration listener.
+     *
+     * @param regPath full path of the registration znode
+     * @throws BookieException when the znode can't be checked or created
+     */
+    private void doRegisterBookie(String regPath) throws BookieException {
+        try {
+            if (!checkRegNodeAndWaitExpired(regPath)) {
+                // Create the ZK ephemeral node for this Bookie.
+                zk.create(regPath, new byte[0], zkAcls, CreateMode.EPHEMERAL);
+            }
+            // The session watcher ignores every event while 'running' is false, and nothing
+            // else in this class turns it on: flip it after the first successful registration.
+            running = true;
+        } catch (KeeperException ke) {
+            log.error("ZK exception registering ephemeral Znode for Bookie!", ke);
+            // Propagate the failure: the registration could not be completed.
+            throw new MetadataStoreException(ke);
+        } catch (InterruptedException ie) {
+            log.error("Interrupted exception registering ephemeral Znode for Bookie!", ie);
+            // The exception is translated, so restore the interrupt status for the caller.
+            Thread.currentThread().interrupt();
+            throw new MetadataStoreException(ie);
+        } catch (IOException e) {
+            throw new MetadataStoreException(e);
+        }
+    }
+
+    /**
+     * Register <i>bookieId</i> under the readonly registration path and remove its
+     * writable registration afterwards.
+     *
+     * <p>The readonly parent znode is created first if it doesn't exist yet.
+     *
+     * @param bookieId bookie id
+     * @throws BookieException when any of the zookeeper operations fails
+     */
+    private void doRegisterReadOnlyBookie(String bookieId) throws BookieException {
+        try {
+            if (null == zk.exists(this.bookieReadonlyRegistrationPath, false)) {
+                try {
+                    zk.create(this.bookieReadonlyRegistrationPath, new byte[0],
+                              zkAcls, CreateMode.PERSISTENT);
+                } catch (NodeExistsException ignored) {
+                    // this node is just now created by someone.
+                }
+            }
+
+            String regPath = bookieReadonlyRegistrationPath + "/" + bookieId;
+            doRegisterBookie(regPath);
+            // clear the write state
+            regPath = bookieRegistrationPath + "/" + bookieId;
+            try {
+                // Clear the current registered node
+                zk.delete(regPath, -1);
+            } catch (KeeperException.NoNodeException nne) {
+                log.warn("No writable bookie registered node {} when transitioning to readonly",
+                    regPath, nne);
+            }
+        } catch (InterruptedException ie) {
+            // the exception is translated, so restore the interrupt status for the caller
+            Thread.currentThread().interrupt();
+            throw new MetadataStoreException(ie);
+        } catch (KeeperException ke) {
+            throw new MetadataStoreException(ke);
+        }
+    }
+
+    /**
+     * Remove the registration znode of <i>bookieId</i> from the readonly or the writable
+     * registration path, depending on <i>readOnly</i>.
+     */
+    @Override
+    public void unregisterBookie(String bookieId, boolean readOnly) throws BookieException {
+        final String parentPath = readOnly ? bookieReadonlyRegistrationPath : bookieRegistrationPath;
+        doUnregisterBookie(parentPath + "/" + bookieId);
+    }
+
+    /**
+     * Delete the registration znode at <i>regPath</i>, whatever its version is.
+     *
+     * @param regPath full path of the registration znode
+     * @throws BookieException when the znode can't be deleted
+     */
+    private void doUnregisterBookie(String regPath) throws BookieException {
+        try {
+            // -1 matches any version of the znode
+            zk.delete(regPath, -1);
+        } catch (InterruptedException ie) {
+            // the exception is translated, so restore the interrupt status for the caller
+            Thread.currentThread().interrupt();
+            throw new MetadataStoreException(ie);
+        } catch (KeeperException ke) {
+            throw new MetadataStoreException(ke);
+        }
+    }
+
+    //
+    // Cookie Management
+    //
+
+    /**
+     * Write the cookie of <i>bookieId</i> to zookeeper.
+     *
+     * <p>A cookie versioned {@link Version#NEW} is created (together with the cookie parent
+     * znode if that is missing); any other version must be a {@link LongVersion} and results
+     * in a conditional update of the existing cookie znode.
+     *
+     * @param bookieId bookie id
+     * @param cookieData cookie data and the version it is expected to replace
+     * @throws BookieException when the version type is invalid or the write fails
+     */
+    @Override
+    public void writeCookie(String bookieId,
+                            Versioned<byte[]> cookieData) throws BookieException {
+        String zkPath = getCookiePath(bookieId);
+        try {
+            if (Version.NEW == cookieData.getVersion()) {
+                if (zk.exists(cookiePath, false) == null) {
+                    try {
+                        zk.create(cookiePath, new byte[0], zkAcls, CreateMode.PERSISTENT);
+                    } catch (NodeExistsException nne) {
+                        log.info("More than one bookie tried to create {} at once. Safe to ignore.",
+                            cookiePath);
+                    }
+                }
+                zk.create(zkPath, cookieData.getValue(), zkAcls, CreateMode.PERSISTENT);
+            } else {
+                if (!(cookieData.getVersion() instanceof LongVersion)) {
+                    throw new BookieIllegalOpException("Invalid version type, expected it to be LongVersion");
+                }
+                zk.setData(
+                    zkPath,
+                    cookieData.getValue(),
+                    (int) ((LongVersion) cookieData.getVersion()).getLongVersion());
+            }
+        } catch (InterruptedException ie) {
+            // the exception is translated, so restore the interrupt status for the caller
+            Thread.currentThread().interrupt();
+            throw new MetadataStoreException("Failed to write cookie for bookie " + bookieId, ie);
+        } catch (KeeperException ke) {
+            // keep the zookeeper failure as the cause instead of dropping it
+            throw new MetadataStoreException("Failed to write cookie for bookie " + bookieId, ke);
+        }
+    }
+
+    /**
+     * Read the cookie of <i>bookieId</i> together with its zookeeper version.
+     *
+     * @param bookieId bookie id
+     * @return the cookie data, versioned with the data version of the cookie znode
+     * @throws BookieException {@link CookieNotFoundException} when the cookie znode doesn't
+     *         exist, {@link MetadataStoreException} for any other failure
+     */
+    @Override
+    public Versioned<byte[]> readCookie(String bookieId) throws BookieException {
+        String zkPath = getCookiePath(bookieId);
+        try {
+            // A single getData call both fetches the data and fills in the stat. This avoids
+            // the extra exists() round trip and never leaves the stat null when the znode
+            // shows up between two separate calls.
+            Stat stat = new Stat();
+            byte[] data = zk.getData(zkPath, false, stat);
+            // sets stat version from ZooKeeper
+            LongVersion version = new LongVersion(stat.getVersion());
+            return new Versioned<>(data, version);
+        } catch (NoNodeException nne) {
+            throw new CookieNotFoundException(bookieId);
+        } catch (InterruptedException ie) {
+            // the exception is translated, so restore the interrupt status for the caller
+            Thread.currentThread().interrupt();
+            throw new MetadataStoreException("Failed to read cookie for bookie " + bookieId, ie);
+        } catch (KeeperException ke) {
+            // keep the zookeeper failure as the cause instead of dropping it
+            throw new MetadataStoreException("Failed to read cookie for bookie " + bookieId, ke);
+        }
+    }
+
+    /**
+     * Delete the cookie znode of <i>bookieId</i> if it still is at <i>version</i>.
+     *
+     * @param bookieId bookie id
+     * @param version expected version of the cookie; must be a {@link LongVersion}
+     * @throws BookieException {@link BookieIllegalOpException} when the version type is invalid,
+     *         {@link CookieNotFoundException} when the cookie znode doesn't exist,
+     *         {@link MetadataStoreException} for any other failure
+     */
+    @Override
+    public void removeCookie(String bookieId, Version version) throws BookieException {
+        // same validation as writeCookie, instead of failing with a ClassCastException
+        if (!(version instanceof LongVersion)) {
+            throw new BookieIllegalOpException("Invalid version type, expected it to be LongVersion");
+        }
+        String zkPath = getCookiePath(bookieId);
+        try {
+            zk.delete(zkPath, (int) ((LongVersion) version).getLongVersion());
+        } catch (NoNodeException e) {
+            throw new CookieNotFoundException(bookieId);
+        } catch (InterruptedException ie) {
+            // the exception is translated, so restore the interrupt status for the caller
+            Thread.currentThread().interrupt();
+            throw new MetadataStoreException("Failed to delete cookie for bookie " + bookieId, ie);
+        } catch (KeeperException ke) {
+            // keep the zookeeper failure as the cause instead of dropping it
+            throw new MetadataStoreException("Failed to delete cookie for bookie " + bookieId, ke);
+        }
+
+        log.info("Removed cookie from {} for bookie {}.", cookiePath, bookieId);
+    }
+
+
+    /**
+     * Read the cluster instance id stored under the ledgers root path.
+     *
+     * @return the instance id, or {@code null} when the INSTANCEID znode doesn't exist
+     * @throws BookieException when the ledgers root path doesn't exist or zookeeper can't be read
+     */
+    @Override
+    public String getClusterInstanceId() throws BookieException {
+        String instanceId = null;
+        try {
+            if (zk.exists(conf.getZkLedgersRootPath(), null) == null) {
+                log.error("BookKeeper metadata doesn't exist in zookeeper. "
+                    + "Has the cluster been initialized? "
+                    + "Try running bin/bookkeeper shell metaformat");
+                // caught below and reported as a metadata store failure
+                throw new KeeperException.NoNodeException("BookKeeper metadata");
+            }
+            try {
+                byte[] data = zk.getData(conf.getZkLedgersRootPath() + "/"
+                    + INSTANCEID, false, null);
+                instanceId = new String(data, UTF_8);
+            } catch (KeeperException.NoNodeException e) {
+                log.info("INSTANCEID not exists in zookeeper. Not considering it for data verification");
+            }
+        } catch (InterruptedException ie) {
+            // the exception is translated, so restore the interrupt status for the caller
+            Thread.currentThread().interrupt();
+            throw new MetadataStoreException("Failed to get cluster instance id", ie);
+        } catch (KeeperException ke) {
+            throw new MetadataStoreException("Failed to get cluster instance id", ke);
+        }
+        return instanceId;
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/package-info.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/package-info.java
new file mode 100644
index 0000000..ecc26f5
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Classes related to service discovery.
+ */
+package org.apache.bookkeeper.discover;
\ No newline at end of file
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ExpandStorageService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ExpandStorageService.java
index e51ca12..cd87a24 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ExpandStorageService.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/ExpandStorageService.java
@@ -29,9 +29,12 @@ import java.util.List;
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.discover.ZKRegistrationManager;
 import org.apache.bookkeeper.http.service.HttpEndpointService;
 import org.apache.bookkeeper.http.service.HttpServiceRequest;
 import org.apache.bookkeeper.http.service.HttpServiceResponse;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -80,9 +83,11 @@ public class ExpandStorageService implements HttpEndpointService {
             }
 
             try {
-                Bookie.checkEnvironmentWithStorageExpansion(conf, zk,
+                RegistrationManager rm = new ZKRegistrationManager();
+                rm.initialize(conf, () -> { }, NullStatsLogger.INSTANCE);
+                Bookie.checkEnvironmentWithStorageExpansion(conf, rm,
                   Lists.newArrayList(journalDirectories), allLedgerDirs);
-            } catch (BookieException | IOException e) {
+            } catch (BookieException e) {
                 LOG.error("Exception occurred while updating cookie for storage expansion", e);
                 response.setCode(HttpServer.StatusCode.INTERNAL_ERROR);
                 response.setBody("Exception while updating cookie for storage expansion");
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/RecoveryBookieService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/RecoveryBookieService.java
index 74ffa33..b079f1b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/RecoveryBookieService.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/http/RecoveryBookieService.java
@@ -27,11 +27,14 @@ import org.apache.bookkeeper.bookie.Cookie;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.RegistrationManager;
 import org.apache.bookkeeper.http.service.HttpEndpointService;
 import org.apache.bookkeeper.http.service.HttpServiceRequest;
 import org.apache.bookkeeper.http.service.HttpServiceResponse;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.JsonUtil;
+import org.apache.bookkeeper.util.ReflectionUtils;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -103,7 +106,10 @@ public class RecoveryBookieService implements HttpEndpointService {
 
         if (HttpServer.Method.PUT == request.getMethod() &&
             !requestJsonBody.bookie_src.isEmpty()) {
-            ClientConfiguration adminConf = new ClientConfiguration(conf);
+
+            Class<? extends RegistrationManager> rmClass = conf.getRegistrationManagerClass();
+            RegistrationManager rm = ReflectionUtils.newInstance(rmClass);
+            rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
 
             String bookieSrcString[] = requestJsonBody.bookie_src.get(0).split(":");
             BookieSocketAddress bookieSrc = new BookieSocketAddress(
@@ -122,8 +128,8 @@ public class RecoveryBookieService implements HttpEndpointService {
                     LOG.info("Start recovering bookie.");
                     bka.recoverBookieData(bookieSrc, bookieDest);
                     if (deleteCookie) {
-                        Versioned<Cookie> cookie = Cookie.readFromZooKeeper(bka.getZooKeeper(), adminConf, bookieSrc);
-                        cookie.getValue().deleteFromZooKeeper(bka.getZooKeeper(), adminConf, bookieSrc, cookie.getVersion());
+                        Versioned<Cookie> cookie = Cookie.readFromRegistrationManager(rm, bookieSrc);
+                        cookie.getValue().deleteFromRegistrationManager(rm, bookieSrc, cookie.getVersion());
                     }
                     LOG.info("Complete recovering bookie");
                 } catch (Exception e) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
index 135736f..3783c57 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
@@ -38,6 +38,8 @@ import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.discover.ZKRegistrationManager;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
 import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
@@ -49,6 +51,7 @@ import org.apache.bookkeeper.tls.SecurityException;
 import org.apache.bookkeeper.util.DiskChecker;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -66,10 +69,11 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
 
     @Rule
     public final TestName runtime = new TestName();
+    RegistrationManager rm;
 
     public BookieInitializationTest() {
         super(0);
-        String ledgersPath = "/" + runtime.getMethodName();
+        String ledgersPath = "/" + "ledgers" + runtime.getMethodName();
         baseClientConf.setZkLedgersRootPath(ledgersPath);
         baseConf.setZkLedgersRootPath(ledgersPath);
     }
@@ -78,6 +82,15 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
     public void setUp() throws Exception {
         super.setUp();
         zkUtil.createBKEnsemble("/" + runtime.getMethodName());
+        rm = new ZKRegistrationManager();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        if(rm != null) {
+            rm.close();
+        }
     }
 
     private static class MockBookie extends Bookie {
@@ -109,8 +122,10 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
                     throws IOException, KeeperException, InterruptedException,
                     BookieException {
                 MockBookie bookie = new MockBookie(conf);
-                bookie.zk = zkc;
-                zkc.close();
+                rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+                bookie.registrationManager = rm;
+                ((ZKRegistrationManager) bookie.registrationManager).setZk(zkc);
+                ((ZKRegistrationManager) bookie.registrationManager).getZk().close();
                 return bookie;
             }
         };
@@ -134,15 +149,19 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
                 + conf.getBookiePort();
 
         MockBookie b = new MockBookie(conf);
-        b.zk = zkc;
+        conf.setZkServers(zkUtil.getZooKeeperConnectString());
+        rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+        b.registrationManager = rm;
+
         b.testRegisterBookie(conf);
+        ZooKeeper zooKeeper = ((ZKRegistrationManager) rm).getZk();
         Assert.assertNotNull("Bookie registration node doesn't exists!",
-                             zkc.exists(bkRegPath, false));
+            zooKeeper.exists(bkRegPath, false));
 
         // test register bookie again if the registeration node is created by itself.
         b.testRegisterBookie(conf);
         Assert.assertNotNull("Bookie registration node doesn't exists!",
-                zkc.exists(bkRegPath, false));
+            zooKeeper.exists(bkRegPath, false));
     }
 
     /**
@@ -161,18 +180,24 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
         final String bkRegPath = conf.getZkAvailableBookiesPath() + "/"
                 + InetAddress.getLocalHost().getHostAddress() + ":"
                 + conf.getBookiePort();
-
         MockBookie b = new MockBookie(conf);
-        b.zk = zkc;
+
+        conf.setZkServers(zkUtil.getZooKeeperConnectString());
+        rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+        b.registrationManager = rm;
+
         b.testRegisterBookie(conf);
-        Stat bkRegNode1 = zkc.exists(bkRegPath, false);
+
+        Stat bkRegNode1 = ((ZKRegistrationManager) rm).getZk().exists(bkRegPath, false);
         Assert.assertNotNull("Bookie registration node doesn't exists!",
                 bkRegNode1);
 
         // simulating bookie restart, on restart bookie will create new
         // zkclient and doing the registration.
         ZooKeeperClient newZk = createNewZKClient();
-        b.zk = newZk;
+        RegistrationManager newRm = new ZKRegistrationManager();
+        newRm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+        b.registrationManager = newRm;
 
         try {
             // deleting the znode, so that the bookie registration should
@@ -228,7 +253,11 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
                 + conf.getBookiePort();
 
         MockBookie b = new MockBookie(conf);
-        b.zk = zkc;
+
+        conf.setZkServers(zkUtil.getZooKeeperConnectString());
+        rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+        b.registrationManager = rm;
+
         b.testRegisterBookie(conf);
         Stat bkRegNode1 = zkc.exists(bkRegPath, false);
         Assert.assertNotNull("Bookie registration node doesn't exists!",
@@ -237,14 +266,19 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
         // simulating bookie restart, on restart bookie will create new
         // zkclient and doing the registration.
         ZooKeeperClient newzk = createNewZKClient();
-        b.zk = newzk;
+        RegistrationManager newRm = new ZKRegistrationManager();
+        newRm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+        b.registrationManager = newRm;
         try {
             b.testRegisterBookie(conf);
             fail("Should throw NodeExistsException as the znode is not getting expired");
         } catch (IOException e) {
-            Throwable t = e.getCause();
-            if (t instanceof KeeperException) {
-                KeeperException ke = (KeeperException) t;
+            Throwable t1 = e.getCause(); // BookieException.MetadataStoreException
+            Throwable t2 = t1.getCause(); // IOException
+            Throwable t3 = t2.getCause(); // KeeperException.NodeExistsException
+
+            if (t3 instanceof KeeperException) {
+                KeeperException ke = (KeeperException) t3;
                 Assert.assertTrue("ErrorCode:" + ke.code()
                         + ", Registration node doesn't exists",
                         ke.code() == KeeperException.Code.NODEEXISTS);
@@ -263,6 +297,7 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
             throw e;
         } finally {
             newzk.close();
+            newRm.close();
         }
     }
 
@@ -280,11 +315,17 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
                 tmpDir.getPath()).setLedgerDirNames(
                 new String[] { tmpDir.getPath() });
         BookieServer bs1 = new BookieServer(conf);
+        conf.setZkServers(zkUtil.getZooKeeperConnectString());
+        rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+        bs1.getBookie().setRegistrationManager(rm);
         bs1.start();
 
         // starting bk server with same conf
         try {
             BookieServer bs2 = new BookieServer(conf);
+            RegistrationManager newRm = new ZKRegistrationManager();
+            newRm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+            bs2.getBookie().registrationManager = newRm;
             bs2.start();
             fail("Should throw BindException, as the bk server is already running!");
         } catch (BindException e) {
@@ -313,6 +354,9 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
         ServerConfiguration conf1 = new ServerConfiguration();
         conf1.addConfiguration(conf);
         BookieServer bs1 = new BookieServer(conf1);
+        conf.setZkServers(zkUtil.getZooKeeperConnectString());
+        rm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+        bs1.getBookie().registrationManager = rm;
         bs1.start();
         assertFalse(0 == conf1.getBookiePort());
 
@@ -320,6 +364,9 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
         ServerConfiguration conf2 = new ServerConfiguration();
         conf2.addConfiguration(conf);
         BookieServer bs2 = new BookieServer(conf2);
+        RegistrationManager newRm = new ZKRegistrationManager();
+        newRm.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
+        bs2.getBookie().registrationManager = newRm;
         bs2.start();
         assertFalse(0 == conf2.getBookiePort());
 
@@ -343,7 +390,7 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
         try {
             new Bookie(conf);
             fail("Should throw ConnectionLossException as ZKServer is not running!");
-        } catch (KeeperException.ConnectionLossException e) {
+        } catch (BookieException.MetadataStoreException e) {
             // expected behaviour
         }
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java
index f622448..e817776 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java
@@ -28,16 +28,21 @@ import static org.apache.bookkeeper.bookie.UpgradeTest.newV2LedgerDirectory;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.discover.ZKRegistrationManager;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.bookkeeper.util.IOUtils;
@@ -45,6 +50,7 @@ import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.commons.io.FileUtils;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -68,6 +74,24 @@ public class CookieTest extends BookKeeperClusterTestCase {
         return d.getPath();
     }
 
+    RegistrationManager rm;
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        rm = new ZKRegistrationManager();
+        baseConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        rm.initialize(baseConf, () -> {}, NullStatsLogger.INSTANCE);
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        if(rm != null) {
+            rm.close();
+        }
+    }
+
     /**
      * Test starting bookie with clean state.
      */
@@ -98,7 +122,7 @@ public class CookieTest extends BookKeeperClusterTestCase {
             .setBookiePort(bookiePort);
         Cookie.Builder cookieBuilder = Cookie.generateCookie(conf1);
         Cookie c = cookieBuilder.build();
-        c.writeToZooKeeper(zkc, conf1, Version.NEW);
+        c.writeToRegistrationManager(rm, conf1, Version.NEW);
 
         String journalDir = newDirectory();
         String ledgerDir = newDirectory();
@@ -590,15 +614,15 @@ public class CookieTest extends BookKeeperClusterTestCase {
         Bookie b = new Bookie(conf); // should work fine
         b.start();
         b.shutdown();
-        Versioned<Cookie> zkCookie = Cookie.readFromZooKeeper(zkc, conf);
+        Versioned<Cookie> zkCookie = Cookie.readFromRegistrationManager(rm, conf);
         Version version1 = zkCookie.getVersion();
         Assert.assertTrue("Invalid type expected ZkVersion type",
             version1 instanceof LongVersion);
         LongVersion zkVersion1 = (LongVersion) version1;
         Cookie cookie = zkCookie.getValue();
-        cookie.writeToZooKeeper(zkc, conf, version1);
+        cookie.writeToRegistrationManager(rm, conf, version1);
 
-        zkCookie = Cookie.readFromZooKeeper(zkc, conf);
+        zkCookie = Cookie.readFromRegistrationManager(rm, conf);
         Version version2 = zkCookie.getVersion();
         Assert.assertTrue("Invalid type expected ZkVersion type",
             version2 instanceof LongVersion);
@@ -620,8 +644,8 @@ public class CookieTest extends BookKeeperClusterTestCase {
         Bookie b = new Bookie(conf); // should work fine
         b.start();
         b.shutdown();
-        Versioned<Cookie> zkCookie = Cookie.readFromZooKeeper(zkc, conf);
+        Versioned<Cookie> zkCookie = Cookie.readFromRegistrationManager(rm, conf);
         Cookie cookie = zkCookie.getValue();
-        cookie.deleteFromZooKeeper(zkc, conf, zkCookie.getVersion());
+        cookie.deleteFromRegistrationManager(rm, conf, zkCookie.getVersion());
     }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpdateCookieCmdTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpdateCookieCmdTest.java
index f5b646e..c11fbca 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpdateCookieCmdTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpdateCookieCmdTest.java
@@ -20,17 +20,21 @@
  */
 package org.apache.bookkeeper.bookie;
 
+import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.List;
 
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.discover.ZKRegistrationManager;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.junit.Assert;
 
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.zookeeper.KeeperException;
 import org.junit.Test;
@@ -44,10 +48,29 @@ public class UpdateCookieCmdTest extends BookKeeperClusterTestCase {
 
     private final static Logger LOG = LoggerFactory.getLogger(UpdateCookieCmdTest.class);
 
+    RegistrationManager rm;
+
     public UpdateCookieCmdTest() {
         super(1);
     }
 
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        LOG.info("setUp ZKRegistrationManager");
+        rm = new ZKRegistrationManager();
+        baseConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        rm.initialize(baseConf, () -> {}, NullStatsLogger.INSTANCE);
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        if(rm != null) {
+            rm.close();
+        }
+    }
+
     /**
      * updatecookie to hostname
      */
@@ -109,12 +132,12 @@ public class UpdateCookieCmdTest extends BookKeeperClusterTestCase {
         // creates cookie with ipaddress
         ServerConfiguration conf = bsConfs.get(0);
         conf.setUseHostNameAsBookieID(true); // sets to hostname
-        Cookie cookie = Cookie.readFromZooKeeper(zkc, conf).getValue();
+        Cookie cookie = Cookie.readFromRegistrationManager(rm, conf).getValue();
         Cookie.Builder cookieBuilder = Cookie.newBuilder(cookie);
         conf.setUseHostNameAsBookieID(false); // sets to hostname
         final String newBookieHost = Bookie.getBookieAddress(conf).toString();
         cookieBuilder.setBookieHost(newBookieHost);
-        cookieBuilder.build().writeToZooKeeper(zkc, conf, Version.NEW);
+        cookieBuilder.build().writeToRegistrationManager(rm, conf, Version.NEW);
         verifyCookieInZooKeeper(conf, 2);
 
         // again issue hostname cmd
@@ -125,7 +148,7 @@ public class UpdateCookieCmdTest extends BookKeeperClusterTestCase {
         Assert.assertEquals("Failed to return the error code!", 0, bkShell.run(argv));
 
         conf.setUseHostNameAsBookieID(true);
-        cookie = Cookie.readFromZooKeeper(zkc, conf).getValue();
+        cookie = Cookie.readFromRegistrationManager(rm, conf).getValue();
         Assert.assertFalse("Cookie has created with IP!", cookie.isBookieHostCreatedFromIp());
         // ensure the old cookie is deleted
         verifyCookieInZooKeeper(conf, 1);
@@ -148,7 +171,7 @@ public class UpdateCookieCmdTest extends BookKeeperClusterTestCase {
         BookieServer bks = bs.get(0);
         bks.shutdown();
 
-        String zkCookiePath = Cookie.getZkPath(conf);
+        String zkCookiePath = conf.getZkLedgersRootPath() + "/" + COOKIE_NODE + "/" + Bookie.getBookieAddress(conf);
         Assert.assertNotNull("Cookie path doesn't still exists!", zkc.exists(zkCookiePath, false));
         zkc.delete(zkCookiePath, -1);
         Assert.assertNull("Cookie path still exists!", zkc.exists(zkCookiePath, false));
@@ -163,7 +186,7 @@ public class UpdateCookieCmdTest extends BookKeeperClusterTestCase {
     private void verifyCookieInZooKeeper(ServerConfiguration conf, int expectedCount) throws KeeperException,
             InterruptedException {
         List<String> cookies;
-        String bookieCookiePath1 = conf.getZkLedgersRootPath() + "/" + BookKeeperConstants.COOKIE_NODE;
+        String bookieCookiePath1 = conf.getZkLedgersRootPath() + "/" + COOKIE_NODE;
         cookies = zkc.getChildren(bookieCookiePath1, false);
         Assert.assertEquals("Wrongly updated the cookie!", expectedCount, cookies.size());
     }
@@ -174,7 +197,7 @@ public class UpdateCookieCmdTest extends BookKeeperClusterTestCase {
         bks.shutdown();
 
         conf.setUseHostNameAsBookieID(!useHostNameAsBookieID);
-        Cookie cookie = Cookie.readFromZooKeeper(zkc, conf).getValue();
+        Cookie cookie = Cookie.readFromRegistrationManager(rm, conf).getValue();
         final boolean previousBookieID = cookie.isBookieHostCreatedFromIp();
         Assert.assertEquals("Wrong cookie!", useHostNameAsBookieID, previousBookieID);
 
@@ -187,7 +210,7 @@ public class UpdateCookieCmdTest extends BookKeeperClusterTestCase {
         Assert.assertEquals("Failed to return exit code!", 0, bkShell.run(argv));
 
         newconf.setUseHostNameAsBookieID(useHostNameAsBookieID);
-        cookie = Cookie.readFromZooKeeper(zkc, newconf).getValue();
+        cookie = Cookie.readFromRegistrationManager(rm, newconf).getValue();
         Assert.assertEquals("Wrongly updated cookie!", previousBookieID, !cookie.isBookieHostCreatedFromIp());
         Assert.assertEquals("Wrongly updated cookie!", useHostNameAsBookieID, !cookie.isBookieHostCreatedFromIp());
         verifyCookieInZooKeeper(newconf, 1);

-- 
To stop receiving notification emails like this one, please contact
['"commits@bookkeeper.apache.org" <co...@bookkeeper.apache.org>'].