You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2015/09/15 11:29:16 UTC

bookkeeper git commit: BOOKKEEPER-438: Move ledger id generation out of LedgerManager (Tong Yu via sijie)

Repository: bookkeeper
Updated Branches:
  refs/heads/master a59bd5687 -> 5662416d8


BOOKKEEPER-438: Move ledger id generation out of LedgerManager (Tong Yu via sijie)


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/5662416d
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/5662416d
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/5662416d

Branch: refs/heads/master
Commit: 5662416d8ecef535fb089baa0a10e0dae08ae805
Parents: a59bd56
Author: Sijie Guo <si...@apache.org>
Authored: Tue Sep 15 01:38:26 2015 -0700
Committer: Sijie Guo <si...@apache.org>
Committed: Tue Sep 15 01:38:26 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../apache/bookkeeper/client/BKException.java   |   9 ++
 .../apache/bookkeeper/client/BookKeeper.java    |   9 +-
 .../bookkeeper/client/LedgerCreateOp.java       |  41 ++++--
 .../bookkeeper/conf/AbstractConfiguration.java  |   2 +-
 .../meta/AbstractZkLedgerManager.java           |  28 ++++
 .../bookkeeper/meta/CleanupLedgerManager.java   |   6 +-
 .../bookkeeper/meta/FlatLedgerManager.java      |  37 ------
 .../meta/FlatLedgerManagerFactory.java          |   5 +
 .../meta/HierarchicalLedgerManager.java         |  95 +-------------
 .../meta/HierarchicalLedgerManagerFactory.java  |   9 +-
 .../bookkeeper/meta/LedgerIdGenerator.java      |  41 ++++++
 .../apache/bookkeeper/meta/LedgerManager.java   |  43 +++---
 .../bookkeeper/meta/LedgerManagerFactory.java   |   8 ++
 .../bookkeeper/meta/MSLedgerManagerFactory.java | 130 +++++++------------
 .../bookkeeper/meta/ZkLedgerIdGenerator.java    | 120 +++++++++++++++++
 .../bookkeeper/bookie/CompactionTest.java       |   2 +-
 .../client/TestWatchEnsembleChange.java         |  91 +++++++------
 .../apache/bookkeeper/meta/GcLedgersTest.java   |  40 ++++--
 .../bookkeeper/meta/LedgerManagerTestCase.java  |  15 ++-
 .../meta/TestZkLedgerIdGenerator.java           | 122 +++++++++++++++++
 21 files changed, 556 insertions(+), 299 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 046fab3..36d4372 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -100,6 +100,8 @@ Trunk (unreleased changes)
         BOOKKEEPER-760: Don't close PCBC proactively if bookies
                         disappeared from zookeeper znodes (sijie via fpj)
 
