You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2017/05/02 21:05:08 UTC

[2/2] bookkeeper git commit: BOOKKEEPER-552: 64 Bits Ledger ID Generation

BOOKKEEPER-552: 64 Bits Ledger ID Generation

This PR supersedes #112

Instead of renaming LongHierarchicalLedgerManager to HierarchicalLedgerManager, LongHierarchicalLedgerManager remains a stand-alone manager. The original HierarchicalLedgerManager has been renamed to LegacyHierarchicalLedgerManager, and a new HierarchicalLedgerManager class, which is backwards-compatible with the original (now legacy) HierarchicalLedgerManager, has been put in its place.

This new HierarchicalLedgerManager leverages the new LongZkLedgerIdGenerator to generate IDs, and uses the LongHierarchicalLedgerManager to manage metadata for ledger IDs longer than 31 bits. For shorter ledger IDs, the LegacyHierarchicalLedgerManager is used, to keep backwards-compatibility.

Author: Kyle Nusbaum <kn...@yahoo-inc.com>

Reviewers: Enrico Olivelli <eo...@gmail.com>, Matteo Merli <mm...@apache.org>, Venkateswararao Jujjuri (JV) <None>

Closes #114 from knusbaum/BOOKKEEPER-552


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/057af8db
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/057af8db
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/057af8db

Branch: refs/heads/master
Commit: 057af8dbce6c08794eb8b46ca52ca13f222d9bbb
Parents: 9c79e07
Author: Kyle Nusbaum <kn...@yahoo-inc.com>
Authored: Tue May 2 14:04:57 2017 -0700
Committer: Matteo Merli <mm...@apache.org>
Committed: Tue May 2 14:04:57 2017 -0700

----------------------------------------------------------------------
 .../apache/bookkeeper/client/BKException.java   |  12 +
 .../bookkeeper/conf/AbstractConfiguration.java  |   3 +
 .../meta/AbstractHierarchicalLedgerManager.java | 213 +++++++++++
 .../meta/AbstractZkLedgerManager.java           |   3 +-
 .../bookkeeper/meta/FlatLedgerManager.java      |   6 +-
 .../meta/HierarchicalLedgerManager.java         | 375 +++----------------
 .../meta/HierarchicalLedgerManagerFactory.java  |  80 +---
 .../bookkeeper/meta/LedgerManagerFactory.java   |   5 +-
 .../meta/LegacyHierarchicalLedgerManager.java   | 281 ++++++++++++++
 .../LegacyHierarchicalLedgerManagerFactory.java | 100 +++++
 .../meta/LongHierarchicalLedgerManager.java     | 101 +++--
 .../meta/LongZkLedgerIdGenerator.java           | 333 ++++++++++++++++
 .../bookkeeper/meta/ZkLedgerIdGenerator.java    |  27 +-
 .../org/apache/bookkeeper/util/StringUtils.java |  11 +-
 .../bookkeeper/client/BookieRecoveryTest.java   |   6 +-
 .../meta/TestLongZkLedgerIdGenerator.java       | 145 +++++++
 .../MultiLedgerManagerMultiDigestTestCase.java  |   1 +
 .../test/MultiLedgerManagerTestCase.java        |   1 +
 .../bookkeeper/test/TestBackwardCompat.java     |  53 +++
 19 files changed, 1310 insertions(+), 446 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
index 2377c1c..aa3ec08 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
@@ -102,6 +102,8 @@ public abstract class BKException extends Exception {
             return new BKDuplicateEntryIdException();
         case Code.TimeoutException:
             return new BKTimeoutException();
+        case Code.LedgerIdOverflowException:
+            return new BKLedgerIdOverflowException();
         default:
             return new BKUnexpectedConditionException();
         }
