You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2015/11/13 08:12:27 UTC

bookkeeper git commit: BOOKKEEPER-438: Move ledger id generation out of LedgerManager (Sijie via mmerli)

Repository: bookkeeper
Updated Branches:
  refs/heads/branch-4.3 c985aa0b9 -> 698283126


BOOKKEEPER-438: Move ledger id generation out of LedgerManager (Sijie via mmerli)


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/69828312
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/69828312
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/69828312

Branch: refs/heads/branch-4.3
Commit: 698283126ca1e9854be28b063e91208fdf4e710b
Parents: c985aa0
Author: Matteo Merli <mm...@apache.org>
Authored: Thu Nov 12 22:53:19 2015 -0800
Committer: Matteo Merli <mm...@apache.org>
Committed: Thu Nov 12 22:53:19 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../apache/bookkeeper/client/BKException.java   |   9 ++
 .../apache/bookkeeper/client/BookKeeper.java    |   9 ++
 .../bookkeeper/client/LedgerCreateOp.java       |  41 ++++--
 .../bookkeeper/conf/AbstractConfiguration.java  |   2 +-
 .../meta/AbstractZkLedgerManager.java           |  28 ++++
 .../bookkeeper/meta/CleanupLedgerManager.java   |   6 +-
 .../bookkeeper/meta/FlatLedgerManager.java      |  37 ------
 .../meta/FlatLedgerManagerFactory.java          |   5 +
 .../meta/HierarchicalLedgerManager.java         |  95 +-------------
 .../meta/HierarchicalLedgerManagerFactory.java  |   9 +-
 .../apache/bookkeeper/meta/LedgerManager.java   |  43 +++---
 .../bookkeeper/meta/LedgerManagerFactory.java   |   8 ++
 .../bookkeeper/meta/MSLedgerManagerFactory.java | 130 +++++++------------
 .../bookkeeper/bookie/CompactionTest.java       |   2 +-
 .../client/TestWatchEnsembleChange.java         |  91 +++++++------
 .../apache/bookkeeper/meta/GcLedgersTest.java   |  40 ++++--
 .../bookkeeper/meta/LedgerManagerTestCase.java  |  15 ++-
 18 files changed, 274 insertions(+), 298 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 378ae55..8a5d2d3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -61,6 +61,8 @@ Release 4.3.1 - 2015-05-19
 
         BOOKKEEPER-810: Allow to configure TCP connect timeout (Charles Xie via sijie)
 