+        BOOKKEEPER-438: Move ledger id generation out of LedgerManager (Tong Yu via sijie)
+
       bookkeeper-server:
 
         BOOKKEEPER-695: Some entry logs are not removed from the bookie storage (Matteo Merli via sijie)

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
index c5be32d..b43506d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
@@ -124,6 +124,7 @@ public abstract class BKException extends Exception {
         int MetadataVersionException = -17;
         int MetaStoreException = -18;
         int ClientClosedException = -19;
+        int LedgerExistException = -20;
 
         int IllegalOpException = -100;
         int LedgerFencedException = -101;
@@ -170,6 +171,8 @@ public abstract class BKException extends Exception {
             return "Error while using ZooKeeper";
         case Code.MetaStoreException:
             return "Error while using MetaStore";
+        case Code.LedgerExistException:
+            return "Ledger existed";
         case Code.LedgerRecoveryException:
             return "Error while recovering ledger";
         case Code.LedgerClosedException:
@@ -301,6 +304,12 @@ public abstract class BKException extends Exception {
         }
     }
 
+    public static class BKLedgerExistException extends BKException {
+        public BKLedgerExistException() {
+            super(Code.LedgerExistException);
+        }
+    }
+
     public static class BKLedgerRecoveryException extends BKException {
         public BKLedgerRecoveryException() {
             super(Code.LedgerRecoveryException);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index f74639b..6fe1371 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -34,6 +34,7 @@ import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.AsyncCallback.IsClosedCallback;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.meta.CleanupLedgerManager;
+import org.apache.bookkeeper.meta.LedgerIdGenerator;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.proto.BookieClient;
@@ -48,7 +49,6 @@ import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
-import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.slf4j.Logger;
@@ -100,6 +100,7 @@ public class BookKeeper {
     // Ledger manager responsible for how to store ledger meta data
     final LedgerManagerFactory ledgerManagerFactory;
     final LedgerManager ledgerManager;
+    final LedgerIdGenerator ledgerIdGenerator;
 
     // Ensemble Placement Policy
     final EnsemblePlacementPolicy placementPolicy;
@@ -305,6 +306,7 @@ public class BookKeeper {
         // initialize ledger manager
         this.ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, this.zk);
         this.ledgerManager = new CleanupLedgerManager(ledgerManagerFactory.newLedgerManager());
+        this.ledgerIdGenerator = ledgerManagerFactory.newLedgerIdGenerator();
     }
 
     private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf)
@@ -333,6 +335,10 @@ public class BookKeeper {
         return ledgerManager;
     }
 
+    LedgerIdGenerator getLedgerIdGenerator() {
+        return ledgerIdGenerator;
+    }
+
     /**
      * There are 2 digest types that can be used for verification. The CRC32 is
      * cheap to compute but does not protect against byzantine bookies (i.e., a
@@ -809,6 +815,7 @@ public class BookKeeper {
             // Close ledger manage so all pending metadata requests would be failed
             // which will reject any incoming metadata requests.
             ledgerManager.close();
+            ledgerIdGenerator.close();
             ledgerManagerFactory.uninitialize();
         } catch (IOException ie) {
             LOG.error("Failed to close ledger manager : ", ie);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
index 9cda8ca..7c181b5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.meta.LedgerIdGenerator;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -39,13 +40,14 @@ import org.slf4j.LoggerFactory;
  * Encapsulates asynchronous ledger create operation
  *
  */
-class LedgerCreateOp implements GenericCallback<Long> {
+class LedgerCreateOp implements GenericCallback<Void> {
 
     static final Logger LOG = LoggerFactory.getLogger(LedgerCreateOp.class);
 
     CreateCallback cb;
     LedgerMetadata metadata;
     LedgerHandle lh;
+    Long ledgerId;
     Object ctx;
     byte[] passwd;
     BookKeeper bk;
@@ -60,12 +62,14 @@ class LedgerCreateOp implements GenericCallback<Long> {
      *       BookKeeper object
      * @param ensembleSize
      *       ensemble size
-     * @param quorumSize
-     *       quorum size
+     * @param writeQuorumSize
+     *       write quorum size
+     * @param ackQuorumSize
+     *       ack quorum size
      * @param digestType
      *       digest type, either MAC or CRC32
      * @param passwd
-     *       passowrd
+     *       password
      * @param cb
      *       callback implementation
      * @param ctx
@@ -111,16 +115,37 @@ class LedgerCreateOp implements GenericCallback<Long> {
          */
         metadata.addEnsemble(0L, ensemble);
 
-        // create a ledger with metadata
-        bk.getLedgerManager().createLedger(metadata, this);
+        createLedger();
+    }
+
+    void createLedger() {
+        // generate a ledger id and then create the ledger with metadata
+        final LedgerIdGenerator ledgerIdGenerator = bk.getLedgerIdGenerator();
+        ledgerIdGenerator.generateLedgerId(new GenericCallback<Long>() {
+            @Override
+            public void operationComplete(int rc, Long ledgerId) {
+                if (BKException.Code.OK != rc) {
+                    createComplete(rc, null);
+                    return;
+                }
+
+                LedgerCreateOp.this.ledgerId = ledgerId;
+                // create a ledger with metadata
+                bk.getLedgerManager().createLedgerMetadata(ledgerId, metadata, LedgerCreateOp.this);
+            }
+        });
     }
 
     /**
      * Callback when created ledger.
      */
     @Override
-    public void operationComplete(int rc, Long ledgerId) {
-        if (BKException.Code.OK != rc) {
+    public void operationComplete(int rc, Void result) {
+        if (BKException.Code.LedgerExistException == rc) {
+            // retry to generate a new ledger id
+            createLedger();
+            return;
+        } else if (BKException.Code.OK != rc) {
             createComplete(rc, null);
             return;
         }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index 3ec2b5a..2e66806 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -178,7 +178,7 @@ public abstract class AbstractConfiguration extends CompositeConfiguration {
     public String getZkAvailableBookiesPath() {
         return getZkLedgersRootPath() + "/" + AVAILABLE_NODE;
     }
-    
+
     /**
      * Set the max entries to keep in fragment for re-replication. If fragment
      * has more entries than this count, then the original fragment will be

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index f3f680d..6636506 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -42,11 +42,14 @@ import org.apache.bookkeeper.versioning.Version;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.AsyncCallback.DataCallback;
 import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
 import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -213,6 +216,31 @@ abstract class AbstractZkLedgerManager implements LedgerManager, Watcher {
         }
     }
 
+    @Override
+    public void createLedgerMetadata(final long ledgerId, final LedgerMetadata metadata,
+            final GenericCallback<Void> ledgerCb) {
+        String ledgerPath = getLedgerPath(ledgerId);
+        StringCallback scb = new StringCallback() {
+            @Override
+            public void processResult(int rc, String path, Object ctx, String name) {
+                if (rc == Code.OK.intValue()) {
+                    // update version
+                    metadata.setVersion(new ZkVersion(0));
+                    ledgerCb.operationComplete(BKException.Code.OK, null);
+                } else if (rc == Code.NODEEXISTS.intValue()) {
+                    LOG.warn("Failed to create ledger metadata for {} which already exist", ledgerId);
+                    ledgerCb.operationComplete(BKException.Code.LedgerExistException, null);
+                } else {
+                    LOG.error("Could not create node for ledger {}", ledgerId,
+                            KeeperException.create(Code.get(rc), path));
+                    ledgerCb.operationComplete(BKException.Code.ZKException, null);
+                }
+            }
+        };
+        ZkUtils.asyncCreateFullPathOptimistic(zk, ledgerPath, metadata.serialize(), Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT, scb, null);
+    }
+
     /**
      * Removes ledger metadata from ZooKeeper if version matches.
      *

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/CleanupLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/CleanupLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/CleanupLedgerManager.java
index a7fbcf5..961e0d1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/CleanupLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/CleanupLedgerManager.java
@@ -98,15 +98,15 @@ public class CleanupLedgerManager implements LedgerManager {
     }
 
     @Override
-    public void createLedger(LedgerMetadata metadata,
-                             GenericCallback<Long> cb) {
+    public void createLedgerMetadata(long lid, LedgerMetadata metadata,
+                                     GenericCallback<Void> cb) {
         closeLock.readLock().lock();
         try {
             if (closed) {
                 cb.operationComplete(BKException.Code.ClientClosedException, null);
                 return;
             }
-            underlying.createLedger(metadata, new CleanupGenericCallback<Long>(cb));
+            underlying.createLedgerMetadata(lid, metadata, new CleanupGenericCallback<Void>(cb));
         } finally {
             closeLock.readLock().unlock();
         }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
index 2bc4258..6bd3216 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
@@ -22,19 +22,11 @@ import java.io.IOException;
 import java.util.NoSuchElementException;
 import java.util.Set;
 
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.util.StringUtils;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.AsyncCallback.StringCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,8 +52,6 @@ class FlatLedgerManager extends AbstractZkLedgerManager {
      *          Configuration object
      * @param zk
      *          ZooKeeper Client Handle
-     * @param ledgerRootPath
-     *          ZooKeeper Path to store ledger metadata
      * @throws IOException when version is not compatible
      */
     public FlatLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
@@ -71,33 +61,6 @@ class FlatLedgerManager extends AbstractZkLedgerManager {
     }
 
     @Override
-    public void createLedger(final LedgerMetadata metadata, final GenericCallback<Long> cb) {
-        StringCallback scb = new StringCallback() {
-            @Override
-            public void processResult(int rc, String path, Object ctx,
-                    String name) {
-                if (Code.OK.intValue() != rc) {
-                    LOG.error("Could not create node for ledger",
-                              KeeperException.create(KeeperException.Code.get(rc), path));
-                    cb.operationComplete(BKException.Code.ZKException, null);
-                } else {
-                    // update znode status
-                    metadata.setVersion(new ZkVersion(0));
-                    try {
-                        long ledgerId = getLedgerId(name);
-                        cb.operationComplete(BKException.Code.OK, ledgerId);
-                    } catch (IOException ie) {
-                        LOG.error("Could not extract ledger-id from path:" + name, ie);
-                        cb.operationComplete(BKException.Code.ZKException, null);
-                    }
-                }
-            }
-        };
-        ZkUtils.asyncCreateFullPathOptimistic(zk, ledgerPrefix, metadata.serialize(),
-            Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL, scb, null);
-    }
-
-    @Override
     public String getLedgerPath(long ledgerId) {
         StringBuilder sb = new StringBuilder();
         sb.append(ledgerPrefix)

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
index db16d26..46f8b9b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
@@ -64,6 +64,11 @@ public class FlatLedgerManagerFactory extends LedgerManagerFactory {
     }
 
     @Override
+    public LedgerIdGenerator newLedgerIdGenerator() {
+        return new ZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), null);
+    }
+
+    @Override
     public LedgerManager newLedgerManager() {
         return new FlatLedgerManager(conf, zk);
     }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
index 7f2df73..bc62af4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
@@ -23,23 +23,16 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.NoSuchElementException;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.util.StringUtils;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.AsyncCallback.StringCallback;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,10 +41,7 @@ import org.slf4j.LoggerFactory;
  * Hierarchical Ledger Manager which manages ledger meta in zookeeper using 2-level hierarchical znodes.
  *
  * <p>
- * Hierarchical Ledger Manager first obtain a global unique id from zookeeper using a EPHEMERAL_SEQUENTIAL
- * znode <i>(ledgersRootPath)/ledgers/idgen/ID-</i>.
- * Since zookeeper sequential counter has a format of %10d -- that is 10 digits with 0 (zero) padding, i.e.
- * "&lt;path&gt;0000000001", HierarchicalLedgerManager splits the generated id into 3 parts (2-4-4):
+ * HierarchicalLedgerManager splits the generated id into 3 parts (2-4-4):
  * <pre>&lt;level1 (2 digits)&gt;&lt;level2 (4 digits)&gt;&lt;level3 (4 digits)&gt;</pre>
  * These 3 parts are used to form the actual ledger node path used to store ledger metadata:
  * <pre>(ledgersRootPath)/level1/level2/L(level3)</pre>
@@ -64,13 +54,9 @@ class HierarchicalLedgerManager extends AbstractZkLedgerManager {
     static final Logger LOG = LoggerFactory.getLogger(HierarchicalLedgerManager.class);
 
     static final String IDGEN_ZNODE = "idgen";
-    static final String IDGENERATION_PREFIX = "/" + IDGEN_ZNODE + "/ID-";
     private static final String MAX_ID_SUFFIX = "9999";
     private static final String MIN_ID_SUFFIX = "0000";
 
-    // Path to generate global id
-    private final String idGenPath;
-
     /**
      * Constructor
      *
@@ -81,83 +67,6 @@ class HierarchicalLedgerManager extends AbstractZkLedgerManager {
      */
     public HierarchicalLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
         super(conf, zk);
-
-        this.idGenPath = ledgerRootPath + IDGENERATION_PREFIX;
-    }
-
-    @Override
-    public void createLedger(final LedgerMetadata metadata, final GenericCallback<Long> ledgerCb) {
-        ZkUtils.asyncCreateFullPathOptimistic(zk, idGenPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
-            CreateMode.EPHEMERAL_SEQUENTIAL, new StringCallback() {
-            @Override
-            public void processResult(int rc, String path, Object ctx, final String idPathName) {
-                if (rc != KeeperException.Code.OK.intValue()) {
-                    LOG.error("Could not generate new ledger id",
-                              KeeperException.create(KeeperException.Code.get(rc), path));
-                    ledgerCb.operationComplete(BKException.Code.ZKException, null);
-                    return;
-                }
-                /*
-                 * Extract ledger id from gen path
-                 */
-                long ledgerId;
-                try {
-                    ledgerId = getLedgerIdFromGenPath(idPathName);
-                } catch (IOException e) {
-                    LOG.error("Could not extract ledger-id from id gen path:" + path, e);
-                    ledgerCb.operationComplete(BKException.Code.ZKException, null);
-                    return;
-                }
-                String ledgerPath = getLedgerPath(ledgerId);
-                final long lid = ledgerId;
-                StringCallback scb = new StringCallback() {
-                    @Override
-                    public void processResult(int rc, String path,
-                            Object ctx, String name) {
-                        if (rc != KeeperException.Code.OK.intValue()) {
-                            LOG.error("Could not create node for ledger",
-                                      KeeperException.create(KeeperException.Code.get(rc), path));
-                            ledgerCb.operationComplete(BKException.Code.ZKException, null);
-                        } else {
-                            // update version
-                            metadata.setVersion(new ZkVersion(0));
-                            ledgerCb.operationComplete(BKException.Code.OK, lid);
-                        }
-                    }
-                };
-                ZkUtils.asyncCreateFullPathOptimistic(zk, ledgerPath, metadata.serialize(),
-                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, scb, null);
-                // delete the znode for id generation
-                scheduler.submit(new Runnable() {
-                    @Override
-                    public void run() {
-                        zk.delete(idPathName, -1, new AsyncCallback.VoidCallback() {
-                            @Override
-                            public void processResult(int rc, String path, Object ctx) {
-                                if (rc != KeeperException.Code.OK.intValue()) {
-                                    LOG.warn("Exception during deleting znode for id generation : ",
-                                             KeeperException.create(KeeperException.Code.get(rc), path));
-                                } else {
-                                    LOG.debug("Deleting znode for id generation : {}", idPathName);
-                                }
-                            }
-                        }, null);
-                    }
-                });
-            }
-        }, null);
-    }
-
-    // get ledger id from generation path
-    private long getLedgerIdFromGenPath(String nodeName) throws IOException {
-        long ledgerId;
-        try {
-            String parts[] = nodeName.split(IDGENERATION_PREFIX);
-            ledgerId = Long.parseLong(parts[parts.length - 1]);
-        } catch (NumberFormatException e) {
-            throw new IOException(e);
-        }
-        return ledgerId;
     }
 
     @Override
@@ -304,7 +213,7 @@ class HierarchicalLedgerManager extends AbstractZkLedgerManager {
          *          Callback to process element of list when success
          * @param finalCb
          *          Final callback to be called after all elements in the list are processed
-         * @param contxt
+         * @param context
          *          Context of final callback
          * @param successRc
          *          RC passed to final callback on success

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
index b843e99..a165b0d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
@@ -64,6 +64,11 @@ public class HierarchicalLedgerManagerFactory extends LedgerManagerFactory {
     }
 
     @Override
+    public LedgerIdGenerator newLedgerIdGenerator() {
+        return new ZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), HierarchicalLedgerManager.IDGEN_ZNODE);
+    }
+
+    @Override
     public LedgerManager newLedgerManager() {
         return new HierarchicalLedgerManager(conf, zk);
     }
@@ -81,8 +86,7 @@ public class HierarchicalLedgerManagerFactory extends LedgerManagerFactory {
         String ledgersRootPath = conf.getZkLedgersRootPath();
         List<String> children = zk.getChildren(ledgersRootPath, false);
         for (String child : children) {
-            if (!HierarchicalLedgerManager.IDGEN_ZNODE.equals(child)
-                    && ledgerManager.isSpecialZnode(child)) {
+            if (ledgerManager.isSpecialZnode(child)) {
                 continue;
             }
             ZKUtil.deleteRecursive(zk, ledgersRootPath + "/" + child);
@@ -90,4 +94,5 @@ public class HierarchicalLedgerManagerFactory extends LedgerManagerFactory {
         // Delete and recreate the LAYOUT information.
         super.format(conf, zk);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerIdGenerator.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerIdGenerator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerIdGenerator.java
new file mode 100644
index 0000000..24d1f01
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerIdGenerator.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import java.io.Closeable;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+
+/**
+ * The interface for global unique ledger ID generation
+ */
+public interface LedgerIdGenerator extends Closeable {
+
+    /**
+     * generate a global unique ledger id
+     *
+     * @param cb
+     *            Callback when a new ledger id is generated, return code:<ul>
+     *            <li>{@link BKException.Code.OK} if success</li>
+     *            <li>{@link BKException.Code.ZKException} when can't generate new ledger id</li>
+     *            </ul>
+     */
+    public void generateLedgerId(GenericCallback<Long> cb);
+
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
index 7229028..fe3c2cf 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
@@ -42,16 +42,21 @@ import org.apache.bookkeeper.versioning.Version;
 public interface LedgerManager extends Closeable {
 
     /**
-     * Create a new ledger with provided metadata
+     * Create a new ledger with provided ledger id and metadata
      *
+     * @param ledgerId
+     *            Ledger id provided to be created
      * @param metadata
-     *        Metadata provided when creating a new ledger
+     *            Metadata provided when creating the new ledger
      * @param cb
-     *        Callback when creating a new ledger.
-     *        {@link BKException.Code.ZKException} return code when can't generate
-     *        or extract new ledger id
+     *            Callback when creating a new ledger. Return code:<ul>
+     *            <li>{@link BKException.Code.OK} if success</li>
+     *            <li>{@link BKException.Code.LedgerExistException} if given ledger id exist</li>
+     *            <li>{@link BKException.Code.ZKException}/{@link BKException.Code.MetaStoreException}
+     *                 for other issue</li>
+     *            </ul>
      */
-    public void createLedger(LedgerMetadata metadata, GenericCallback<Long> cb);
+    public void createLedgerMetadata(long ledgerId, LedgerMetadata metadata, GenericCallback<Void> cb);
 
     /**
      * Remove a specified ledger metadata by ledgerId and version.
@@ -61,10 +66,12 @@ public interface LedgerManager extends Closeable {
      * @param version
      *          Ledger metadata version
      * @param cb
-     *          Callback when removed ledger metadata.
-     *          {@link BKException.Code.MetadataVersionException} return code when version doesn't match,
-     *          {@link BKException.Code.NoSuchLedgerExistsException} return code when ledger doesn't exist,
-     *          {@link BKException.Code.ZKException} return code when other issues happen.
+     *          Callback when remove ledger metadata. Return code:<ul>
+     *          <li>{@link BKException.Code.OK} if success</li>
+     *          <li>{@link BKException.Code.MetadataVersionException} if version doesn't match</li>
+     *          <li>{@link BKException.Code.NoSuchLedgerExistsException} if ledger not exist</li>
+     *          <li>{@link BKException.Code.ZKException} for other issue</li>
+     *          </ul>
      */
     public void removeLedgerMetadata(long ledgerId, Version version, GenericCallback<Void> vb);
 
@@ -74,9 +81,11 @@ public interface LedgerManager extends Closeable {
      * @param ledgerId
      *          Ledger Id
      * @param readCb
-     *          Callback when read ledger metadata.
-     *          {@link BKException.Code.NoSuchLedgerExistsException} return code when ledger doesn't exist,
-     *          {@link BKException.Code.ZKException} return code when other issues happen.
+     *          Callback when read ledger metadata. Return code:<ul>
+     *          <li>{@link BKException.Code.OK} if success</li>
+     *          <li>{@link BKException.Code.NoSuchLedgerExistsException} if ledger not exist</li>
+     *          <li>{@link BKException.Code.ZKException} for other issue</li>
+     *          </ul>
      */
     public void readLedgerMetadata(long ledgerId, GenericCallback<LedgerMetadata> readCb);
 
@@ -88,9 +97,11 @@ public interface LedgerManager extends Closeable {
      * @param metadata
      *          Ledger Metadata to write
      * @param cb
-     *          Callback when finished writing ledger metadata.
-     *          {@link BKException.Code.MetadataVersionException} return code when version doesn't match,
-     *          {@link BKException.Code.ZKException} return code when other issues happen.
+     *          Callback when finished writing ledger metadata. Return code:<ul>
+     *          <li>{@link BKException.Code.OK} if success</li>
+     *          <li>{@link BKException.Code.MetadataVersionException} if version in metadata doesn't match</li>
+     *          <li>{@link BKException.Code.ZKException} for other issue</li>
+     *          </ul>
      */
     public void writeLedgerMetadata(long ledgerId, LedgerMetadata metadata, GenericCallback<Void> cb);
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
index 7c3cf5c..3a53623 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
@@ -68,6 +68,14 @@ public abstract class LedgerManagerFactory {
     public abstract void uninitialize() throws IOException;
 
     /**
+     * Return the ledger id generator, which is used for global unique ledger id
+     * generation.
+     *
+     * @return ledger id generator.
+     */
+    public abstract LedgerIdGenerator newLedgerIdGenerator();
+
+    /**
      * return ledger manager for client-side to manage ledger metadata.
      *
      * @return ledger manager

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
index 2510b89..9f7ef38 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
@@ -36,7 +36,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
-import org.apache.bookkeeper.meta.AbstractZkLedgerManager.ReadLedgerMetadataTask;
 import org.apache.bookkeeper.metastore.MSException;
 import org.apache.bookkeeper.metastore.MSWatchedEvent;
 import org.apache.bookkeeper.metastore.MetaStore;
@@ -46,23 +45,20 @@ import org.apache.bookkeeper.metastore.MetastoreCursor.ReadEntriesCallback;
 import org.apache.bookkeeper.metastore.MetastoreException;
 import org.apache.bookkeeper.metastore.MetastoreFactory;
 import org.apache.bookkeeper.metastore.MetastoreScannableTable;
+import org.apache.bookkeeper.metastore.MetastoreTable;
 import org.apache.bookkeeper.metastore.MetastoreTableItem;
+import org.apache.bookkeeper.metastore.MetastoreUtils;
 import org.apache.bookkeeper.metastore.MetastoreWatcher;
-import org.apache.bookkeeper.metastore.MSWatchedEvent.EventType;
 import org.apache.bookkeeper.metastore.Value;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.replication.ReplicationException;
 import org.apache.bookkeeper.util.StringUtils;
-import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.AsyncCallback.StringCallback;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -180,6 +176,11 @@ public class MSLedgerManagerFactory extends LedgerManagerFactory {
         }
     }
 
+    @Override
+    public LedgerIdGenerator newLedgerIdGenerator() {
+        return new ZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), MsLedgerManager.IDGEN_ZNODE);
+    }
+
     static class MsLedgerManager implements LedgerManager, MetastoreWatcher {
         final ZooKeeper zk;
         final AbstractConfiguration conf;
@@ -189,15 +190,11 @@ public class MSLedgerManagerFactory extends LedgerManagerFactory {
         final int maxEntriesPerScan;
 
         static final String IDGEN_ZNODE = "ms-idgen";
-        static final String IDGENERATION_PREFIX = "/" + IDGEN_ZNODE + "/ID-";
 
         // ledger metadata listeners
         protected final ConcurrentMap<Long, Set<LedgerMetadataListener>> listeners =
                 new ConcurrentHashMap<Long, Set<LedgerMetadataListener>>();
 
-        // Path to generate global id
-        private final String idGenPath;
-
         // we use this to prevent long stack chains from building up in
         // callbacks
         ScheduledExecutorService scheduler;
@@ -266,7 +263,6 @@ public class MSLedgerManagerFactory extends LedgerManagerFactory {
             // configuration settings
             maxEntriesPerScan = conf.getMetastoreMaxEntriesPerScan();
 
-            this.idGenPath = conf.getZkLedgersRootPath() + IDGENERATION_PREFIX;
             ThreadFactoryBuilder tfb = new ThreadFactoryBuilder()
                     .setNameFormat("MSLedgerManagerScheduler-%d");
             this.scheduler = Executors.newSingleThreadScheduledExecutor(tfb
@@ -346,76 +342,28 @@ public class MSLedgerManagerFactory extends LedgerManagerFactory {
         }
 
         @Override
-        public void createLedger(final LedgerMetadata metadata, final GenericCallback<Long> ledgerCb) {
-            ZkUtils.asyncCreateFullPathOptimistic(zk, idGenPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
-                    CreateMode.EPHEMERAL_SEQUENTIAL, new StringCallback() {
-                        @Override
-                        public void processResult(int rc, String path, Object ctx, final String idPathName) {
-                            if (rc != KeeperException.Code.OK.intValue()) {
-                                LOG.error("Could not generate new ledger id",
-                                        KeeperException.create(KeeperException.Code.get(rc), path));
-                                ledgerCb.operationComplete(BKException.Code.ZKException, null);
-                                return;
-                            }
-                            /*
-                             * Extract ledger id from gen path
-                             */
-                            long ledgerId;
-                            try {
-                                ledgerId = getLedgerIdFromGenPath(idPathName);
-                            } catch (IOException e) {
-                                LOG.error("Could not extract ledger-id from id gen path:" + path, e);
-                                ledgerCb.operationComplete(BKException.Code.ZKException, null);
-                                return;
-                            }
-
-                            final long lid = ledgerId;
-                            MetastoreCallback<Version> msCallback = new MetastoreCallback<Version>() {
-                                @Override
-                                public void complete(int rc, Version version, Object ctx) {
-                                    if (MSException.Code.BadVersion.getCode() == rc) {
-                                        ledgerCb.operationComplete(BKException.Code.MetadataVersionException, null);
-                                        return;
-                                    }
-                                    if (MSException.Code.OK.getCode() != rc) {
-                                        ledgerCb.operationComplete(BKException.Code.MetaStoreException, null);
-                                        return;
-                                    }
-                                    LOG.debug("Create ledger {} with version {} successfuly.", new Object[] { lid,
-                                            version });
-                                    // update version
-                                    metadata.setVersion(version);
-                                    ledgerCb.operationComplete(BKException.Code.OK, lid);
-                                }
-                            };
-
-                            ledgerTable.put(ledgerId2Key(lid), new Value().setField(META_FIELD, metadata.serialize()),
-                                    Version.NEW, msCallback, null);
-                            zk.delete(idPathName, -1, new AsyncCallback.VoidCallback() {
-                                @Override
-                                public void processResult(int rc, String path, Object ctx) {
-                                    if (rc != KeeperException.Code.OK.intValue()) {
-                                        LOG.warn("Exception during deleting znode for id generation : ",
-                                                KeeperException.create(KeeperException.Code.get(rc), path));
-                                    } else {
-                                        LOG.debug("Deleting znode for id generation : {}", idPathName);
-                                    }
-                                }
-                            }, null);
-                        }
-                    }, null);
-        }
+        public void createLedgerMetadata(final long lid, final LedgerMetadata metadata,
+                                         final GenericCallback<Void> ledgerCb) {
+            MetastoreCallback<Version> msCallback = new MetastoreCallback<Version>() {
+                @Override
+                public void complete(int rc, Version version, Object ctx) {
+                    if (MSException.Code.BadVersion.getCode() == rc) {
+                        ledgerCb.operationComplete(BKException.Code.MetadataVersionException, null);
+                        return;
+                    }
+                    if (MSException.Code.OK.getCode() != rc) {
+                        ledgerCb.operationComplete(BKException.Code.MetaStoreException, null);
+                        return;
+                    }
+                    LOG.debug("Create ledger {} with version {} successfully.", lid, version);
+                    // update version
+                    metadata.setVersion(version);
+                    ledgerCb.operationComplete(BKException.Code.OK, null);
+                }
+            };
 
-        // get ledger id from generation path
-        private long getLedgerIdFromGenPath(String nodeName) throws IOException {
-            long ledgerId;
-            try {
-                String parts[] = nodeName.split(IDGENERATION_PREFIX);
-                ledgerId = Long.parseLong(parts[parts.length - 1]);
-            } catch (NumberFormatException e) {
-                throw new IOException(e);
-            }
-            return ledgerId;
+            ledgerTable.put(ledgerId2Key(lid), new Value().setField(META_FIELD, metadata.serialize()),
+                    Version.NEW, msCallback, null);
         }
 
         @Override
@@ -688,7 +636,7 @@ public class MSLedgerManagerFactory extends LedgerManagerFactory {
          * @param finalCb
          *            Final callback to be called after all elements in the list
          *            are processed
-         * @param contxt
+         * @param context
          *            Context of final callback
          * @param successRc
          *            RC passed to final callback on success
@@ -730,4 +678,24 @@ public class MSLedgerManagerFactory extends LedgerManagerFactory {
         }
     }
 
+    @Override
+    public void format(AbstractConfiguration conf, ZooKeeper zk) throws InterruptedException,
+            KeeperException, IOException {
+        MetastoreTable ledgerTable;
+        try {
+            ledgerTable = metastore.createScannableTable(TABLE_NAME);
+        } catch (MetastoreException mse) {
+            throw new IOException("Failed to instantiate table " + TABLE_NAME + " in metastore "
+                    + metastore.getName());
+        }
+        try {
+            MetastoreUtils.cleanTable(ledgerTable, conf.getMetastoreMaxEntriesPerScan());
+        } catch (MSException mse) {
+            throw new IOException("Exception when cleanning up table " + TABLE_NAME, mse);
+        }
+        LOG.info("Finished cleaning up table {}.", TABLE_NAME);
+        // Delete and recreate the LAYOUT information.
+        super.format(conf, zk);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java
new file mode 100644
index 0000000..a6c5b7b
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import java.io.IOException;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ZooKeeper based ledger id generator class, which using EPHEMERAL_SEQUENTIAL
+ * with <i>(ledgerIdGenPath)/ID-</i> prefix to generate ledger id. Note
+ * zookeeper sequential counter has a format of %10d -- that is 10 digits with 0
+ * (zero) padding, i.e. "&lt;path&gt;0000000001", so ledger id space is
+ * fundamentally limited to 9 billion.
+ */
+public class ZkLedgerIdGenerator implements LedgerIdGenerator {
+    static final Logger LOG = LoggerFactory.getLogger(ZkLedgerIdGenerator.class);
+
+    final ZooKeeper zk;
+    final String ledgerIdGenPath;
+    final String ledgerPrefix;
+
+    public ZkLedgerIdGenerator(ZooKeeper zk,
+                               String ledgersPath,
+                               String idGenZnodeName) {
+        this.zk = zk;
+        if (StringUtils.isBlank(idGenZnodeName)) {
+            this.ledgerIdGenPath = ledgersPath;
+        } else {
+            this.ledgerIdGenPath = ledgersPath + "/" + idGenZnodeName;
+        }
+        this.ledgerPrefix = this.ledgerIdGenPath + "/ID-";
+    }
+
+    @Override
+    public void generateLedgerId(final GenericCallback<Long> cb) {
+        ZkUtils.asyncCreateFullPathOptimistic(zk, ledgerPrefix, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                CreateMode.EPHEMERAL_SEQUENTIAL,
+                new StringCallback() {
+                    @Override
+                    public void processResult(int rc, String path, Object ctx, final String idPathName) {
+                        if (rc != KeeperException.Code.OK.intValue()) {
+                            LOG.error("Could not generate new ledger id",
+                                    KeeperException.create(KeeperException.Code.get(rc), path));
+                            cb.operationComplete(BKException.Code.ZKException, null);
+                            return;
+                        }
+
+                        /*
+                         * Extract ledger id from generated path
+                         */
+                        long ledgerId;
+                        try {
+                            ledgerId = getLedgerIdFromGenPath(idPathName);
+                            cb.operationComplete(BKException.Code.OK, ledgerId);
+                        } catch (IOException e) {
+                            LOG.error("Could not extract ledger-id from id gen path:" + path, e);
+                            cb.operationComplete(BKException.Code.ZKException, null);
+                            return;
+                        }
+
+                        // delete the znode for id generation
+                        zk.delete(idPathName, -1, new AsyncCallback.VoidCallback() {
+                            @Override
+                            public void processResult(int rc, String path, Object ctx) {
+                                if (rc != KeeperException.Code.OK.intValue()) {
+                                    LOG.warn("Exception during deleting znode for id generation : ",
+                                            KeeperException.create(KeeperException.Code.get(rc), path));
+                                } else {
+                                    LOG.debug("Deleting znode for id generation : {}", idPathName);
+                                }
+                            }
+                        }, null);
+                    }
+                }, null);
+    }
+
+    // get ledger id from generation path
+    private long getLedgerIdFromGenPath(String nodeName) throws IOException {
+        long ledgerId;
+        try {
+            String parts[] = nodeName.split(ledgerPrefix);
+            ledgerId = Long.parseLong(parts[parts.length - 1]);
+        } catch (NumberFormatException e) {
+            throw new IOException(e);
+        }
+        return ledgerId;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 101fdac..16bd4da 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -465,7 +465,7 @@ public class CompactionTest extends BookKeeperClusterTestCase {
     private LedgerManager getLedgerManager(final Set<Long> ledgers) {
         LedgerManager manager = new LedgerManager() {
                 @Override
-                public void createLedger(LedgerMetadata metadata, GenericCallback<Long> cb) {
+                public void createLedgerMetadata(long lid, LedgerMetadata metadata, GenericCallback<Void> cb) {
                     unsupported();
                 }
                 @Override

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
index eb833a3..df74339 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
@@ -23,11 +23,13 @@ package org.apache.bookkeeper.client;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.meta.FlatLedgerManagerFactory;
 import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory;
+import org.apache.bookkeeper.meta.LedgerIdGenerator;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.MSLedgerManagerFactory;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.ReflectionUtils;
@@ -104,45 +106,54 @@ public class TestWatchEnsembleChange extends BookKeeperClusterTestCase {
 
     @Test(timeout = 60000)
     public void testWatchMetadataRemoval() throws Exception {
-       LedgerManagerFactory factory = ReflectionUtils.newInstance(lmFactoryCls);
-       factory.initialize(baseConf, super.zkc, factory.getCurrentVersion());
-       LedgerManager manager = factory.newLedgerManager();
-       final ByteBuffer bbLedgerId = ByteBuffer.allocate(8);
-       final CountDownLatch createLatch = new CountDownLatch(1);
-       final CountDownLatch removeLatch = new CountDownLatch(1);
-
-       manager.createLedger( new LedgerMetadata(4, 2, 2, digestType, "fpj was here".getBytes()),
-                new BookkeeperInternalCallbacks.GenericCallback<Long>(){
-
-           @Override
-           public void operationComplete(int rc, Long result) {
-               bbLedgerId.putLong(result);
-               bbLedgerId.flip();
-               createLatch.countDown();
-           }
-       });
-       assertTrue(createLatch.await(2000, TimeUnit.MILLISECONDS));
-       final long createdLid = bbLedgerId.getLong();
-
-       manager.registerLedgerMetadataListener( createdLid,
-               new LedgerMetadataListener() {
-
-           @Override
-           public void onChanged( long ledgerId, LedgerMetadata metadata ) {
-               assertEquals(ledgerId, createdLid);
-               assertEquals(metadata, null);
-               removeLatch.countDown();
-           }
-       });
-
-       manager.removeLedgerMetadata( createdLid, Version.ANY,
-               new BookkeeperInternalCallbacks.GenericCallback<Void>() {
-
-           @Override
-           public void operationComplete(int rc, Void result) {
-               assertEquals(rc, BKException.Code.OK);
-           }
-       });
-       assertTrue(removeLatch.await(2000, TimeUnit.MILLISECONDS));
+        LedgerManagerFactory factory = ReflectionUtils.newInstance(lmFactoryCls);
+        factory.initialize(baseConf, super.zkc, factory.getCurrentVersion());
+        final LedgerManager manager = factory.newLedgerManager();
+        LedgerIdGenerator idGenerator = factory.newLedgerIdGenerator();
+
+        final ByteBuffer bbLedgerId = ByteBuffer.allocate(8);
+        final CountDownLatch createLatch = new CountDownLatch(1);
+        final CountDownLatch removeLatch = new CountDownLatch(1);
+
+        idGenerator.generateLedgerId(new GenericCallback<Long>() {
+            @Override
+            public void operationComplete(int rc, final Long lid) {
+                manager.createLedgerMetadata(lid, new LedgerMetadata(4, 2, 2, digestType, "fpj was here".getBytes()),
+                         new BookkeeperInternalCallbacks.GenericCallback<Void>(){
+
+                    @Override
+                    public void operationComplete(int rc, Void result) {
+                        bbLedgerId.putLong(lid);
+                        bbLedgerId.flip();
+                        createLatch.countDown();
+                    }
+                });
+
+            }
+        });
+
+        assertTrue(createLatch.await(2000, TimeUnit.MILLISECONDS));
+        final long createdLid = bbLedgerId.getLong();
+
+        manager.registerLedgerMetadataListener( createdLid,
+                new LedgerMetadataListener() {
+
+            @Override
+            public void onChanged( long ledgerId, LedgerMetadata metadata ) {
+                assertEquals(ledgerId, createdLid);
+                assertEquals(metadata, null);
+                removeLatch.countDown();
+            }
+        });
+
+        manager.removeLedgerMetadata( createdLid, Version.ANY,
+                new BookkeeperInternalCallbacks.GenericCallback<Void>() {
+
+            @Override
+            public void operationComplete(int rc, Void result) {
+                assertEquals(rc, BKException.Code.OK);
+            }
+        });
+        assertTrue(removeLatch.await(2000, TimeUnit.MILLISECONDS));
     }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index 19aab44..de352b5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -21,6 +21,7 @@
 
 package org.apache.bookkeeper.meta;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -63,23 +64,38 @@ public class GcLedgersTest extends LedgerManagerTestCase {
     /**
      * Create ledgers
      */
-    private void createLedgers(int numLedgers, final Set<Long> createdLedgers) {
+    private void createLedgers(int numLedgers, final Set<Long> createdLedgers) throws IOException {
         final AtomicInteger expected = new AtomicInteger(numLedgers);
         for (int i=0; i<numLedgers; i++) {
-            getLedgerManager().createLedger(new LedgerMetadata(1, 1, 1, DigestType.MAC, "".getBytes()),
-                new GenericCallback<Long>() {
+            getLedgerIdGenerator().generateLedgerId(new GenericCallback<Long>() {
                 @Override
-                public void operationComplete(int rc, Long ledgerId) {
-                    if (rc == BKException.Code.OK) {
-                        activeLedgers.put(ledgerId, true);
-                        createdLedgers.add(ledgerId);
-                    }
-                    synchronized (expected) {
-                        int num = expected.decrementAndGet();
-                        if (num == 0) {
-                            expected.notify();
+                public void operationComplete(int rc, final Long ledgerId) {
+                    if (BKException.Code.OK != rc) {
+                        synchronized (expected) {
+                            int num = expected.decrementAndGet();
+                            if (num == 0) {
+                                expected.notify();
+                            }
                         }
+                        return;
                     }
+
+                    getLedgerManager().createLedgerMetadata(ledgerId,
+                            new LedgerMetadata(1, 1, 1, DigestType.MAC, "".getBytes()), new GenericCallback<Void>() {
+                                @Override
+                                public void operationComplete(int rc, Void result) {
+                                    if (rc == BKException.Code.OK) {
+                                        activeLedgers.put(ledgerId, true);
+                                        createdLedgers.add(ledgerId);
+                                    }
+                                    synchronized (expected) {
+                                        int num = expected.decrementAndGet();
+                                        if (num == 0) {
+                                            expected.notify();
+                                        }
+                                    }
+                                }
+                            });
                 }
             });
         }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index b95d2db..dae61bc 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -21,20 +21,19 @@
 
 package org.apache.bookkeeper.meta;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.SnapshotMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Test case to run over serveral ledger managers
@@ -45,6 +44,7 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
 
     LedgerManagerFactory ledgerManagerFactory;
     LedgerManager ledgerManager = null;
+    LedgerIdGenerator ledgerIdGenerator = null;
     SnapshotMap<Long, Boolean> activeLedgers = null;
 
     public LedgerManagerTestCase(Class<? extends LedgerManagerFactory> lmFactoryCls) {
@@ -60,6 +60,13 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
         return ledgerManager;
     }
 
+    public LedgerIdGenerator getLedgerIdGenerator() throws IOException {
+        if (null == ledgerIdGenerator) {
+            ledgerIdGenerator = ledgerManagerFactory.newLedgerIdGenerator();
+        }
+        return ledgerIdGenerator;
+    }
+
     @Parameters
     public static Collection<Object[]> configs() {
         return Arrays.asList(new Object[][] {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/5662416d/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestZkLedgerIdGenerator.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestZkLedgerIdGenerator.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestZkLedgerIdGenerator.java
new file mode 100644
index 0000000..708fbc7
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestZkLedgerIdGenerator.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import junit.framework.TestCase;
+
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.test.ZooKeeperUtil;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestZkLedgerIdGenerator extends TestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(TestZkLedgerIdGenerator.class);
+
+    ZooKeeperUtil zkutil;
+    ZooKeeper zk;
+
+    LedgerIdGenerator ledgerIdGenerator;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        LOG.info("Setting up test");
+        super.setUp();
+
+        zkutil = new ZooKeeperUtil();
+        zkutil.startServer();
+        zk = zkutil.getZooKeeperClient();
+
+        ledgerIdGenerator = new ZkLedgerIdGenerator(zk,
+                "/test-zk-ledger-id-generator", "idgen");
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        LOG.info("Tearing down test");
+        ledgerIdGenerator.close();
+        zk.close();
+        zkutil.killServer();
+
+        super.tearDown();
+    }
+
+    @Test(timeout=60000)
+    public void testGenerateLedgerId() throws Exception {
+        // Create *nThread* threads each generate *nLedgers* ledger id,
+        // and then check there is no identical ledger id.
+        final int nThread = 2;
+        final int nLedgers = 2000;
+        final CountDownLatch countDownLatch = new CountDownLatch(nThread*nLedgers);
+
+        final AtomicInteger errCount = new AtomicInteger(0);
+        final ConcurrentLinkedQueue<Long> ledgerIds = new ConcurrentLinkedQueue<Long>();
+        final GenericCallback<Long> cb = new GenericCallback<Long>() {
+            @Override
+            public void operationComplete(int rc, Long result) {
+                if (Code.OK.intValue() == rc) {
+                    ledgerIds.add(result);
+                } else {
+                    errCount.incrementAndGet();
+                }
+                countDownLatch.countDown();
+            }
+        };
+
+        long start = System.currentTimeMillis();
+
+        for (int i = 0; i < nThread; i++) {
+            new Thread() {
+                @Override
+                public void run() {
+                    for (int j = 0; j < nLedgers; j++) {
+                        ledgerIdGenerator.generateLedgerId(cb);
+                    }
+                }
+            }.start();
+        }
+
+        assertTrue("Wait ledger id generation threads to stop timeout : ",
+                countDownLatch.await(30, TimeUnit.SECONDS));
+        LOG.info("Number of generated ledger id: {}, time used: {}", ledgerIds.size(),
+                System.currentTimeMillis() - start);
+        assertEquals("Error occur during ledger id generation : ", 0, errCount.get());
+
+        Set<Long> ledgers = new HashSet<Long>();
+        while (!ledgerIds.isEmpty()) {
+            Long ledger = ledgerIds.poll();
+            assertNotNull("Generated ledger id is null : ", ledger);
+            assertFalse("Ledger id [" + ledger + "] conflict : ", ledgers.contains(ledger));
+            ledgers.add(ledger);
+        }
+    }
+
+}