@@ -142,6 +144,8 @@ public abstract class BKException extends Exception {
         int UnauthorizedAccessException = -102;
         int UnclosedFragmentException = -103;
         int WriteOnReadOnlyBookieException = -104;
+        //-105 reserved for TooManyRequestsException
+        int LedgerIdOverflowException = -106;
 
         // generic exception code used to propagate in replication pipeline
         int ReplicationException = -200;
@@ -210,6 +214,8 @@ public abstract class BKException extends Exception {
             return "Attempting to use an unclosed fragment; This is not safe";
         case Code.WriteOnReadOnlyBookieException:
             return "Attempting to write on ReadOnly bookie";
+        case Code.LedgerIdOverflowException:
+            return "Next ledgerID is too large.";
         case Code.ReplicationException:
             return "Errors in replication pipeline";
         case Code.ClientClosedException:
@@ -404,4 +410,10 @@ public abstract class BKException extends Exception {
             super(Code.TimeoutException);
         }
     }
+
+    public static class BKLedgerIdOverflowException extends BKException {
+        public BKLedgerIdOverflowException() {
+            super(Code.LedgerIdOverflowException);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index 07e5d08..c7c50cd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -76,6 +76,9 @@ public abstract class AbstractConfiguration extends CompositeConfiguration {
     // Zookeeper ACL settings
     protected final static String ZK_ENABLE_SECURITY = "zkEnableSecurity";
 
+    // Kluge for compatibility testing. Never set this outside tests.
+    public final static String LEDGER_MANAGER_FACTORY_DISABLE_CLASS_CHECK = "ledgerManagerFactoryDisableClassCheck";
+
     protected AbstractConfiguration() {
         super();
         if (READ_SYSTEM_PROPERTIES) {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
new file mode 100644
index 0000000..02359e0
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.bookkeeper.util.StringUtils;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.KeeperException.Code;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractHierarchicalLedgerManager extends AbstractZkLedgerManager {
+    
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractHierarchicalLedgerManager.class);
+
+    /**
+     * Constructor
+     *
+     * @param conf
+     *          Configuration object
+     * @param zk
+     *          ZooKeeper Client Handle
+     */
+    public AbstractHierarchicalLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
+        super(conf, zk);
+    }
+    
+    /**
+     * Process hash nodes in a given path
+     */
+    void asyncProcessLevelNodes(
+        final String path, final Processor<String> processor,
+        final AsyncCallback.VoidCallback finalCb, final Object context,
+        final int successRc, final int failureRc) {
+        zk.sync(path, new AsyncCallback.VoidCallback() {
+            @Override
+            public void processResult(int rc, String path, Object ctx) {
+                if (rc != Code.OK.intValue()) {
+                    LOG.error("Error syncing path " + path + " when getting its chidren: ",
+                              KeeperException.create(KeeperException.Code.get(rc), path));
+                    finalCb.processResult(failureRc, null, context);
+                    return;
+                }
+
+                zk.getChildren(path, false, new AsyncCallback.ChildrenCallback() {
+                    @Override
+                    public void processResult(int rc, String path, Object ctx,
+                                              List<String> levelNodes) {
+                        if (rc != Code.OK.intValue()) {
+                            LOG.error("Error polling hash nodes of " + path,
+                                      KeeperException.create(KeeperException.Code.get(rc), path));
+                            finalCb.processResult(failureRc, null, context);
+                            return;
+                        }
+                        AsyncListProcessor<String> listProcessor =
+                                new AsyncListProcessor<String>(scheduler);
+                        // process its children
+                        listProcessor.process(levelNodes, processor, finalCb,
+                                              context, successRc, failureRc);
+                    }
+                }, null);
+            }
+        }, null);
+    }
+    
+    /**
+     * Process a list one element at a time, asynchronously. Processing stops immediately
+     * when an error occurs.
+     */
+    private static class AsyncListProcessor<T> {
+        // use this to prevent long stack chains from building up in callbacks
+        ScheduledExecutorService scheduler;
+
+        /**
+         * Constructor
+         *
+         * @param scheduler
+         *          Executor used to prevent long stack chains
+         */
+        public AsyncListProcessor(ScheduledExecutorService scheduler) {
+            this.scheduler = scheduler;
+        }
+
+        /**
+         * Process list of items
+         *
+         * @param data
+         *          List of data to process
+         * @param processor
+         *          Callback to process element of list when success
+         * @param finalCb
+         *          Final callback to be called after all elements in the list are processed
+         * @param context
+         *          Context of final callback
+         * @param successRc
+         *          RC passed to final callback on success
+         * @param failureRc
+         *          RC passed to final callback on failure
+         */
+        public void process(final List<T> data, final Processor<T> processor,
+                            final AsyncCallback.VoidCallback finalCb, final Object context,
+                            final int successRc, final int failureRc) {
+            if (data == null || data.size() == 0) {
+                finalCb.processResult(successRc, null, context);
+                return;
+            }
+            final int size = data.size();
+            final AtomicInteger current = new AtomicInteger(0);
+            AsyncCallback.VoidCallback stubCallback = new AsyncCallback.VoidCallback() {
+                @Override
+                public void processResult(int rc, String path, Object ctx) {
+                    if (rc != successRc) {
+                        // terminal immediately
+                        finalCb.processResult(failureRc, null, context);
+                        return;
+                    }
+                    // process next element
+                    int next = current.incrementAndGet();
+                    if (next >= size) { // reach the end of list
+                        finalCb.processResult(successRc, null, context);
+                        return;
+                    }
+                    final T dataToProcess = data.get(next);
+                    final AsyncCallback.VoidCallback stub = this;
+                    scheduler.submit(new Runnable() {
+                        @Override
+                        public final void run() {
+                            processor.process(dataToProcess, stub);
+                        }
+                    });
+                }
+            };
+            T firstElement = data.get(0);
+            processor.process(firstElement, stubCallback);
+        }
+    }
+    
+    // get ledger from all level nodes
+    long getLedgerId(String...levelNodes) throws IOException {
+        return StringUtils.stringToHierarchicalLedgerId(levelNodes);
+    }
+    
+    /**
+     * Get all ledger ids in the given zk path.
+     *
+     * @param ledgerNodes
+     *          List of ledgers in the given path
+     *          example:- {L1652, L1653, L1650}
+     * @param path
+     *          The zookeeper path of the ledger ids. The path should start with {@code ledgerRootPath}
+     *          example (with ledgerRootPath = /ledgers):- /ledgers/00/0053
+     */
+    @Override
+    protected NavigableSet<Long> ledgerListToSet(List<String> ledgerNodes, String path) {
+        NavigableSet<Long> zkActiveLedgers = new TreeSet<Long>();
+
+        if (!path.startsWith(ledgerRootPath)) {
+            LOG.warn("Ledger path [{}] is not a valid path name, it should start wth {}", path, ledgerRootPath);
+            return zkActiveLedgers;
+        }
+
+        long ledgerIdPrefix = 0;
+        char ch;
+        for (int i = ledgerRootPath.length() + 1; i < path.length(); i++) {
+            ch = path.charAt(i);
+            if (ch < '0' || ch > '9')
+                continue;
+            ledgerIdPrefix = ledgerIdPrefix * 10 + (ch - '0');
+        }
+
+        for (String ledgerNode : ledgerNodes) {
+            if (isSpecialZnode(ledgerNode)) {
+                continue;
+            }
+            long ledgerId = ledgerIdPrefix;
+            for (int i = 0; i < ledgerNode.length(); i++) {
+                ch = ledgerNode.charAt(i);
+                if (ch < '0' || ch > '9')
+                    continue;
+                ledgerId = ledgerId * 10 + (ch - '0');
+            }
+            zkActiveLedgers.add(ledgerId);
+        }
+        return zkActiveLedgers;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index 6db3375..f5a60f6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -38,6 +38,7 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataLis
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.util.BookKeeperConstants;
+import org.apache.bookkeeper.util.StringUtils;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.zookeeper.AsyncCallback;
@@ -468,7 +469,7 @@ abstract class AbstractZkLedgerManager implements LedgerManager, Watcher {
      *          Znode Name
      * @return true  if the znode is a special znode otherwise false
      */
-    protected boolean isSpecialZnode(String znode) {
+     protected static boolean isSpecialZnode(String znode) {
         if (BookKeeperConstants.AVAILABLE_NODE.equals(znode)
                 || BookKeeperConstants.COOKIE_NODE.equals(znode)
                 || BookKeeperConstants.LAYOUT_ZNODE.equals(znode)

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
index 3172247..36db62a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
@@ -87,9 +87,9 @@ class FlatLedgerManager extends AbstractZkLedgerManager {
         asyncProcessLedgersInSingleNode(ledgerRootPath, processor, finalCb, ctx, successRc, failureRc);
     }
 
-    @Override
-    protected boolean isSpecialZnode(String znode) {
-        return znode.startsWith(ZkLedgerIdGenerator.LEDGER_ID_GEN_PREFIX) || super.isSpecialZnode(znode);
+    
+    protected static boolean isSpecialZnode(String znode) {
+        return znode.startsWith(ZkLedgerIdGenerator.LEDGER_ID_GEN_PREFIX) || AbstractZkLedgerManager.isSpecialZnode(znode);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
index bed1627..309762b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
@@ -18,370 +18,105 @@
 package org.apache.bookkeeper.meta;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NavigableSet;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.util.StringUtils;
-import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.ZooKeeper;
 
 /**
- * Hierarchical Ledger Manager which manages ledger meta in zookeeper using 2-level hierarchical znodes.
+ * HierarchicalLedgerManager makes use of both LongHierarchicalLedgerManager and LegacyHierarchicalLedgerManager
+ * to extend the 31-bit ledger id range of the LegacyHierarchicalLedgerManager to that of the LongHierarchicalLedgerManager
+ * while remaining backwards-compatible with the legacy manager.
+ * 
+ * In order to achieve backwards-compatibility, the HierarchicalLedgerManager forwards requests relating to ledger IDs which 
+ * are < Integer.MAX_VALUE to the LegacyHierarchicalLedgerManager. The new 5-part directory structure will not appear until a 
+ * ledger with an ID >= Integer.MAX_VALUE is created.
  *
- * <p>
- * HierarchicalLedgerManager splits the generated id into 3 parts (2-4-4):
- * <pre>&lt;level1 (2 digits)&gt;&lt;level2 (4 digits)&gt;&lt;level3 (4 digits)&gt;</pre>
- * These 3 parts are used to form the actual ledger node path used to store ledger metadata:
- * <pre>(ledgersRootPath)/level1/level2/L(level3)</pre>
- * E.g Ledger 0000000001 is split into 3 parts <i>00</i>, <i>0000</i>, <i>0001</i>, which is stored in
- * <i>(ledgersRootPath)/00/0000/L0001</i>. So each znode could have at most 10000 ledgers, which avoids
- * errors during garbage collection due to lists of children that are too long.
+ * @see LongHierarchicalLedgerManager
+ * @see LegacyHierarchicalLedgerManager
  */
-class HierarchicalLedgerManager extends AbstractZkLedgerManager {
-
+class HierarchicalLedgerManager extends AbstractHierarchicalLedgerManager {
     static final Logger LOG = LoggerFactory.getLogger(HierarchicalLedgerManager.class);
 
-    static final String IDGEN_ZNODE = "idgen";
-    private static final String MAX_ID_SUFFIX = "9999";
-    private static final String MIN_ID_SUFFIX = "0000";
+    LegacyHierarchicalLedgerManager legacyLM;
+    LongHierarchicalLedgerManager longLM;
 
-    /**
-     * Constructor
-     *
-     * @param conf
-     *          Configuration object
-     * @param zk
-     *          ZooKeeper Client Handle
-     */
     public HierarchicalLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
         super(conf, zk);
+        legacyLM = new LegacyHierarchicalLedgerManager(conf, zk);
+        longLM = new LongHierarchicalLedgerManager (conf, zk);
     }
 
     @Override
-    public String getLedgerPath(long ledgerId) {
-        return ledgerRootPath + StringUtils.getHierarchicalLedgerPath(ledgerId);
-    }
-
-    @Override
-    public long getLedgerId(String pathName) throws IOException {
-        if (!pathName.startsWith(ledgerRootPath)) {
-            throw new IOException("it is not a valid hashed path name : " + pathName);
-        }
-        String hierarchicalPath = pathName.substring(ledgerRootPath.length() + 1);
-        return StringUtils.stringToHierarchicalLedgerId(hierarchicalPath);
-    }
-
-    // get ledger from all level nodes
-    long getLedgerId(String...levelNodes) throws IOException {
-        return StringUtils.stringToHierarchicalLedgerId(levelNodes);
-    }
-
-    //
-    // Active Ledger Manager
-    //
-
-    /**
-     * Get the smallest cache id in a specified node /level1/level2
-     *
-     * @param level1
-     *          1st level node name
-     * @param level2
-     *          2nd level node name
-     * @return the smallest ledger id
-     */
-    private long getStartLedgerIdByLevel(String level1, String level2) throws IOException {
-        return getLedgerId(level1, level2, MIN_ID_SUFFIX);
-    }
-
-    /**
-     * Get the largest cache id in a specified node /level1/level2
-     *
-     * @param level1
-     *          1st level node name
-     * @param level2
-     *          2nd level node name
-     * @return the largest ledger id
-     */
-    private long getEndLedgerIdByLevel(String level1, String level2) throws IOException {
-        return getLedgerId(level1, level2, MAX_ID_SUFFIX);
-    }
-
-    @Override
-    public void asyncProcessLedgers(final Processor<Long> processor,
-                                    final AsyncCallback.VoidCallback finalCb, final Object context,
-                                    final int successRc, final int failureRc) {
-        // process 1st level nodes
-        asyncProcessLevelNodes(ledgerRootPath, new Processor<String>() {
-            @Override
-            public void process(final String l1Node, final AsyncCallback.VoidCallback cb1) {
-                if (isSpecialZnode(l1Node)) {
-                    cb1.processResult(successRc, null, context);
-                    return;
-                }
-                final String l1NodePath = ledgerRootPath + "/" + l1Node;
-                // process level1 path, after all children of level1 process
-                // it callback to continue processing next level1 node
-                asyncProcessLevelNodes(l1NodePath, new Processor<String>() {
-                    @Override
-                    public void process(String l2Node, AsyncCallback.VoidCallback cb2) {
-                        // process level1/level2 path
-                        String l2NodePath = ledgerRootPath + "/" + l1Node + "/" + l2Node;
-                        // process each ledger
-                        // after all ledger are processed, cb2 will be call to continue processing next level2 node
-                        asyncProcessLedgersInSingleNode(l2NodePath, processor, cb2,
-                                                        context, successRc, failureRc);
-                    }
-                }, cb1, context, successRc, failureRc);
-            }
-        }, finalCb, context, successRc, failureRc);
-    }
+    public void asyncProcessLedgers(Processor<Long> processor, VoidCallback finalCb, Object context, int successRc,
+            int failureRc) {
+        // Process the old 31-bit id ledgers first.
+        legacyLM.asyncProcessLedgers(processor, new VoidCallback(){
 
-    /**
-     * Process hash nodes in a given path
-     */
-    void asyncProcessLevelNodes(
-        final String path, final Processor<String> processor,
-        final AsyncCallback.VoidCallback finalCb, final Object context,
-        final int successRc, final int failureRc) {
-        zk.sync(path, new AsyncCallback.VoidCallback() {
             @Override
             public void processResult(int rc, String path, Object ctx) {
-                if (rc != Code.OK.intValue()) {
-                    LOG.error("Error syncing path " + path + " when getting its chidren: ",
-                              KeeperException.create(KeeperException.Code.get(rc), path));
-                    finalCb.processResult(failureRc, null, context);
-                    return;
+                if(rc == failureRc) {
+                    // If it fails, return the failure code to the callback
+                    finalCb.processResult(rc, path, ctx);
+                }
+                else {
+                    // If it succeeds, proceed with our own recursive ledger processing for the 63-bit id ledgers
+                    longLM.asyncProcessLedgers(processor, finalCb, context, successRc, failureRc);
                 }
-
-                zk.getChildren(path, false, new AsyncCallback.ChildrenCallback() {
-                    @Override
-                    public void processResult(int rc, String path, Object ctx,
-                                              List<String> levelNodes) {
-                        if (rc != Code.OK.intValue()) {
-                            LOG.error("Error polling hash nodes of " + path,
-                                      KeeperException.create(KeeperException.Code.get(rc), path));
-                            finalCb.processResult(failureRc, null, context);
-                            return;
-                        }
-                        AsyncListProcessor<String> listProcessor =
-                                new AsyncListProcessor<String>(scheduler);
-                        // process its children
-                        listProcessor.process(levelNodes, processor, finalCb,
-                                              context, successRc, failureRc);
-                    }
-                }, null);
             }
-        }, null);
-    }
-
-    /**
-     * Process list one by one in asynchronize way. Process will be stopped immediately
-     * when error occurred.
-     */
-    private static class AsyncListProcessor<T> {
-        // use this to prevent long stack chains from building up in callbacks
-        ScheduledExecutorService scheduler;
 
-        /**
-         * Constructor
-         *
-         * @param scheduler
-         *          Executor used to prevent long stack chains
-         */
-        public AsyncListProcessor(ScheduledExecutorService scheduler) {
-            this.scheduler = scheduler;
-        }
+        }, context, successRc, failureRc);
+    }
 
-        /**
-         * Process list of items
-         *
-         * @param data
-         *          List of data to process
-         * @param processor
-         *          Callback to process element of list when success
-         * @param finalCb
-         *          Final callback to be called after all elements in the list are processed
-         * @param context
-         *          Context of final callback
-         * @param successRc
-         *          RC passed to final callback on success
-         * @param failureRc
-         *          RC passed to final callback on failure
-         */
-        public void process(final List<T> data, final Processor<T> processor,
-                            final AsyncCallback.VoidCallback finalCb, final Object context,
-                            final int successRc, final int failureRc) {
-            if (data == null || data.size() == 0) {
-                finalCb.processResult(successRc, null, context);
-                return;
-            }
-            final int size = data.size();
-            final AtomicInteger current = new AtomicInteger(0);
-            AsyncCallback.VoidCallback stubCallback = new AsyncCallback.VoidCallback() {
-                @Override
-                public void processResult(int rc, String path, Object ctx) {
-                    if (rc != successRc) {
-                        // terminal immediately
-                        finalCb.processResult(failureRc, null, context);
-                        return;
-                    }
-                    // process next element
-                    int next = current.incrementAndGet();
-                    if (next >= size) { // reach the end of list
-                        finalCb.processResult(successRc, null, context);
-                        return;
-                    }
-                    final T dataToProcess = data.get(next);
-                    final AsyncCallback.VoidCallback stub = this;
-                    scheduler.submit(new Runnable() {
-                        @Override
-                        public final void run() {
-                            processor.process(dataToProcess, stub);
-                        }
-                    });
-                }
-            };
-            T firstElement = data.get(0);
-            processor.process(firstElement, stubCallback);
-        }
+    @Override
+    protected String getLedgerPath(long ledgerId) {
+        return ledgerRootPath + StringUtils.getHybridHierarchicalLedgerPath(ledgerId);
     }
 
     @Override
-    protected boolean isSpecialZnode(String znode) {
-        return IDGEN_ZNODE.equals(znode) || super.isSpecialZnode(znode);
+    protected long getLedgerId(String ledgerPath) throws IOException {
+        // TODO Auto-generated method stub
+        if (!ledgerPath.startsWith(ledgerRootPath)) {
+            throw new IOException("it is not a valid hashed path name : " + ledgerPath);
+        }
+        String hierarchicalPath = ledgerPath.substring(ledgerRootPath.length() + 1);
+        return StringUtils.stringToLongHierarchicalLedgerId(hierarchicalPath);
     }
 
     @Override
     public LedgerRangeIterator getLedgerRanges() {
-        return new HierarchicalLedgerRangeIterator();
+        LedgerRangeIterator legacyLedgerRangeIterator = legacyLM.getLedgerRanges();
+        LedgerRangeIterator longLedgerRangeIterator = longLM.getLedgerRanges();
+        return new HierarchicalLedgerRangeIterator(legacyLedgerRangeIterator, longLedgerRangeIterator);
     }
 
-    /**
-     * Iterator through each metadata bucket with hierarchical mode
-     */
-    private class HierarchicalLedgerRangeIterator implements LedgerRangeIterator {
-        private Iterator<String> l1NodesIter = null;
-        private Iterator<String> l2NodesIter = null;
-        private String curL1Nodes = "";
-        private boolean iteratorDone = false;
-        private LedgerRange nextRange = null;
+    private static class HierarchicalLedgerRangeIterator implements LedgerRangeIterator {
 
-        /**
-         * iterate next level1 znode
-         *
-         * @return false if have visited all level1 nodes
-         * @throws InterruptedException/KeeperException if error occurs reading zookeeper children
-         */
-        private boolean nextL1Node() throws KeeperException, InterruptedException {
-            l2NodesIter = null;
-            while (l2NodesIter == null) {
-                if (l1NodesIter.hasNext()) {
-                    curL1Nodes = l1NodesIter.next();
-                } else {
-                    return false;
-                }
-                if (isSpecialZnode(curL1Nodes)) {
-                    continue;
-                }
-                List<String> l2Nodes = zk.getChildren(ledgerRootPath + "/" + curL1Nodes, null);
-                Collections.sort(l2Nodes);
-                l2NodesIter = l2Nodes.iterator();
-                if (!l2NodesIter.hasNext()) {
-                    l2NodesIter = null;
-                    continue;
-                }
-            }
-            return true;
-        }
+        LedgerRangeIterator legacyLedgerRangeIterator;
+        LedgerRangeIterator longLedgerRangeIterator;
 
-        synchronized private void preload() throws IOException {
-            while (nextRange == null && !iteratorDone) {
-                boolean hasMoreElements = false;
-                try {
-                    if (l1NodesIter == null) {
-                        List<String> l1Nodes = zk.getChildren(ledgerRootPath, null);
-                        Collections.sort(l1Nodes);
-                        l1NodesIter = l1Nodes.iterator();
-                        hasMoreElements = nextL1Node();
-                    } else if (l2NodesIter == null || !l2NodesIter.hasNext()) {
-                        hasMoreElements = nextL1Node();
-                    } else {
-                        hasMoreElements = true;
-                    }
-                } catch (KeeperException ke) {
-                    throw new IOException("Error preloading next range", ke);
-                } catch (InterruptedException ie) {
-                    Thread.currentThread().interrupt();
-                    throw new IOException("Interrupted while preloading", ie);
-                }
-                if (hasMoreElements) {
-                    nextRange = getLedgerRangeByLevel(curL1Nodes, l2NodesIter.next());
-                    if (nextRange.size() == 0) {
-                        nextRange = null;
-                    }
-                } else {
-                    iteratorDone = true;
-                }
-            }
+        HierarchicalLedgerRangeIterator(LedgerRangeIterator legacyLedgerRangeIterator, LedgerRangeIterator longLedgerRangeIterator) {
+            this.legacyLedgerRangeIterator = legacyLedgerRangeIterator;
+            this.longLedgerRangeIterator = longLedgerRangeIterator;
         }
 
         @Override
-        synchronized public boolean hasNext() throws IOException {
-            preload();
-            return nextRange != null && !iteratorDone;
+        public boolean hasNext() throws IOException {
+            return legacyLedgerRangeIterator.hasNext() || longLedgerRangeIterator.hasNext();
         }
 
         @Override
-        synchronized public LedgerRange next() throws IOException {
-            if (!hasNext()) {
-                throw new NoSuchElementException();
+        public LedgerRange next() throws IOException {
+            if(legacyLedgerRangeIterator.hasNext()) {
+                return legacyLedgerRangeIterator.next();
             }
-            LedgerRange r = nextRange;
-            nextRange = null;
-            return r;
+            return longLedgerRangeIterator.next();
         }
 
-        /**
-         * Get a single node level1/level2
-         *
-         * @param level1
-         *          1st level node name
-         * @param level2
-         *          2nd level node name
-         * @throws IOException
-         */
-        LedgerRange getLedgerRangeByLevel(final String level1, final String level2)
-                throws IOException {
-            StringBuilder nodeBuilder = new StringBuilder();
-            nodeBuilder.append(ledgerRootPath).append("/")
-                       .append(level1).append("/").append(level2);
-            String nodePath = nodeBuilder.toString();
-            List<String> ledgerNodes = null;
-            try {
-                ledgerNodes = ZkUtils.getChildrenInSingleNode(zk, nodePath);
-            } catch (InterruptedException e) {
-                throw new IOException("Error when get child nodes from zk", e);
-            }
-            NavigableSet<Long> zkActiveLedgers = ledgerListToSet(ledgerNodes, nodePath);
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("All active ledgers from ZK for hash node "
-                          + level1 + "/" + level2 + " : " + zkActiveLedgers);
-            }
-
-            return new LedgerRange(zkActiveLedgers.subSet(getStartLedgerIdByLevel(level1, level2), true,
-                                                          getEndLedgerIdByLevel(level1, level2), true));
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
index 084d73d..a74a633 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.meta;
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -17,89 +15,25 @@ package org.apache.bookkeeper.meta;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.bookkeeper.meta;
 
-import java.io.IOException;
 import java.util.List;
 
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZKUtil;
-import org.apache.bookkeeper.replication.ReplicationException;
-import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
 
 /**
- * Hierarchical Ledger Manager Factory
+ * Hierarchical Ledger Manager Factory whose id generator is a LongZkLedgerIdGenerator wrapping the legacy ZkLedgerIdGenerator.
  */
-public class HierarchicalLedgerManagerFactory extends LedgerManagerFactory {
+public class HierarchicalLedgerManagerFactory extends LegacyHierarchicalLedgerManagerFactory {
 
     public static final String NAME = "hierarchical";
-    public static final int CUR_VERSION = 1;
-
-    AbstractConfiguration conf;
-    ZooKeeper zk;
-
-    @Override
-    public int getCurrentVersion() {
-        return CUR_VERSION;
-    }
-
-    @Override
-    public LedgerManagerFactory initialize(final AbstractConfiguration conf,
-                                           final ZooKeeper zk,
-                                           final int factoryVersion)
-    throws IOException {
-        if (CUR_VERSION != factoryVersion) {
-            throw new IOException("Incompatible layout version found : "
-                                + factoryVersion);
-        }
-        this.conf = conf;
-        this.zk = zk;
-        return this;
-    }
-
-    @Override
-    public void uninitialize() throws IOException {
-        // since zookeeper instance is passed from outside
-        // we don't need to close it here
-    }
-
+    
     @Override
     public LedgerIdGenerator newLedgerIdGenerator() {
         List<ACL> zkAcls = ZkUtils.getACLs(conf);
-        return new ZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), HierarchicalLedgerManager.IDGEN_ZNODE, zkAcls);
-    }
-
-    @Override
-    public LedgerManager newLedgerManager() {
-        return new HierarchicalLedgerManager(conf, zk);
-    }
-
-    @Override
-    public LedgerUnderreplicationManager newLedgerUnderreplicationManager()
-            throws KeeperException, InterruptedException, ReplicationException.CompatibilityException{
-        return new ZkLedgerUnderreplicationManager(conf, zk);
-    }
-
-    @Override
-    public void format(AbstractConfiguration conf, ZooKeeper zk)
-            throws InterruptedException, KeeperException, IOException {
-        HierarchicalLedgerManager ledgerManager = (HierarchicalLedgerManager) newLedgerManager();
-        try {
-            String ledgersRootPath = conf.getZkLedgersRootPath();
-            List<String> children = zk.getChildren(ledgersRootPath, false);
-            for (String child : children) {
-                if (ledgerManager.isSpecialZnode(child)) {
-                    continue;
-                }
-                ZKUtil.deleteRecursive(zk, ledgersRootPath + "/" + child);
-            }
-        } finally {
-            ledgerManager.close();
-        }
-        // Delete and recreate the LAYOUT information.
-        super.format(conf, zk);
+        ZkLedgerIdGenerator subIdGenerator = new ZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), LegacyHierarchicalLedgerManager.IDGEN_ZNODE, zkAcls);
+        return new LongZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), LongHierarchicalLedgerManager.IDGEN_ZNODE, subIdGenerator, zkAcls);
     }
-
+    
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
index 76d1572..e7cfc4c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
@@ -164,7 +164,8 @@ public abstract class LedgerManagerFactory {
 
         // handle V2 layout case
         if (factoryClass != null &&
-            !layout.getManagerFactoryClass().equals(factoryClass.getName())) {
+            !layout.getManagerFactoryClass().equals(factoryClass.getName()) &&
+            conf.getProperty(AbstractConfiguration.LEDGER_MANAGER_FACTORY_DISABLE_CLASS_CHECK) == null) { // Disable should ONLY happen during compatibility testing.
 
             throw new IOException("Configured layout " + factoryClass.getName()
                                 + " does not match existing layout "  + layout.getManagerFactoryClass());
@@ -210,6 +211,8 @@ public abstract class LedgerManagerFactory {
                     factoryClass = FlatLedgerManagerFactory.class;
                 } else if (HierarchicalLedgerManagerFactory.NAME.equals(lmType)) {
                     factoryClass = HierarchicalLedgerManagerFactory.class;
+                } else if (LongHierarchicalLedgerManagerFactory.NAME.equals(lmType)) {
+                    factoryClass = LongHierarchicalLedgerManagerFactory.class;
                 } else {
                     throw new IOException("Unknown ledger manager type: "
                             + lmType);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
new file mode 100644
index 0000000..8be23b2
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.TreeSet;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.bookkeeper.util.StringUtils;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Legacy Hierarchical Ledger Manager which manages ledger meta in zookeeper using 2-level hierarchical znodes.
+ *
+ * <p>
+ * LegacyHierarchicalLedgerManager splits the generated id into 3 parts (2-4-4):
+ * <pre>&lt;level1 (2 digits)&gt;&lt;level2 (4 digits)&gt;&lt;level3 (4 digits)&gt;</pre>
+ * These 3 parts are used to form the actual ledger node path used to store ledger metadata:
+ * <pre>(ledgersRootPath)/level1/level2/L(level3)</pre>
+ * E.g Ledger 0000000001 is split into 3 parts <i>00</i>, <i>0000</i>, <i>0001</i>, which is stored in
+ * <i>(ledgersRootPath)/00/0000/L0001</i>. So each znode could have at most 10000 ledgers, which avoids
+ * errors during garbage collection due to lists of children that are too long.
+ */
+class LegacyHierarchicalLedgerManager extends AbstractHierarchicalLedgerManager {
+
+    static final Logger LOG = LoggerFactory.getLogger(LegacyHierarchicalLedgerManager.class);
+
+    static final String IDGEN_ZNODE = "idgen";
+    private static final String MAX_ID_SUFFIX = "9999";
+    private static final String MIN_ID_SUFFIX = "0000";
+
+    private static final ThreadLocal<StringBuilder> threadLocalNodeBuilder = new ThreadLocal<StringBuilder>() {
+        @Override
+        protected StringBuilder initialValue() {
+            return new StringBuilder();
+        }
+    };
+    
+    /**
+     * Constructor
+     *
+     * @param conf
+     *          Configuration object
+     * @param zk
+     *          ZooKeeper Client Handle
+     */
+    public LegacyHierarchicalLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
+        super(conf, zk);
+    }
+
+    @Override
+    public String getLedgerPath(long ledgerId) {
+        return ledgerRootPath + StringUtils.getShortHierarchicalLedgerPath(ledgerId);
+    }
+
+    @Override
+    public long getLedgerId(String pathName) throws IOException {
+        if (!pathName.startsWith(ledgerRootPath)) {
+            throw new IOException("it is not a valid hashed path name : " + pathName);
+        }
+        String hierarchicalPath = pathName.substring(ledgerRootPath.length() + 1);
+        return StringUtils.stringToHierarchicalLedgerId(hierarchicalPath);
+    }
+
+    //
+    // Active Ledger Manager
+    //
+
+    /**
+     * Get the smallest ledger id in a specified node /level1/level2
+     *
+     * @param level1
+     *          1st level node name
+     * @param level2
+     *          2nd level node name
+     * @return the smallest ledger id
+     */
+    private long getStartLedgerIdByLevel(String level1, String level2) throws IOException {
+        return getLedgerId(level1, level2, MIN_ID_SUFFIX);
+    }
+
+    /**
+     * Get the largest ledger id in a specified node /level1/level2
+     *
+     * @param level1
+     *          1st level node name
+     * @param level2
+     *          2nd level node name
+     * @return the largest ledger id
+     */
+    private long getEndLedgerIdByLevel(String level1, String level2) throws IOException {
+        return getLedgerId(level1, level2, MAX_ID_SUFFIX);
+    }
+
+    @Override
+    public void asyncProcessLedgers(final Processor<Long> processor,
+                                    final AsyncCallback.VoidCallback finalCb, final Object context,
+                                    final int successRc, final int failureRc) {
+        // process 1st level nodes
+        asyncProcessLevelNodes(ledgerRootPath, new Processor<String>() {
+            @Override
+            public void process(final String l1Node, final AsyncCallback.VoidCallback cb1) {
+                if (isSpecialZnode(l1Node)) {
+                    cb1.processResult(successRc, null, context);
+                    return;
+                }
+                final String l1NodePath = ledgerRootPath + "/" + l1Node;
+                // process level1 path, after all children of level1 process
+                // it callback to continue processing next level1 node
+                asyncProcessLevelNodes(l1NodePath, new Processor<String>() {
+                    @Override
+                    public void process(String l2Node, AsyncCallback.VoidCallback cb2) {
+                        // process level1/level2 path
+                        String l2NodePath = ledgerRootPath + "/" + l1Node + "/" + l2Node;
+                        // process each ledger
+                        // after all ledger are processed, cb2 will be call to continue processing next level2 node
+                        asyncProcessLedgersInSingleNode(l2NodePath, processor, cb2,
+                                                        context, successRc, failureRc);
+                    }
+                }, cb1, context, successRc, failureRc);
+            }
+        }, finalCb, context, successRc, failureRc);
+    }
+
+    protected static boolean isSpecialZnode(String znode) {
+        return IDGEN_ZNODE.equals(znode) || LongHierarchicalLedgerManager.IDGEN_ZNODE.equals(znode) || AbstractHierarchicalLedgerManager.isSpecialZnode(znode);
+    }
+
+    @Override
+    public LedgerRangeIterator getLedgerRanges() {
+        return new HierarchicalLedgerRangeIterator();
+    }
+
+    /**
+     * Iterator through each metadata bucket with hierarchical mode
+     */
+    private class HierarchicalLedgerRangeIterator implements LedgerRangeIterator {
+        private Iterator<String> l1NodesIter = null;
+        private Iterator<String> l2NodesIter = null;
+        private String curL1Nodes = "";
+        private boolean iteratorDone = false;
+        private LedgerRange nextRange = null;
+
+        /**
+         * iterate next level1 znode
+         *
+         * @return false if have visited all level1 nodes
+         * @throws InterruptedException/KeeperException if error occurs reading zookeeper children
+         */
+        private boolean nextL1Node() throws KeeperException, InterruptedException {
+            l2NodesIter = null;
+            while (l2NodesIter == null) {
+                if (l1NodesIter.hasNext()) {
+                    curL1Nodes = l1NodesIter.next();
+                } else {
+                    return false;
+                }
+                // Top level nodes are always exactly 2 digits long. (Don't pick up long hierarchical top level nodes)
+                if (isSpecialZnode(curL1Nodes) || curL1Nodes.length() > 2) {
+                    continue;
+                }
+                List<String> l2Nodes = zk.getChildren(ledgerRootPath + "/" + curL1Nodes, null);
+                Collections.sort(l2Nodes);
+                l2NodesIter = l2Nodes.iterator();
+                if (!l2NodesIter.hasNext()) {
+                    l2NodesIter = null;
+                    continue;
+                }
+            }
+            return true;
+        }
+
+        synchronized private void preload() throws IOException {
+            while (nextRange == null && !iteratorDone) {
+                boolean hasMoreElements = false;
+                try {
+                    if (l1NodesIter == null) {
+                        List<String> l1Nodes = zk.getChildren(ledgerRootPath, null);
+                        Collections.sort(l1Nodes);
+                        l1NodesIter = l1Nodes.iterator();
+                        hasMoreElements = nextL1Node();
+                    } else if (l2NodesIter == null || !l2NodesIter.hasNext()) {
+                        hasMoreElements = nextL1Node();
+                    } else {
+                        hasMoreElements = true;
+                    }
+                } catch (KeeperException ke) {
+                    throw new IOException("Error preloading next range", ke);
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    throw new IOException("Interrupted while preloading", ie);
+                }
+                if (hasMoreElements) {
+                    nextRange = getLedgerRangeByLevel(curL1Nodes, l2NodesIter.next());
+                    if (nextRange.size() == 0) {
+                        nextRange = null;
+                    }
+                } else {
+                    iteratorDone = true;
+                }
+            }
+        }
+
+        @Override
+        synchronized public boolean hasNext() throws IOException {
+            preload();
+            return nextRange != null && !iteratorDone;
+        }
+
+        @Override
+        synchronized public LedgerRange next() throws IOException {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            LedgerRange r = nextRange;
+            nextRange = null;
+            return r;
+        }
+
+        /**
+         * Get a single node level1/level2
+         *
+         * @param level1
+         *          1st level node name
+         * @param level2
+         *          2nd level node name
+         * @throws IOException
+         */
+        LedgerRange getLedgerRangeByLevel(final String level1, final String level2)
+                throws IOException {
+            StringBuilder nodeBuilder = threadLocalNodeBuilder.get();
+            nodeBuilder.setLength(0);
+            nodeBuilder.append(ledgerRootPath).append("/")
+                       .append(level1).append("/").append(level2);
+            String nodePath = nodeBuilder.toString();
+            List<String> ledgerNodes = null;
+            try {
+                ledgerNodes = ZkUtils.getChildrenInSingleNode(zk, nodePath);
+            } catch (InterruptedException e) {
+                throw new IOException("Error when get child nodes from zk", e);
+            }
+            NavigableSet<Long> zkActiveLedgers = ledgerListToSet(ledgerNodes, nodePath);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("All active ledgers from ZK for hash node "
+                          + level1 + "/" + level2 + " : " + zkActiveLedgers);
+            }
+
+            return new LedgerRange(zkActiveLedgers.subSet(getStartLedgerIdByLevel(level1, level2), true,
+                                                          getEndLedgerIdByLevel(level1, level2), true));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
new file mode 100644
index 0000000..1ac4038
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
@@ -0,0 +1,100 @@
+package org.apache.bookkeeper.meta;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZKUtil;
+import org.apache.bookkeeper.replication.ReplicationException;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+
+/**
+ * Legacy Hierarchical Ledger Manager Factory
+ */
+public class LegacyHierarchicalLedgerManagerFactory extends LedgerManagerFactory {
+
+    public static final String NAME = "legacyhierarchical";
+    public static final int CUR_VERSION = 1;
+
+    AbstractConfiguration conf;
+    ZooKeeper zk;
+
+    @Override
+    public int getCurrentVersion() {
+        return CUR_VERSION;
+    }
+
+    @Override
+    public LedgerManagerFactory initialize(final AbstractConfiguration conf,
+                                           final ZooKeeper zk,
+                                           final int factoryVersion)
+    throws IOException {
+        if (CUR_VERSION != factoryVersion) {
+            throw new IOException("Incompatible layout version found : "
+                                + factoryVersion);
+        }
+        this.conf = conf;
+        this.zk = zk;
+        return this;
+    }
+
+    @Override
+    public void uninitialize() throws IOException {
+        // since zookeeper instance is passed from outside
+        // we don't need to close it here
+    }
+
+    @Override
+    public LedgerIdGenerator newLedgerIdGenerator() {
+        List<ACL> zkAcls = ZkUtils.getACLs(conf);
+        return new ZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), LegacyHierarchicalLedgerManager.IDGEN_ZNODE, zkAcls);
+    }
+
+    @Override
+    public LedgerManager newLedgerManager() {
+        return new HierarchicalLedgerManager(conf, zk);
+    }
+
+    @Override
+    public LedgerUnderreplicationManager newLedgerUnderreplicationManager()
+            throws KeeperException, InterruptedException, ReplicationException.CompatibilityException{
+        return new ZkLedgerUnderreplicationManager(conf, zk);
+    }
+
+    @Override
+    public void format(AbstractConfiguration conf, ZooKeeper zk)
+            throws InterruptedException, KeeperException, IOException {
+        String ledgersRootPath = conf.getZkLedgersRootPath();
+        List<String> children = zk.getChildren(ledgersRootPath, false);
+        for (String child : children) {
+            if (HierarchicalLedgerManager.isSpecialZnode(child)) {
+                continue;
+            }
+            ZKUtil.deleteRecursive(zk, ledgersRootPath + "/" + child);
+        }
+        // Delete and recreate the LAYOUT information.
+        super.format(conf, zk);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
index 990297f..f8f7546 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.NoSuchElementException;
+import java.util.TreeSet;
 
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
@@ -37,33 +38,36 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * LongHierarchical Ledger Manager which manages ledger meta in zookeeper using 4-level hierarchical znodes.
+ * LongHierarchical Ledger Manager which manages ledger meta in zookeeper using 5-level hierarchical znodes.
  *
  * <p>
  * LongHierarchicalLedgerManager splits the generated id into 5 parts (3-4-4-4-4):
  *
  * <pre>
- * &lt;level1 (3 digits)&gt;&lt;level2 (4 digits)&gt;&lt;level3 (4 digits)&gt;&lt;level4 (4 digits)&gt;
- * &lt;level5 (4 digits)&gt;
+ * &lt;level0 (3 digits)&gt;&lt;level1 (4 digits)&gt;&lt;level2 (4 digits)&gt;&lt;level3 (4 digits)&gt;
+ * &lt;level4 (4 digits)&gt;
  * </pre>
  *
  * These 5 parts are used to form the actual ledger node path used to store ledger metadata:
  *
  * <pre>
- * (ledgersRootPath) / level1 / level2 / level3 / level4 / L(level5)
+ * (ledgersRootPath) / level0 / level1 / level2 / level3 / L(level4)
  * </pre>
  *
  * E.g Ledger 0000000000000000001 is split into 5 parts <i>000</i>, <i>0000</i>, <i>0000</i>, <i>0000</i>, <i>0001</i>,
  * which is stored in <i>(ledgersRootPath)/000/0000/0000/0000/L0001</i>. So each znode could have at most 10000 ledgers,
  * which avoids errors during garbage collection due to lists of children that are too long.
+ *
  */
-class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
+class LongHierarchicalLedgerManager extends AbstractHierarchicalLedgerManager {
 
     static final Logger LOG = LoggerFactory.getLogger(LongHierarchicalLedgerManager.class);
 
+    static final String IDGEN_ZNODE = "idgen-long";
     private static final String MAX_ID_SUFFIX = "9999";
     private static final String MIN_ID_SUFFIX = "0000";
 
+
     /**
      * Constructor
      *
@@ -77,11 +81,6 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
     }
 
     @Override
-    public String getLedgerPath(long ledgerId) {
-        return ledgerRootPath + StringUtils.getLongHierarchicalLedgerPath(ledgerId);
-    }
-
-    @Override
     public long getLedgerId(String pathName) throws IOException {
         if (!pathName.startsWith(ledgerRootPath)) {
             throw new IOException("it is not a valid hashed path name : " + pathName);
@@ -89,54 +88,68 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
         String hierarchicalPath = pathName.substring(ledgerRootPath.length() + 1);
         return StringUtils.stringToLongHierarchicalLedgerId(hierarchicalPath);
     }
+    
+    @Override
+    public String getLedgerPath(long ledgerId) {
+        return ledgerRootPath + StringUtils.getLongHierarchicalLedgerPath(ledgerId);
+    }
 
     //
     // Active Ledger Manager
     //
 
     /**
-     * Get the smallest cache id in a specified node /level1/level2/level3/level4
+     * Get the smallest cache id in a specified node /level0/level1/level2/level3
      *
-     * @param level1
+     * @param level0
      *            1st level node name
-     * @param level2
+     * @param level1
      *            2nd level node name
-     * @param level3
+     * @param level2
      *            3rd level node name
-     * @param level4
+     * @param level3
      *            4th level node name
      * @return the smallest ledger id
      */
-    private long getStartLedgerIdByLevel(String level1, String level2, String level3, String level4)
+    private long getStartLedgerIdByLevel(String level0, String level1, String level2, String level3)
             throws IOException {
-        return getLedgerId(level1, level2, level3, level4, MIN_ID_SUFFIX);
+        return getLedgerId(level0, level1, level2, level3, MIN_ID_SUFFIX);
     }
 
     /**
-     * Get the largest cache id in a specified node /level1/level2/level3/level4
+     * Get the largest cache id in a specified node /level0/level1/level2/level3
      *
-     * @param level1
+     * @param level0
      *            1st level node name
-     * @param level2
+     * @param level1
      *            2nd level node name
-     * @param level3
+     * @param level2
      *            3rd level node name
-     * @param level4
+     * @param level3
      *            4th level node name
      * @return the largest ledger id
      */
-    private long getEndLedgerIdByLevel(String level1, String level2, String level3, String level4) throws IOException {
-        return getLedgerId(level1, level2, level3, level4, MAX_ID_SUFFIX);
+    private long getEndLedgerIdByLevel(String level0, String level1, String level2, String level3) throws IOException {
+        return getLedgerId(level0, level1, level2, level3, MAX_ID_SUFFIX);
     }
 
     @Override
     public void asyncProcessLedgers(final Processor<Long> processor, final AsyncCallback.VoidCallback finalCb,
             final Object context, final int successRc, final int failureRc) {
+
+        // Walk the hierarchy recursively, starting at the ledger root path (level 0), to process the 63-bit id ledgers
         asyncProcessLevelNodes(ledgerRootPath,
                 new RecursiveProcessor(0, ledgerRootPath, processor, context, successRc, failureRc), finalCb, context,
                 successRc, failureRc);
     }
 
+    protected static boolean isSpecialZnode(String znode) {
+        // Check the znode name length. All node names in the long hierarchical format
+        // (3-4-4-4-4) are at least 3 characters long. This prevents picking up any
+        // old-style hierarchical paths (2-4-4)
+        return LegacyHierarchicalLedgerManager.isSpecialZnode(znode) || znode.length() < 3;
+    }
+    
     private class RecursiveProcessor implements Processor<String> {
         private final int level;
         private final String path;
@@ -167,7 +180,7 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
                         context, successRc, failureRc);
             } else {
                 // process each ledger; after all ledgers are processed, cb will be called to continue processing the next
-                // level5 node
+                // level4 node
                 asyncProcessLedgersInSingleNode(nodePath, processor, cb, context, successRc, failureRc);
             }
         }
@@ -194,7 +207,7 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
             curLevelNodes = new ArrayList<String>(Collections.nCopies(4, (String) null));
         }
 
-        private void initialize(String path, int level) throws KeeperException, InterruptedException, IOException {
+        synchronized private void initialize(String path, int level) throws KeeperException, InterruptedException, IOException {
             List<String> levelNodes = zk.getChildren(path, null);
             Collections.sort(levelNodes);
             if (level == 0) {
@@ -217,6 +230,9 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
             }
             String curLNode = curLevelNodes.get(level);
             if (curLNode != null) {
+                // Traverse down through levels 0-3
+                // The nextRange becomes a listing of the children
+                // in the level4 directory.
                 if (level != 3) {
                     String nextLevelPath = path + "/" + curLNode;
                     initialize(nextLevelPath, level + 1);
@@ -229,7 +245,13 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
             }
         }
 
-        private boolean moveToNext(int level) throws KeeperException, InterruptedException {
+        private void clearHigherLevels(int level) {
+            for(int i = level+1; i < 4; i++) {
+                curLevelNodes.set(i, null);
+            }
+        }
+
+        synchronized private boolean moveToNext(int level) throws KeeperException, InterruptedException {
             Iterator<String> curLevelNodesIter = levelNodesIter.get(level);
             boolean movedToNextNode = false;
             if (level == 0) {
@@ -239,6 +261,7 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
                         continue;
                     } else {
                         curLevelNodes.set(level, nextNode);
+                        clearHigherLevels(level);
                         movedToNextNode = true;
                         break;
                     }
@@ -247,6 +270,7 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
                 if (curLevelNodesIter.hasNext()) {
                     String nextNode = curLevelNodesIter.next();
                     curLevelNodes.set(level, nextNode);
+                    clearHigherLevels(level);
                     movedToNextNode = true;
                 } else {
                     movedToNextNode = moveToNext(level - 1);
@@ -261,6 +285,7 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
                         levelNodesIter.set(level, newCurLevelNodesIter);
                         if (newCurLevelNodesIter.hasNext()) {
                             curLevelNodes.set(level, newCurLevelNodesIter.next());
+                            clearHigherLevels(level);
                             movedToNextNode = true;
                         }
                     }
@@ -306,15 +331,15 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
             return r;
         }
 
-        LedgerRange getLedgerRangeByLevel(List<String> curLevelNodes) throws IOException {
-            String level1 = curLevelNodes.get(0);
-            String level2 = curLevelNodes.get(1);
-            String level3 = curLevelNodes.get(2);
-            String level4 = curLevelNodes.get(3);
+        private LedgerRange getLedgerRangeByLevel(List<String> curLevelNodes) throws IOException {
+            String level0 = curLevelNodes.get(0);
+            String level1 = curLevelNodes.get(1);
+            String level2 = curLevelNodes.get(2);
+            String level3 = curLevelNodes.get(3);
 
             StringBuilder nodeBuilder = new StringBuilder();
-            nodeBuilder.append(ledgerRootPath).append("/").append(level1).append("/").append(level2).append("/")
-                    .append(level3).append("/").append(level4);
+            nodeBuilder.append(ledgerRootPath).append("/").append(level0).append("/").append(level1).append("/")
+                    .append(level2).append("/").append(level3);
             String nodePath = nodeBuilder.toString();
             List<String> ledgerNodes = null;
             try {
@@ -324,11 +349,11 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
             }
             NavigableSet<Long> zkActiveLedgers = ledgerListToSet(ledgerNodes, nodePath);
             if (LOG.isDebugEnabled()) {
-                LOG.debug("All active ledgers from ZK for hash node " + level1 + "/" + level2 + "/" + level3 + "/"
-                        + level4 + " : " + zkActiveLedgers);
+                LOG.debug("All active ledgers from ZK for hash node " + level0 + "/" + level1 + "/" + level2 + "/"
+                        + level3 + " : " + zkActiveLedgers);
             }
-            return new LedgerRange(zkActiveLedgers.subSet(getStartLedgerIdByLevel(level1, level2, level3, level4), true,
-                    getEndLedgerIdByLevel(level1, level2, level3, level4), true));
+            return new LedgerRange(zkActiveLedgers.subSet(getStartLedgerIdByLevel(level0, level1, level2, level3), true,
+                    getEndLedgerIdByLevel(level0, level1, level2, level3), true));
         }
     }
 }