+        BOOKKEEPER-438: Move ledger id generation out of LedgerManager (Tong Yu via sijie)
+
       bookkeeper-server:
 
         BOOKKEEPER-833: EntryLogId and EntryLogLimit should not be larger than Integer.MAX_VALUE (sijie)

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
index c5be32d..b43506d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
@@ -124,6 +124,7 @@ public abstract class BKException extends Exception {
         int MetadataVersionException = -17;
         int MetaStoreException = -18;
         int ClientClosedException = -19;
+        int LedgerExistException = -20;
 
         int IllegalOpException = -100;
         int LedgerFencedException = -101;
@@ -170,6 +171,8 @@ public abstract class BKException extends Exception {
             return "Error while using ZooKeeper";
         case Code.MetaStoreException:
             return "Error while using MetaStore";
+        case Code.LedgerExistException:
+            return "Ledger existed";
         case Code.LedgerRecoveryException:
             return "Error while recovering ledger";
         case Code.LedgerClosedException:
@@ -301,6 +304,12 @@ public abstract class BKException extends Exception {
         }
     }
 
+    public static class BKLedgerExistException extends BKException {
+        public BKLedgerExistException() {
+            super(Code.LedgerExistException);
+        }
+    }
+
     public static class BKLedgerRecoveryException extends BKException {
         public BKLedgerRecoveryException() {
             super(Code.LedgerRecoveryException);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 49d8e59..9e2cd00 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -33,6 +33,7 @@ import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.AsyncCallback.IsClosedCallback;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.meta.CleanupLedgerManager;
+import org.apache.bookkeeper.meta.LedgerIdGenerator;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.proto.BookieClient;
@@ -98,6 +99,7 @@ public class BookKeeper {
     // Ledger manager responsible for how to store ledger meta data
     final LedgerManagerFactory ledgerManagerFactory;
     final LedgerManager ledgerManager;
+    final LedgerIdGenerator ledgerIdGenerator;
 
     // Ensemble Placement Policy
     final EnsemblePlacementPolicy placementPolicy;
@@ -221,6 +223,7 @@ public class BookKeeper {
 
         ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, zk);
         ledgerManager = new CleanupLedgerManager(ledgerManagerFactory.newLedgerManager());
+        ledgerIdGenerator = ledgerManagerFactory.newLedgerIdGenerator();
 
         ownChannelFactory = true;
         ownZKHandle = true;
@@ -307,6 +310,7 @@ public class BookKeeper {
 
         ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, zk);
         ledgerManager = new CleanupLedgerManager(ledgerManagerFactory.newLedgerManager());
+        ledgerIdGenerator = ledgerManagerFactory.newLedgerIdGenerator();
     }
 
     private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf)
@@ -335,6 +339,10 @@ public class BookKeeper {
         return ledgerManager;
     }
 
+    LedgerIdGenerator getLedgerIdGenerator() {
+        return ledgerIdGenerator;
+    }
+
     /**
      * There are 2 digest types that can be used for verification. The CRC32 is
      * cheap to compute but does not protect against byzantine bookies (i.e., a
@@ -811,6 +819,7 @@ public class BookKeeper {
             // Close ledger manage so all pending metadata requests would be failed
             // which will reject any incoming metadata requests.
             ledgerManager.close();
+            ledgerIdGenerator.close();
             ledgerManagerFactory.uninitialize();
         } catch (IOException ie) {
             LOG.error("Failed to close ledger manager : ", ie);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
index fe223af..5ed0901 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.meta.LedgerIdGenerator;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -38,13 +39,14 @@ import org.slf4j.LoggerFactory;
  * Encapsulates asynchronous ledger create operation
  *
  */
-class LedgerCreateOp implements GenericCallback<Long> {
+class LedgerCreateOp implements GenericCallback<Void> {
 
     static final Logger LOG = LoggerFactory.getLogger(LedgerCreateOp.class);
 
     CreateCallback cb;
     LedgerMetadata metadata;
     LedgerHandle lh;
+    Long ledgerId;
     Object ctx;
     byte[] passwd;
     BookKeeper bk;
@@ -59,12 +61,14 @@ class LedgerCreateOp implements GenericCallback<Long> {
      *       BookKeeper object
      * @param ensembleSize
      *       ensemble size
-     * @param quorumSize
-     *       quorum size
+     * @param writeQuorumSize
+     *       write quorum size
+     * @param ackQuorumSize
+     *       ack quorum size
      * @param digestType
      *       digest type, either MAC or CRC32
      * @param passwd
-     *       passowrd
+     *       password
      * @param cb
      *       callback implementation
      * @param ctx
@@ -110,16 +114,37 @@ class LedgerCreateOp implements GenericCallback<Long> {
          */
         metadata.addEnsemble(0L, ensemble);
 
-        // create a ledger with metadata
-        bk.getLedgerManager().createLedger(metadata, this);
+        createLedger();
+    }
+
+    void createLedger() {
+        // generate a ledger id and then create the ledger with metadata
+        final LedgerIdGenerator ledgerIdGenerator = bk.getLedgerIdGenerator();
+        ledgerIdGenerator.generateLedgerId(new GenericCallback<Long>() {
+            @Override
+            public void operationComplete(int rc, Long ledgerId) {
+                if (BKException.Code.OK != rc) {
+                    createComplete(rc, null);
+                    return;
+                }
+
+                LedgerCreateOp.this.ledgerId = ledgerId;
+                // create a ledger with metadata
+                bk.getLedgerManager().createLedgerMetadata(ledgerId, metadata, LedgerCreateOp.this);
+            }
+        });
     }
 
     /**
      * Callback when created ledger.
      */
     @Override
-    public void operationComplete(int rc, Long ledgerId) {
-        if (BKException.Code.OK != rc) {
+    public void operationComplete(int rc, Void result) {
+        if (BKException.Code.LedgerExistException == rc) {
+            // the generated ledger id is already in use; retry with a newly generated id
+            createLedger();
+            return;
+        } else if (BKException.Code.OK != rc) {
             createComplete(rc, null);
             return;
         }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index 3ec2b5a..2e66806 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -178,7 +178,7 @@ public abstract class AbstractConfiguration extends CompositeConfiguration {
     public String getZkAvailableBookiesPath() {
         return getZkLedgersRootPath() + "/" + AVAILABLE_NODE;
     }
-    
+
     /**
      * Set the max entries to keep in fragment for re-replication. If fragment
      * has more entries than this count, then the original fragment will be

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index f3f680d..6636506 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -42,11 +42,14 @@ import org.apache.bookkeeper.versioning.Version;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.AsyncCallback.DataCallback;
 import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
 import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -213,6 +216,31 @@ abstract class AbstractZkLedgerManager implements LedgerManager, Watcher {
         }
     }
 
+    @Override
+    public void createLedgerMetadata(final long ledgerId, final LedgerMetadata metadata,
+            final GenericCallback<Void> ledgerCb) {
+        String ledgerPath = getLedgerPath(ledgerId);
+        StringCallback scb = new StringCallback() {
+            @Override
+            public void processResult(int rc, String path, Object ctx, String name) {
+                if (rc == Code.OK.intValue()) {
+                    // update version
+                    metadata.setVersion(new ZkVersion(0));
+                    ledgerCb.operationComplete(BKException.Code.OK, null);
+                } else if (rc == Code.NODEEXISTS.intValue()) {
+                    LOG.warn("Failed to create ledger metadata for {} which already exist", ledgerId);
+                    ledgerCb.operationComplete(BKException.Code.LedgerExistException, null);
+                } else {
+                    LOG.error("Could not create node for ledger {}", ledgerId,
+                            KeeperException.create(Code.get(rc), path));
+                    ledgerCb.operationComplete(BKException.Code.ZKException, null);
+                }
+            }
+        };
+        ZkUtils.asyncCreateFullPathOptimistic(zk, ledgerPath, metadata.serialize(), Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT, scb, null);
+    }
+
     /**
      * Removes ledger metadata from ZooKeeper if version matches.
      *

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/CleanupLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/CleanupLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/CleanupLedgerManager.java
index a7fbcf5..961e0d1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/CleanupLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/CleanupLedgerManager.java
@@ -98,15 +98,15 @@ public class CleanupLedgerManager implements LedgerManager {
     }
 
     @Override
-    public void createLedger(LedgerMetadata metadata,
-                             GenericCallback<Long> cb) {
+    public void createLedgerMetadata(long lid, LedgerMetadata metadata,
+                                     GenericCallback<Void> cb) {
         closeLock.readLock().lock();
         try {
             if (closed) {
                 cb.operationComplete(BKException.Code.ClientClosedException, null);
                 return;
             }
-            underlying.createLedger(metadata, new CleanupGenericCallback<Long>(cb));
+            underlying.createLedgerMetadata(lid, metadata, new CleanupGenericCallback<Void>(cb));
         } finally {
             closeLock.readLock().unlock();
         }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
index 2bc4258..6bd3216 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
@@ -22,19 +22,11 @@ import java.io.IOException;
 import java.util.NoSuchElementException;
 import java.util.Set;
 
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.util.StringUtils;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.AsyncCallback.StringCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,8 +52,6 @@ class FlatLedgerManager extends AbstractZkLedgerManager {
      *          Configuration object
      * @param zk
      *          ZooKeeper Client Handle
-     * @param ledgerRootPath
-     *          ZooKeeper Path to store ledger metadata
      * @throws IOException when version is not compatible
      */
     public FlatLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
@@ -71,33 +61,6 @@ class FlatLedgerManager extends AbstractZkLedgerManager {
     }
 
     @Override
-    public void createLedger(final LedgerMetadata metadata, final GenericCallback<Long> cb) {
-        StringCallback scb = new StringCallback() {
-            @Override
-            public void processResult(int rc, String path, Object ctx,
-                    String name) {
-                if (Code.OK.intValue() != rc) {
-                    LOG.error("Could not create node for ledger",
-                              KeeperException.create(KeeperException.Code.get(rc), path));
-                    cb.operationComplete(BKException.Code.ZKException, null);
-                } else {
-                    // update znode status
-                    metadata.setVersion(new ZkVersion(0));
-                    try {
-                        long ledgerId = getLedgerId(name);
-                        cb.operationComplete(BKException.Code.OK, ledgerId);
-                    } catch (IOException ie) {
-                        LOG.error("Could not extract ledger-id from path:" + name, ie);
-                        cb.operationComplete(BKException.Code.ZKException, null);
-                    }
-                }
-            }
-        };
-        ZkUtils.asyncCreateFullPathOptimistic(zk, ledgerPrefix, metadata.serialize(),
-            Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL, scb, null);
-    }
-
-    @Override
     public String getLedgerPath(long ledgerId) {
         StringBuilder sb = new StringBuilder();
         sb.append(ledgerPrefix)

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
index db16d26..46f8b9b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
@@ -64,6 +64,11 @@ public class FlatLedgerManagerFactory extends LedgerManagerFactory {
     }
 
     @Override
+    public LedgerIdGenerator newLedgerIdGenerator() {
+        return new ZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), null);
+    }
+
+    @Override
     public LedgerManager newLedgerManager() {
         return new FlatLedgerManager(conf, zk);
     }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
index 7f2df73..bc62af4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
@@ -23,23 +23,16 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.NoSuchElementException;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.util.StringUtils;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.AsyncCallback.StringCallback;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,10 +41,7 @@ import org.slf4j.LoggerFactory;
  * Hierarchical Ledger Manager which manages ledger meta in zookeeper using 2-level hierarchical znodes.
  *
  * <p>
- * Hierarchical Ledger Manager first obtain a global unique id from zookeeper using a EPHEMERAL_SEQUENTIAL
- * znode <i>(ledgersRootPath)/ledgers/idgen/ID-</i>.
- * Since zookeeper sequential counter has a format of %10d -- that is 10 digits with 0 (zero) padding, i.e.
- * "&lt;path&gt;0000000001", HierarchicalLedgerManager splits the generated id into 3 parts (2-4-4):
+ * HierarchicalLedgerManager splits the generated id into 3 parts (2-4-4):
  * <pre>&lt;level1 (2 digits)&gt;&lt;level2 (4 digits)&gt;&lt;level3 (4 digits)&gt;</pre>
  * These 3 parts are used to form the actual ledger node path used to store ledger metadata:
  * <pre>(ledgersRootPath)/level1/level2/L(level3)</pre>
@@ -64,13 +54,9 @@ class HierarchicalLedgerManager extends AbstractZkLedgerManager {
     static final Logger LOG = LoggerFactory.getLogger(HierarchicalLedgerManager.class);
 
     static final String IDGEN_ZNODE = "idgen";
-    static final String IDGENERATION_PREFIX = "/" + IDGEN_ZNODE + "/ID-";
     private static final String MAX_ID_SUFFIX = "9999";
     private static final String MIN_ID_SUFFIX = "0000";
 
-    // Path to generate global id
-    private final String idGenPath;
-
     /**
      * Constructor
      *
@@ -81,83 +67,6 @@ class HierarchicalLedgerManager extends AbstractZkLedgerManager {
      */
     public HierarchicalLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
         super(conf, zk);
-
-        this.idGenPath = ledgerRootPath + IDGENERATION_PREFIX;
-    }
-
-    @Override
-    public void createLedger(final LedgerMetadata metadata, final GenericCallback<Long> ledgerCb) {
-        ZkUtils.asyncCreateFullPathOptimistic(zk, idGenPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
-            CreateMode.EPHEMERAL_SEQUENTIAL, new StringCallback() {
-            @Override
-            public void processResult(int rc, String path, Object ctx, final String idPathName) {
-                if (rc != KeeperException.Code.OK.intValue()) {
-                    LOG.error("Could not generate new ledger id",
-                              KeeperException.create(KeeperException.Code.get(rc), path));
-                    ledgerCb.operationComplete(BKException.Code.ZKException, null);
-                    return;
-                }
-                /*
-                 * Extract ledger id from gen path
-                 */
-                long ledgerId;
-                try {
-                    ledgerId = getLedgerIdFromGenPath(idPathName);
-                } catch (IOException e) {
-                    LOG.error("Could not extract ledger-id from id gen path:" + path, e);
-                    ledgerCb.operationComplete(BKException.Code.ZKException, null);
-                    return;
-                }
-                String ledgerPath = getLedgerPath(ledgerId);
-                final long lid = ledgerId;
-                StringCallback scb = new StringCallback() {
-                    @Override
-                    public void processResult(int rc, String path,
-                            Object ctx, String name) {
-                        if (rc != KeeperException.Code.OK.intValue()) {
-                            LOG.error("Could not create node for ledger",
-                                      KeeperException.create(KeeperException.Code.get(rc), path));
-                            ledgerCb.operationComplete(BKException.Code.ZKException, null);
-                        } else {
-                            // update version
-                            metadata.setVersion(new ZkVersion(0));
-                            ledgerCb.operationComplete(BKException.Code.OK, lid);
-                        }
-                    }
-                };
-                ZkUtils.asyncCreateFullPathOptimistic(zk, ledgerPath, metadata.serialize(),
-                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, scb, null);
-                // delete the znode for id generation
-                scheduler.submit(new Runnable() {
-                    @Override
-                    public void run() {
-                        zk.delete(idPathName, -1, new AsyncCallback.VoidCallback() {
-                            @Override
-                            public void processResult(int rc, String path, Object ctx) {
-                                if (rc != KeeperException.Code.OK.intValue()) {
-                                    LOG.warn("Exception during deleting znode for id generation : ",
-                                             KeeperException.create(KeeperException.Code.get(rc), path));
-                                } else {
-                                    LOG.debug("Deleting znode for id generation : {}", idPathName);
-                                }
-                            }
-                        }, null);
-                    }
-                });
-            }
-        }, null);
-    }
-
-    // get ledger id from generation path
-    private long getLedgerIdFromGenPath(String nodeName) throws IOException {
-        long ledgerId;
-        try {
-            String parts[] = nodeName.split(IDGENERATION_PREFIX);
-            ledgerId = Long.parseLong(parts[parts.length - 1]);
-        } catch (NumberFormatException e) {
-            throw new IOException(e);
-        }
-        return ledgerId;
     }
 
     @Override
@@ -304,7 +213,7 @@ class HierarchicalLedgerManager extends AbstractZkLedgerManager {
          *          Callback to process element of list when success
          * @param finalCb
          *          Final callback to be called after all elements in the list are processed
-         * @param contxt
+         * @param context
          *          Context of final callback
          * @param successRc
          *          RC passed to final callback on success

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
index b843e99..a165b0d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
@@ -64,6 +64,11 @@ public class HierarchicalLedgerManagerFactory extends LedgerManagerFactory {
     }
 
     @Override
+    public LedgerIdGenerator newLedgerIdGenerator() {
+        return new ZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), HierarchicalLedgerManager.IDGEN_ZNODE);
+    }
+
+    @Override
     public LedgerManager newLedgerManager() {
         return new HierarchicalLedgerManager(conf, zk);
     }
@@ -81,8 +86,7 @@ public class HierarchicalLedgerManagerFactory extends LedgerManagerFactory {
         String ledgersRootPath = conf.getZkLedgersRootPath();
         List<String> children = zk.getChildren(ledgersRootPath, false);
         for (String child : children) {
-            if (!HierarchicalLedgerManager.IDGEN_ZNODE.equals(child)
-                    && ledgerManager.isSpecialZnode(child)) {
+            if (ledgerManager.isSpecialZnode(child)) {
                 continue;
             }
             ZKUtil.deleteRecursive(zk, ledgersRootPath + "/" + child);
@@ -90,4 +94,5 @@ public class HierarchicalLedgerManagerFactory extends LedgerManagerFactory {
         // Delete and recreate the LAYOUT information.
         super.format(conf, zk);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
index 7229028..fe3c2cf 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
@@ -42,16 +42,21 @@ import org.apache.bookkeeper.versioning.Version;
 public interface LedgerManager extends Closeable {
 
     /**
-     * Create a new ledger with provided metadata
+     * Create a new ledger with provided ledger id and metadata
      *
+     * @param ledgerId
+     *            Id of the ledger to be created
      * @param metadata
-     *        Metadata provided when creating a new ledger
+     *            Metadata provided when creating the new ledger
      * @param cb
-     *        Callback when creating a new ledger.
-     *        {@link BKException.Code.ZKException} return code when can't generate
-     *        or extract new ledger id
+     *            Callback invoked when the create operation completes. Return code:<ul>
+     *            <li>{@link BKException.Code.OK} on success</li>
+     *            <li>{@link BKException.Code.LedgerExistException} if the given ledger id already exists</li>
+     *            <li>{@link BKException.Code.ZKException}/{@link BKException.Code.MetaStoreException}
+     *                 for other issues</li>
+     *            </ul>
      */
-    public void createLedger(LedgerMetadata metadata, GenericCallback<Long> cb);
+    public void createLedgerMetadata(long ledgerId, LedgerMetadata metadata, GenericCallback<Void> cb);
 
     /**
      * Remove a specified ledger metadata by ledgerId and version.
@@ -61,10 +66,12 @@ public interface LedgerManager extends Closeable {
      * @param version
      *          Ledger metadata version
      * @param cb
-     *          Callback when removed ledger metadata.
-     *          {@link BKException.Code.MetadataVersionException} return code when version doesn't match,
-     *          {@link BKException.Code.NoSuchLedgerExistsException} return code when ledger doesn't exist,
-     *          {@link BKException.Code.ZKException} return code when other issues happen.
+     *          Callback invoked when the removal of ledger metadata completes. Return code:<ul>
+     *          <li>{@link BKException.Code.OK} on success</li>
+     *          <li>{@link BKException.Code.MetadataVersionException} if the version doesn't match</li>
+     *          <li>{@link BKException.Code.NoSuchLedgerExistsException} if the ledger doesn't exist</li>
+     *          <li>{@link BKException.Code.ZKException} for other issues</li>
+     *          </ul>
      */
     public void removeLedgerMetadata(long ledgerId, Version version, GenericCallback<Void> vb);
 
@@ -74,9 +81,11 @@ public interface LedgerManager extends Closeable {
      * @param ledgerId
      *          Ledger Id
      * @param readCb
-     *          Callback when read ledger metadata.
-     *          {@link BKException.Code.NoSuchLedgerExistsException} return code when ledger doesn't exist,
-     *          {@link BKException.Code.ZKException} return code when other issues happen.
+     *          Callback invoked when the read of ledger metadata completes. Return code:<ul>
+     *          <li>{@link BKException.Code.OK} on success</li>
+     *          <li>{@link BKException.Code.NoSuchLedgerExistsException} if the ledger doesn't exist</li>
+     *          <li>{@link BKException.Code.ZKException} for other issues</li>
+     *          </ul>
      */
     public void readLedgerMetadata(long ledgerId, GenericCallback<LedgerMetadata> readCb);
 
@@ -88,9 +97,11 @@ public interface LedgerManager extends Closeable {
      * @param metadata
      *          Ledger Metadata to write
      * @param cb
-     *          Callback when finished writing ledger metadata.
-     *          {@link BKException.Code.MetadataVersionException} return code when version doesn't match,
-     *          {@link BKException.Code.ZKException} return code when other issues happen.
+     *          Callback invoked when the write of ledger metadata completes. Return code:<ul>
+     *          <li>{@link BKException.Code.OK} on success</li>
+     *          <li>{@link BKException.Code.MetadataVersionException} if the version in the metadata doesn't match</li>
+     *          <li>{@link BKException.Code.ZKException} for other issues</li>
+     *          </ul>
      */
     public void writeLedgerMetadata(long ledgerId, LedgerMetadata metadata, GenericCallback<Void> cb);
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
index 7c3cf5c..3a53623 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
@@ -68,6 +68,14 @@ public abstract class LedgerManagerFactory {
     public abstract void uninitialize() throws IOException;
 
     /**
+     * Return the ledger id generator, which is used for globally unique ledger id
+     * generation.
+     *
+     * @return ledger id generator.
+     */
+    public abstract LedgerIdGenerator newLedgerIdGenerator();
+
+    /**
      * return ledger manager for client-side to manage ledger metadata.
      *
      * @return ledger manager

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
index 2510b89..9f7ef38 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
@@ -36,7 +36,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
-import org.apache.bookkeeper.meta.AbstractZkLedgerManager.ReadLedgerMetadataTask;
 import org.apache.bookkeeper.metastore.MSException;
 import org.apache.bookkeeper.metastore.MSWatchedEvent;
 import org.apache.bookkeeper.metastore.MetaStore;
@@ -46,23 +45,20 @@ import org.apache.bookkeeper.metastore.MetastoreCursor.ReadEntriesCallback;
 import org.apache.bookkeeper.metastore.MetastoreException;
 import org.apache.bookkeeper.metastore.MetastoreFactory;
 import org.apache.bookkeeper.metastore.MetastoreScannableTable;
+import org.apache.bookkeeper.metastore.MetastoreTable;
 import org.apache.bookkeeper.metastore.MetastoreTableItem;
+import org.apache.bookkeeper.metastore.MetastoreUtils;
 import org.apache.bookkeeper.metastore.MetastoreWatcher;
-import org.apache.bookkeeper.metastore.MSWatchedEvent.EventType;
 import org.apache.bookkeeper.metastore.Value;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.replication.ReplicationException;
 import org.apache.bookkeeper.util.StringUtils;
-import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.AsyncCallback.StringCallback;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -180,6 +176,11 @@ public class MSLedgerManagerFactory extends LedgerManagerFactory {
         }
     }
 
+    @Override
+    public LedgerIdGenerator newLedgerIdGenerator() {
+        return new ZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), MsLedgerManager.IDGEN_ZNODE);
+    }
+
     static class MsLedgerManager implements LedgerManager, MetastoreWatcher {
         final ZooKeeper zk;
         final AbstractConfiguration conf;
@@ -189,15 +190,11 @@ public class MSLedgerManagerFactory extends LedgerManagerFactory {
         final int maxEntriesPerScan;
 
         static final String IDGEN_ZNODE = "ms-idgen";
-        static final String IDGENERATION_PREFIX = "/" + IDGEN_ZNODE + "/ID-";
 
         // ledger metadata listeners
         protected final ConcurrentMap<Long, Set<LedgerMetadataListener>> listeners =
                 new ConcurrentHashMap<Long, Set<LedgerMetadataListener>>();
 
-        // Path to generate global id
-        private final String idGenPath;
-
         // we use this to prevent long stack chains from building up in
         // callbacks
         ScheduledExecutorService scheduler;
@@ -266,7 +263,6 @@ public class MSLedgerManagerFactory extends LedgerManagerFactory {
             // configuration settings
             maxEntriesPerScan = conf.getMetastoreMaxEntriesPerScan();
 
-            this.idGenPath = conf.getZkLedgersRootPath() + IDGENERATION_PREFIX;
             ThreadFactoryBuilder tfb = new ThreadFactoryBuilder()
                     .setNameFormat("MSLedgerManagerScheduler-%d");
             this.scheduler = Executors.newSingleThreadScheduledExecutor(tfb
@@ -346,76 +342,28 @@ public class MSLedgerManagerFactory extends LedgerManagerFactory {
         }
 
         @Override
-        public void createLedger(final LedgerMetadata metadata, final GenericCallback<Long> ledgerCb) {
-            ZkUtils.asyncCreateFullPathOptimistic(zk, idGenPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
-                    CreateMode.EPHEMERAL_SEQUENTIAL, new StringCallback() {
-                        @Override
-                        public void processResult(int rc, String path, Object ctx, final String idPathName) {
-                            if (rc != KeeperException.Code.OK.intValue()) {
-                                LOG.error("Could not generate new ledger id",
-                                        KeeperException.create(KeeperException.Code.get(rc), path));
-                                ledgerCb.operationComplete(BKException.Code.ZKException, null);
-                                return;
-                            }
-                            /*
-                             * Extract ledger id from gen path
-                             */
-                            long ledgerId;
-                            try {
-                                ledgerId = getLedgerIdFromGenPath(idPathName);
-                            } catch (IOException e) {
-                                LOG.error("Could not extract ledger-id from id gen path:" + path, e);
-                                ledgerCb.operationComplete(BKException.Code.ZKException, null);
-                                return;
-                            }
-
-                            final long lid = ledgerId;
-                            MetastoreCallback<Version> msCallback = new MetastoreCallback<Version>() {
-                                @Override
-                                public void complete(int rc, Version version, Object ctx) {
-                                    if (MSException.Code.BadVersion.getCode() == rc) {
-                                        ledgerCb.operationComplete(BKException.Code.MetadataVersionException, null);
-                                        return;
-                                    }
-                                    if (MSException.Code.OK.getCode() != rc) {
-                                        ledgerCb.operationComplete(BKException.Code.MetaStoreException, null);
-                                        return;
-                                    }
-                                    LOG.debug("Create ledger {} with version {} successfuly.", new Object[] { lid,
-                                            version });
-                                    // update version
-                                    metadata.setVersion(version);
-                                    ledgerCb.operationComplete(BKException.Code.OK, lid);
-                                }
-                            };
-
-                            ledgerTable.put(ledgerId2Key(lid), new Value().setField(META_FIELD, metadata.serialize()),
-                                    Version.NEW, msCallback, null);
-                            zk.delete(idPathName, -1, new AsyncCallback.VoidCallback() {
-                                @Override
-                                public void processResult(int rc, String path, Object ctx) {
-                                    if (rc != KeeperException.Code.OK.intValue()) {
-                                        LOG.warn("Exception during deleting znode for id generation : ",
-                                                KeeperException.create(KeeperException.Code.get(rc), path));
-                                    } else {
-                                        LOG.debug("Deleting znode for id generation : {}", idPathName);
-                                    }
-                                }
-                            }, null);
-                        }
-                    }, null);
-        }
+        public void createLedgerMetadata(final long lid, final LedgerMetadata metadata,
+                                         final GenericCallback<Void> ledgerCb) {
+            MetastoreCallback<Version> msCallback = new MetastoreCallback<Version>() {
+                @Override
+                public void complete(int rc, Version version, Object ctx) {
+                    if (MSException.Code.BadVersion.getCode() == rc) {
+                        ledgerCb.operationComplete(BKException.Code.MetadataVersionException, null);
+                        return;
+                    }
+                    if (MSException.Code.OK.getCode() != rc) {
+                        ledgerCb.operationComplete(BKException.Code.MetaStoreException, null);
+                        return;
+                    }
+                    LOG.debug("Create ledger {} with version {} successfully.", lid, version);
+                    // update version
+                    metadata.setVersion(version);
+                    ledgerCb.operationComplete(BKException.Code.OK, null);
+                }
+            };
 
-        // get ledger id from generation path
-        private long getLedgerIdFromGenPath(String nodeName) throws IOException {
-            long ledgerId;
-            try {
-                String parts[] = nodeName.split(IDGENERATION_PREFIX);
-                ledgerId = Long.parseLong(parts[parts.length - 1]);
-            } catch (NumberFormatException e) {
-                throw new IOException(e);
-            }
-            return ledgerId;
+            ledgerTable.put(ledgerId2Key(lid), new Value().setField(META_FIELD, metadata.serialize()),
+                    Version.NEW, msCallback, null);
         }
 
         @Override
@@ -688,7 +636,7 @@ public class MSLedgerManagerFactory extends LedgerManagerFactory {
          * @param finalCb
          *            Final callback to be called after all elements in the list
          *            are processed
-         * @param contxt
+         * @param context
          *            Context of final callback
          * @param successRc
          *            RC passed to final callback on success
@@ -730,4 +678,24 @@ public class MSLedgerManagerFactory extends LedgerManagerFactory {
         }
     }
 
+    @Override
+    public void format(AbstractConfiguration conf, ZooKeeper zk) throws InterruptedException,
+            KeeperException, IOException {
+        MetastoreTable ledgerTable;
+        try {
+            ledgerTable = metastore.createScannableTable(TABLE_NAME);
+        } catch (MetastoreException mse) {
+            throw new IOException("Failed to instantiate table " + TABLE_NAME + " in metastore "
+                    + metastore.getName());
+        }
+        try {
+            MetastoreUtils.cleanTable(ledgerTable, conf.getMetastoreMaxEntriesPerScan());
+        } catch (MSException mse) {
+            throw new IOException("Exception when cleanning up table " + TABLE_NAME, mse);
+        }
+        LOG.info("Finished cleaning up table {}.", TABLE_NAME);
+        // Delete and recreate the LAYOUT information.
+        super.format(conf, zk);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 956595e..c37e084 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -451,7 +451,7 @@ public class CompactionTest extends BookKeeperClusterTestCase {
     private LedgerManager getLedgerManager(final Set<Long> ledgers) {
         LedgerManager manager = new LedgerManager() {
                 @Override
-                public void createLedger(LedgerMetadata metadata, GenericCallback<Long> cb) {
+                public void createLedgerMetadata(long lid, LedgerMetadata metadata, GenericCallback<Void> cb) {
                     unsupported();
                 }
                 @Override

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
index eb833a3..df74339 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
@@ -23,11 +23,13 @@ package org.apache.bookkeeper.client;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.meta.FlatLedgerManagerFactory;
 import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory;
+import org.apache.bookkeeper.meta.LedgerIdGenerator;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.MSLedgerManagerFactory;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.ReflectionUtils;
@@ -104,45 +106,54 @@ public class TestWatchEnsembleChange extends BookKeeperClusterTestCase {
 
     @Test(timeout = 60000)
     public void testWatchMetadataRemoval() throws Exception {
-       LedgerManagerFactory factory = ReflectionUtils.newInstance(lmFactoryCls);
-       factory.initialize(baseConf, super.zkc, factory.getCurrentVersion());
-       LedgerManager manager = factory.newLedgerManager();
-       final ByteBuffer bbLedgerId = ByteBuffer.allocate(8);
-       final CountDownLatch createLatch = new CountDownLatch(1);
-       final CountDownLatch removeLatch = new CountDownLatch(1);
-
-       manager.createLedger( new LedgerMetadata(4, 2, 2, digestType, "fpj was here".getBytes()),
-                new BookkeeperInternalCallbacks.GenericCallback<Long>(){
-
-           @Override
-           public void operationComplete(int rc, Long result) {
-               bbLedgerId.putLong(result);
-               bbLedgerId.flip();
-               createLatch.countDown();
-           }
-       });
-       assertTrue(createLatch.await(2000, TimeUnit.MILLISECONDS));
-       final long createdLid = bbLedgerId.getLong();
-
-       manager.registerLedgerMetadataListener( createdLid,
-               new LedgerMetadataListener() {
-
-           @Override
-           public void onChanged( long ledgerId, LedgerMetadata metadata ) {
-               assertEquals(ledgerId, createdLid);
-               assertEquals(metadata, null);
-               removeLatch.countDown();
-           }
-       });
-
-       manager.removeLedgerMetadata( createdLid, Version.ANY,
-               new BookkeeperInternalCallbacks.GenericCallback<Void>() {
-
-           @Override
-           public void operationComplete(int rc, Void result) {
-               assertEquals(rc, BKException.Code.OK);
-           }
-       });
-       assertTrue(removeLatch.await(2000, TimeUnit.MILLISECONDS));
+        LedgerManagerFactory factory = ReflectionUtils.newInstance(lmFactoryCls);
+        factory.initialize(baseConf, super.zkc, factory.getCurrentVersion());
+        final LedgerManager manager = factory.newLedgerManager();
+        LedgerIdGenerator idGenerator = factory.newLedgerIdGenerator();
+
+        final ByteBuffer bbLedgerId = ByteBuffer.allocate(8);
+        final CountDownLatch createLatch = new CountDownLatch(1);
+        final CountDownLatch removeLatch = new CountDownLatch(1);
+
+        idGenerator.generateLedgerId(new GenericCallback<Long>() {
+            @Override
+            public void operationComplete(int rc, final Long lid) {
+                manager.createLedgerMetadata(lid, new LedgerMetadata(4, 2, 2, digestType, "fpj was here".getBytes()),
+                         new BookkeeperInternalCallbacks.GenericCallback<Void>(){
+
+                    @Override
+                    public void operationComplete(int rc, Void result) {
+                        bbLedgerId.putLong(lid);
+                        bbLedgerId.flip();
+                        createLatch.countDown();
+                    }
+                });
+
+            }
+        });
+
+        assertTrue(createLatch.await(2000, TimeUnit.MILLISECONDS));
+        final long createdLid = bbLedgerId.getLong();
+
+        manager.registerLedgerMetadataListener( createdLid,
+                new LedgerMetadataListener() {
+
+            @Override
+            public void onChanged( long ledgerId, LedgerMetadata metadata ) {
+                assertEquals(ledgerId, createdLid);
+                assertEquals(metadata, null);
+                removeLatch.countDown();
+            }
+        });
+
+        manager.removeLedgerMetadata( createdLid, Version.ANY,
+                new BookkeeperInternalCallbacks.GenericCallback<Void>() {
+
+            @Override
+            public void operationComplete(int rc, Void result) {
+                assertEquals(rc, BKException.Code.OK);
+            }
+        });
+        assertTrue(removeLatch.await(2000, TimeUnit.MILLISECONDS));
     }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index 19aab44..de352b5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -21,6 +21,7 @@
 
 package org.apache.bookkeeper.meta;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -63,23 +64,38 @@ public class GcLedgersTest extends LedgerManagerTestCase {
     /**
      * Create ledgers
      */
-    private void createLedgers(int numLedgers, final Set<Long> createdLedgers) {
+    private void createLedgers(int numLedgers, final Set<Long> createdLedgers) throws IOException {
         final AtomicInteger expected = new AtomicInteger(numLedgers);
         for (int i=0; i<numLedgers; i++) {
-            getLedgerManager().createLedger(new LedgerMetadata(1, 1, 1, DigestType.MAC, "".getBytes()),
-                new GenericCallback<Long>() {
+            getLedgerIdGenerator().generateLedgerId(new GenericCallback<Long>() {
                 @Override
-                public void operationComplete(int rc, Long ledgerId) {
-                    if (rc == BKException.Code.OK) {
-                        activeLedgers.put(ledgerId, true);
-                        createdLedgers.add(ledgerId);
-                    }
-                    synchronized (expected) {
-                        int num = expected.decrementAndGet();
-                        if (num == 0) {
-                            expected.notify();
+                public void operationComplete(int rc, final Long ledgerId) {
+                    if (BKException.Code.OK != rc) {
+                        synchronized (expected) {
+                            int num = expected.decrementAndGet();
+                            if (num == 0) {
+                                expected.notify();
+                            }
                         }
+                        return;
                     }
+
+                    getLedgerManager().createLedgerMetadata(ledgerId,
+                            new LedgerMetadata(1, 1, 1, DigestType.MAC, "".getBytes()), new GenericCallback<Void>() {
+                                @Override
+                                public void operationComplete(int rc, Void result) {
+                                    if (rc == BKException.Code.OK) {
+                                        activeLedgers.put(ledgerId, true);
+                                        createdLedgers.add(ledgerId);
+                                    }
+                                    synchronized (expected) {
+                                        int num = expected.decrementAndGet();
+                                        if (num == 0) {
+                                            expected.notify();
+                                        }
+                                    }
+                                }
+                            });
                 }
             });
         }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/69828312/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index b95d2db..dae61bc 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -21,20 +21,19 @@
 
 package org.apache.bookkeeper.meta;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.SnapshotMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Test case to run over serveral ledger managers
@@ -45,6 +44,7 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
 
     LedgerManagerFactory ledgerManagerFactory;
     LedgerManager ledgerManager = null;
+    LedgerIdGenerator ledgerIdGenerator = null;
     SnapshotMap<Long, Boolean> activeLedgers = null;
 
     public LedgerManagerTestCase(Class<? extends LedgerManagerFactory> lmFactoryCls) {
@@ -60,6 +60,13 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
         return ledgerManager;
     }
 
+    public LedgerIdGenerator getLedgerIdGenerator() throws IOException {
+        if (null == ledgerIdGenerator) {
+            ledgerIdGenerator = ledgerManagerFactory.newLedgerIdGenerator();
+        }
+        return ledgerIdGenerator;
+    }
+
     @Parameters
     public static Collection<Object[]> configs() {
         return Arrays.asList(new Object[][] {