You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2017/05/02 21:05:07 UTC

[1/2] bookkeeper git commit: BOOKKEEPER-552: 64 Bits Ledger ID Generation

Repository: bookkeeper
Updated Branches:
  refs/heads/master 9c79e078b -> 057af8dbc


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongZkLedgerIdGenerator.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongZkLedgerIdGenerator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongZkLedgerIdGenerator.java
new file mode 100644
index 0000000..393f9b1
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongZkLedgerIdGenerator.java
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Formatter;
+import java.util.List;
+import java.util.Locale;
+import java.util.Optional;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ZooKeeper based ledger id generator class, which uses EPHEMERAL_SEQUENTIAL znodes
+ * with a <i>(ledgerIdGenPath)/HOB-[high-32-bits]/ID-</i> prefix to generate ledger ids. Note
+ * zookeeper sequential counter has a format of %10d -- that is 10 digits with 0
+ * (zero) padding, i.e. "&lt;path&gt;0000000001", so ledger id space would be
+ * fundamentally limited to 9 billion. In practice, the id generated by zookeeper
+ * is only 31 bits (signed 32-bit integer), so the limit is much lower than 9 billion.
+ *
+ * In order to support the full range of the long ledgerId, once ledgerIds reach Integer.MAX_VALUE,
+ * a new scheme is employed. The 32 most significant bits of the ledger ID are taken and turned into
+ * a directory prefixed with <i>HOB-</i> under <i>(ledgerIdGenPath)</i>.
+ *
+ * Under this <i>HOB-</i> directory, zookeeper is used to continue generating EPHEMERAL_SEQUENTIAL ids
+ * which constitute the lower 32-bits of the ledgerId (sign bit is always 0). Once the <i>HOB-</i> directory runs out of available
+ * ids, the process is repeated. The higher bits are incremented, a new <i>HOB-</i> directory is created, and
+ * zookeeper generates sequential ids underneath it.
+ *
+ * The reason for treating ids which are less than Integer.MAX_VALUE differently is to maintain backwards
+ * compatibility. This is a drop-in replacement for ZkLedgerIdGenerator.
+ */
+public class LongZkLedgerIdGenerator implements LedgerIdGenerator {
+    private static final Logger LOG = LoggerFactory.getLogger(LongZkLedgerIdGenerator.class);
+    private ZooKeeper zk;
+    private String ledgerIdGenPath;
+    private ZkLedgerIdGenerator shortIdGen;
+    private List<String> highOrderDirectories;
+    private HighOrderLedgerIdGenPathStatus ledgerIdGenPathStatus;
+    private final List<ACL> zkAcls;
+
+    private enum HighOrderLedgerIdGenPathStatus {
+        UNKNOWN,
+        PRESENT,
+        NOT_PRESENT
+    };
+
+    public LongZkLedgerIdGenerator(ZooKeeper zk, String ledgersPath, String idGenZnodeName, ZkLedgerIdGenerator shortIdGen, List<ACL> zkAcls) {
+        this.zk = zk;
+        if (StringUtils.isBlank(idGenZnodeName)) {
+            this.ledgerIdGenPath = ledgersPath;
+        } else {
+            this.ledgerIdGenPath = ledgersPath + "/" + idGenZnodeName;
+        }
+        this.shortIdGen = shortIdGen;
+        highOrderDirectories = new ArrayList<String>();
+        ledgerIdGenPathStatus = HighOrderLedgerIdGenPathStatus.UNKNOWN;
+        this.zkAcls = zkAcls; 
+    }
+
+    private void generateLongLedgerIdLowBits(final String ledgerPrefix, long highBits, final GenericCallback<Long> cb) throws KeeperException, InterruptedException, IOException {
+        String highPath = ledgerPrefix + formatHalfId((int)highBits);
+        ZkLedgerIdGenerator.generateLedgerIdImpl(new GenericCallback<Long>(){
+            @Override
+            public void operationComplete(int rc, Long result) {
+                if(rc == BKException.Code.OK) {
+                    assert((highBits & 0xFFFFFFFF00000000l) == 0);
+                    assert((result & 0xFFFFFFFF00000000l) == 0);
+                    cb.operationComplete(rc, (highBits << 32) | result);
+                }
+                else if(rc == BKException.Code.LedgerIdOverflowException) {
+                    // Lower bits are full. Need to expand and create another HOB node.
+                    try {
+                        Long newHighBits = highBits + 1;
+                        createHOBPathAndGenerateId(ledgerPrefix, newHighBits.intValue(), cb);
+                    }
+                    catch (KeeperException e) {
+                        LOG.error("Failed to create long ledger ID path", e);
+                        cb.operationComplete(BKException.Code.ZKException, null);
+                    }
+                    catch (InterruptedException e) {
+                        LOG.error("Failed to create long ledger ID path", e);
+                        cb.operationComplete(BKException.Code.InterruptedException, null);
+                    } catch (IOException e) {
+                        LOG.error("Failed to create long ledger ID path", e);
+                        cb.operationComplete(BKException.Code.IllegalOpException, null);
+                    }
+
+                }
+            }
+
+        }, zk, ZkLedgerIdGenerator.createLedgerPrefix(highPath, null), zkAcls);
+    }
+
+    /**
+     * Formats half an ID as a 10-character 0-padded string.
+     * @param i - 32 bits of the ID to format
+     * @return a 10-character 0-padded string.
+     */
+    private String formatHalfId(int i) {
+        return String.format("%010d", i);
+    }
+
+    private void createHOBPathAndGenerateId(String ledgerPrefix, int hob, final GenericCallback<Long> cb) throws KeeperException, InterruptedException, IOException {
+        try {
+            LOG.debug("Creating HOB path: {}", ledgerPrefix + formatHalfId(hob));
+            zk.create(ledgerPrefix + formatHalfId(hob), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        }
+        catch(KeeperException.NodeExistsException e) {
+            // It's fine if we lost a race to create the node (NodeExistsException).
+            // All other exceptions should continue unwinding.
+            LOG.debug("Tried to create High-order-bits node, but it already existed!", e);
+        }
+        // We just created a new HOB directory. Invalidate the directory cache
+        invalidateDirectoryCache();
+        generateLongLedgerId(cb); // Try again.
+    }
+
+    private void invalidateDirectoryCache() {
+        highOrderDirectories = null;
+    }
+
+    private void generateLongLedgerId(final GenericCallback<Long> cb) throws KeeperException, InterruptedException, IOException {
+        final String hobPrefix = "HOB-";
+        final String ledgerPrefix = this.ledgerIdGenPath + "/" + hobPrefix;
+
+        // Only pull the directories from zk if we don't have any cached.
+        boolean refreshedDirectories = false;
+        if(highOrderDirectories == null) {
+            refreshedDirectories = true;
+            highOrderDirectories = zk.getChildren(ledgerIdGenPath, false);
+        }
+
+        Optional<Long> largest = highOrderDirectories.stream()
+            .map((t) -> {
+                    try {
+                        return Long.parseLong(t.replace(hobPrefix, ""));
+                    }
+                    catch(NumberFormatException e) {
+                        return null;
+                    }
+                })
+            .filter((t) -> t != null)
+            .reduce(Math::max);
+
+        // If we didn't get any valid IDs from the directory...
+        if(!largest.isPresent()) {
+            if(!refreshedDirectories) {
+                // Our cache might be bad. Invalidate it and retry.
+                invalidateDirectoryCache();
+                generateLongLedgerId(cb); // Try again
+            }
+            else {
+                // else, Start at HOB-0000000001;
+                createHOBPathAndGenerateId(ledgerPrefix, 1, cb);
+            }
+            return;
+        }
+
+        // Found the largest.
+        // Get the low-order bits.
+        final Long highBits = largest.get();
+        generateLongLedgerIdLowBits(ledgerPrefix, highBits, cb);
+
+        // Perform garbage collection on HOB- directories.
+        // Keeping 3 should be plenty to prevent races
+        if(highOrderDirectories.size() > 3) {
+            Object[] highOrderDirs = highOrderDirectories.stream()
+                    .map((t) -> {
+                            try {
+                                return Long.parseLong(t.replace(hobPrefix, ""));
+                            }
+                            catch(NumberFormatException e) {
+                                return null;
+                            }
+                        })
+                    .filter((t) -> t != null)
+                    .sorted()
+                    .toArray();
+
+            // Go ahead and invalidate. We want to reload cache even if we fail.
+            invalidateDirectoryCache();
+
+            for(int i = 0; i < highOrderDirs.length - 3; i++) {
+                String path = ledgerPrefix + formatHalfId(((Long)highOrderDirs[i]).intValue());
+                LOG.debug("DELETING HIGH ORDER DIR: {}", path);
+                try {
+                    zk.delete(path, 0);
+                }
+                catch (KeeperException e) {
+                    // We don't care if we fail. Just log it at debug level.
+                    LOG.debug("Failed to delete {}", path);
+                }
+            }
+        }
+    }
+
+    private void createLongLedgerIdPathAndGenerateLongLedgerId(final GenericCallback<Long> cb, String createPath) {
+        ZkUtils.asyncCreateFullPathOptimistic(zk, ledgerIdGenPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                                              CreateMode.PERSISTENT, new StringCallback() {
+                                                      @Override
+                                                      public void processResult(int rc, String path, Object ctx, String name) {
+                                                          try {
+                                                              setLedgerIdGenPathStatus(HighOrderLedgerIdGenPathStatus.PRESENT);
+                                                              generateLongLedgerId(cb);
+                                                          } catch (KeeperException e) {
+                                                              LOG.error("Failed to create long ledger ID path", e);
+                                                              setLedgerIdGenPathStatus(HighOrderLedgerIdGenPathStatus.UNKNOWN);
+                                                              cb.operationComplete(BKException.Code.ZKException, null);
+                                                          } catch (InterruptedException e) {
+                                                              LOG.error("Failed to create long ledger ID path", e);
+                                                              setLedgerIdGenPathStatus(HighOrderLedgerIdGenPathStatus.UNKNOWN);
+                                                              cb.operationComplete(BKException.Code.InterruptedException, null);
+                                                          } catch (IOException e) {
+                                                              LOG.error("Failed to create long ledger ID path", e);
+                                                              setLedgerIdGenPathStatus(HighOrderLedgerIdGenPathStatus.UNKNOWN);
+                                                              cb.operationComplete(BKException.Code.IllegalOpException, null);
+                                                          }
+                                                      }
+                                                  }, null);
+    }
+
+    public void invalidateLedgerIdGenPathStatus() {
+        setLedgerIdGenPathStatus(HighOrderLedgerIdGenPathStatus.UNKNOWN);
+    }
+
+    synchronized private void setLedgerIdGenPathStatus(HighOrderLedgerIdGenPathStatus status) {
+        ledgerIdGenPathStatus = status;
+    }
+
+    /**
+     * Checks the existence of the long ledger id gen path. Existence indicates we have switched from the legacy
+     * algorithm to the new method of generating 63-bit ids. If the existence is UNKNOWN, it looks in zk to
+     * find out. If it previously checked in zk, it returns the cached value. This value changes when we run out
+     * of ids &lt; Integer.MAX_VALUE, and try to create the long ledger id gen path.
+     * @see #createLongLedgerIdPathAndGenerateLongLedgerId
+     * @param zk the ZooKeeper client used to look up the path when the status is UNKNOWN
+     * @return true if the long ledger id gen path exists, false otherwise
+     * @throws KeeperException
+     * @throws InterruptedException
+     */
+    synchronized public boolean ledgerIdGenPathPresent(ZooKeeper zk) throws KeeperException, InterruptedException {
+        switch(ledgerIdGenPathStatus) {
+        case UNKNOWN:
+            if(zk.exists(ledgerIdGenPath, false) != null) {
+                ledgerIdGenPathStatus = HighOrderLedgerIdGenPathStatus.PRESENT;
+                return true;
+            }
+            else {
+                ledgerIdGenPathStatus = HighOrderLedgerIdGenPathStatus.NOT_PRESENT;
+                return false;
+            }
+        case PRESENT:
+            return true;
+        case NOT_PRESENT:
+            return false;
+        default:
+            return false;
+        }
+    }
+
+    @Override
+    public void generateLedgerId(final GenericCallback<Long> cb) {
+        try {
+            if(!ledgerIdGenPathPresent(zk)) {
+                // We've not moved onto 63-bit ledgers yet.
+                shortIdGen.generateLedgerId(new GenericCallback<Long>(){
+                        @Override
+                        public void operationComplete(int rc, Long result) {
+                            if(rc == BKException.Code.LedgerIdOverflowException) {
+                                // 31-bit IDs overflowed. Start using 63-bit ids.
+                                createLongLedgerIdPathAndGenerateLongLedgerId(cb, ledgerIdGenPath);
+                            }
+                            else {
+                                // 31-bit Generation worked OK, or had some other
+                                // error that we will pass on.
+                                cb.operationComplete(rc, result);
+                            }
+                        }
+                    });
+            }
+            else {
+                // We've already started generating 63-bit ledger IDs.
+                // Keep doing that.
+                generateLongLedgerId(cb);
+            }
+        } catch (KeeperException e) {
+            LOG.error("Failed to create long ledger ID path", e);
+            cb.operationComplete(BKException.Code.ZKException, null);
+        }
+        catch (InterruptedException e) {
+            LOG.error("Failed to create long ledger ID path", e);
+            cb.operationComplete(BKException.Code.InterruptedException, null);
+        }
+        catch (IOException e) {
+            LOG.error("Failed to create long ledger ID path", e);
+            cb.operationComplete(BKException.Code.IllegalOpException, null);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        shortIdGen.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java
index a2e79af..1373f34 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerIdGenerator.java
@@ -47,7 +47,6 @@ public class ZkLedgerIdGenerator implements LedgerIdGenerator {
     static final String LEDGER_ID_GEN_PREFIX = "ID-";
 
     final ZooKeeper zk;
-    final String ledgerIdGenPath;
     final String ledgerPrefix;
     final List<ACL> zkAcls;
 
@@ -56,17 +55,26 @@ public class ZkLedgerIdGenerator implements LedgerIdGenerator {
                                String idGenZnodeName,
                                List<ACL> zkAcls) {
         this.zk = zk;
+        ledgerPrefix = createLedgerPrefix(ledgersPath, idGenZnodeName);
         this.zkAcls = zkAcls;
+    }
+
+    public static String createLedgerPrefix(String ledgersPath, String idGenZnodeName) {
+        String ledgerIdGenPath = null;
         if (StringUtils.isBlank(idGenZnodeName)) {
-            this.ledgerIdGenPath = ledgersPath;
+            ledgerIdGenPath = ledgersPath;
         } else {
-            this.ledgerIdGenPath = ledgersPath + "/" + idGenZnodeName;
+            ledgerIdGenPath = ledgersPath + "/" + idGenZnodeName;
         }
-        this.ledgerPrefix = this.ledgerIdGenPath + "/" + LEDGER_ID_GEN_PREFIX;
+        return ledgerIdGenPath + "/" + LEDGER_ID_GEN_PREFIX;
     }
 
     @Override
     public void generateLedgerId(final GenericCallback<Long> cb) {
+        generateLedgerIdImpl(cb, zk, ledgerPrefix, zkAcls);
+    }
+
+    public static void generateLedgerIdImpl(final GenericCallback<Long> cb, ZooKeeper zk, String ledgerPrefix, List<ACL> zkAcls) {
         ZkUtils.asyncCreateFullPathOptimistic(zk, ledgerPrefix, new byte[0], zkAcls,
                 CreateMode.EPHEMERAL_SEQUENTIAL,
                 new StringCallback() {
@@ -84,8 +92,13 @@ public class ZkLedgerIdGenerator implements LedgerIdGenerator {
                          */
                         long ledgerId;
                         try {
-                            ledgerId = getLedgerIdFromGenPath(idPathName);
-                            cb.operationComplete(BKException.Code.OK, ledgerId);
+                            ledgerId = getLedgerIdFromGenPath(idPathName, ledgerPrefix);
+                            if(ledgerId < 0 || ledgerId >= Integer.MAX_VALUE) {
+                                cb.operationComplete(BKException.Code.LedgerIdOverflowException, null);
+                            }
+                            else {
+                                cb.operationComplete(BKException.Code.OK, ledgerId);
+                            }
                         } catch (IOException e) {
                             LOG.error("Could not extract ledger-id from id gen path:" + path, e);
                             cb.operationComplete(BKException.Code.ZKException, null);
@@ -109,7 +122,7 @@ public class ZkLedgerIdGenerator implements LedgerIdGenerator {
     }
 
     // get ledger id from generation path
-    private long getLedgerIdFromGenPath(String nodeName) throws IOException {
+    private static long getLedgerIdFromGenPath(String nodeName, String ledgerPrefix) throws IOException {
         long ledgerId;
         try {
             String parts[] = nodeName.split(ledgerPrefix);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
index c2f658b..c0c110d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
@@ -57,7 +57,7 @@ public class StringUtils {
      *          ledger id
      * @return the hierarchical path
      */
-    public static String getHierarchicalLedgerPath(long ledgerId) {
+    public static String getShortHierarchicalLedgerPath(long ledgerId) {
         String ledgerIdStr = getZKStringId(ledgerId);
         // do 2-4-4 split
         StringBuilder sb = new StringBuilder();
@@ -90,6 +90,13 @@ public class StringUtils {
         return sb.toString();
     }
     
+    public static String getHybridHierarchicalLedgerPath(long ledgerId) {
+        if(ledgerId < Integer.MAX_VALUE) {
+            return getShortHierarchicalLedgerPath(ledgerId);
+        }
+        return getLongHierarchicalLedgerPath(ledgerId);
+    }
+    
     /**
      * Parse the hierarchical ledger path to its ledger id
      *
@@ -119,7 +126,7 @@ public class StringUtils {
             throws IOException {
         String[] longHierarchicalParts = longHierarchicalLedgerPath.split("/");
         if (longHierarchicalParts.length != 5) {
-            throw new IOException("it is not a valid hierarchical path name : " + longHierarchicalLedgerPath);
+            return stringToHierarchicalLedgerId(longHierarchicalLedgerPath);
         }
         longHierarchicalParts[4] =
                 longHierarchicalParts[4].substring(LEDGER_NODE_PREFIX.length());

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
index 777b619..d25bd70 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
@@ -24,6 +24,7 @@ import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory;
 import org.apache.bookkeeper.meta.MSLedgerManagerFactory;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -882,10 +883,13 @@ public class BookieRecoveryTest extends MultiLedgerManagerMultiDigestTestCase {
     public void ensurePasswordUsedForOldLedgers() throws Exception {
         // This test bases on creating old ledgers in version 4.1.0, which only
         // supports ZooKeeper based flat and hierarchical LedgerManagerFactory.
-        // So we ignore it for MSLedgerManagerFactory.
+        // So we ignore it for MSLedgerManagerFactory and LongHierarchicalLedgerManagerFactory.
         if (MSLedgerManagerFactory.class.getName().equals(ledgerManagerFactory)) {
             return;
         }
+        if (LongHierarchicalLedgerManagerFactory.class.getName().equals(ledgerManagerFactory)) {
+            return;
+        }
 
         // stop all bookies
         // and wipe the ledger layout so we can use an old client

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLongZkLedgerIdGenerator.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLongZkLedgerIdGenerator.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLongZkLedgerIdGenerator.java
new file mode 100644
index 0000000..75422a4
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLongZkLedgerIdGenerator.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.test.ZooKeeperUtil;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import junit.framework.TestCase;
+
+public class TestLongZkLedgerIdGenerator extends TestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(TestZkLedgerIdGenerator.class);
+
+    ZooKeeperUtil zkutil;
+    ZooKeeper zk;
+
+    LongZkLedgerIdGenerator ledgerIdGenerator;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        LOG.info("Setting up test");
+        super.setUp();
+
+        zkutil = new ZooKeeperUtil();
+        zkutil.startServer();
+        zk = zkutil.getZooKeeperClient();
+
+        ZkLedgerIdGenerator shortLedgerIdGenerator = new ZkLedgerIdGenerator(zk,
+                "/test-zk-ledger-id-generator", "idgen", ZooDefs.Ids.OPEN_ACL_UNSAFE);
+        ledgerIdGenerator = new LongZkLedgerIdGenerator(zk, 
+                "/test-zk-ledger-id-generator", "idgen-long", shortLedgerIdGenerator, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        LOG.info("Tearing down test");
+        ledgerIdGenerator.close();
+        zk.close();
+        zkutil.killServer();
+
+        super.tearDown();
+    }
+
+    @Test(timeout=60000)
+    public void testGenerateLedgerId() throws Exception {
+        // Create *nThread* threads that each generate *nLedgers* ledger ids,
+        // and then check that no ledger id was generated twice.
+        final int nThread = 2;
+        final int nLedgers = 2000;
+        // Multiply by two. We're going to do half in the old legacy space and half in the new.
+        final CountDownLatch countDownLatch = new CountDownLatch(nThread*nLedgers*2); 
+
+        final AtomicInteger errCount = new AtomicInteger(0);
+        final ConcurrentLinkedQueue<Long> ledgerIds = new ConcurrentLinkedQueue<Long>();
+        final GenericCallback<Long> cb = new GenericCallback<Long>() {
+            @Override
+            public void operationComplete(int rc, Long result) {
+                if (Code.OK.intValue() == rc) {
+                    ledgerIds.add(result);
+                } else {
+                    errCount.incrementAndGet();
+                }
+                countDownLatch.countDown();
+            }
+        };
+
+        long start = System.currentTimeMillis();
+
+        for (int i = 0; i < nThread; i++) {
+            new Thread() {
+                @Override
+                public void run() {
+                    for (int j = 0; j < nLedgers; j++) {
+                        ledgerIdGenerator.generateLedgerId(cb);
+                    }
+                }
+            }.start();
+        }
+        
+        // Go and create the long-id directory in zookeeper. This should cause the id generator to generate ids with the
+        // new algo once we clear its stored status.
+        ZkUtils.createFullPathOptimistic(zk, "/test-zk-ledger-id-generator/idgen-long", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        ledgerIdGenerator.invalidateLedgerIdGenPathStatus();
+        
+        for (int i = 0; i < nThread; i++) {
+            new Thread() {
+                @Override
+                public void run() {
+                    for (int j = 0; j < nLedgers; j++) {
+                        ledgerIdGenerator.generateLedgerId(cb);
+                    }
+                }
+            }.start();
+        }
+
+        assertTrue("Wait ledger id generation threads to stop timeout : ",
+                countDownLatch.await(30, TimeUnit.SECONDS));
+        LOG.info("Number of generated ledger id: {}, time used: {}", ledgerIds.size(),
+                System.currentTimeMillis() - start);
+        assertEquals("Error occur during ledger id generation : ", 0, errCount.get());
+
+        Set<Long> ledgers = new HashSet<Long>();
+        while (!ledgerIds.isEmpty()) {
+            Long ledger = ledgerIds.poll();
+            assertNotNull("Generated ledger id is null : ", ledger);
+            assertFalse("Ledger id [" + ledger + "] conflict : ", ledgers.contains(ledger));
+            ledgers.add(ledger);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java
index 2c9a1f4..357bd34 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java
@@ -48,6 +48,7 @@ public abstract class MultiLedgerManagerMultiDigestTestCase extends BookKeeperCl
     public static Collection<Object[]> configs() {
         String[] ledgerManagers = {
             "org.apache.bookkeeper.meta.FlatLedgerManagerFactory",
+            "org.apache.bookkeeper.meta.LegacyHierarchicalLedgerManagerFactory",
             "org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory",
             "org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory",
             "org.apache.bookkeeper.meta.MSLedgerManagerFactory",

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java
index 34a22af..cb640b1 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java
@@ -42,6 +42,7 @@ public abstract class MultiLedgerManagerTestCase extends BookKeeperClusterTestCa
     public static Collection<Object[]> configs() {
         String[] ledgerManagers = new String[] {
             "org.apache.bookkeeper.meta.FlatLedgerManagerFactory",
+            "org.apache.bookkeeper.meta.LegacyHierarchicalLedgerManagerFactory",
             "org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory",
             "org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory",
             "org.apache.bookkeeper.meta.MSLedgerManagerFactory",

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestBackwardCompat.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestBackwardCompat.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestBackwardCompat.java
index 3249194..18504e3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestBackwardCompat.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestBackwardCompat.java
@@ -37,6 +37,7 @@ import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.FileSystemUpgrade;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.util.IOUtils;
@@ -437,6 +438,17 @@ public class TestBackwardCompat {
             return new LedgerCurrent(newbk, newlh);
         }
 
+        static LedgerCurrent openLedger(long id, ClientConfiguration conf) throws Exception {
+            conf.setZkServers(zkUtil.getZooKeeperConnectString());
+            org.apache.bookkeeper.client.BookKeeper newbk
+                = new org.apache.bookkeeper.client.BookKeeper(conf);
+            org.apache.bookkeeper.client.LedgerHandle newlh
+                = newbk.openLedger(id,
+                                   org.apache.bookkeeper.client.BookKeeper.DigestType.CRC32,
+                                "foobar".getBytes());
+            return new LedgerCurrent(newbk, newlh);
+        }
+
         long getId() {
             return lh.getId();
         }
@@ -827,4 +839,45 @@ public class TestBackwardCompat {
         oldledger.close();
         scur.stop();
     }
+
+    /**
+     * Test compatibility between the old (4.2.0) version and the current version
+     * with respect to the HierarchicalLedgerManagers.
+     * - 4.2.0 server starts with HierarchicalLedgerManager.
+     * - Write a ledger with the old (4.2.0) client.
+     * - Read the ledger written by the old client with the current client and server.
+     */
+    @Test(timeout = 60000)
+    public void testCompatHierarchicalLedgerManager() throws Exception {
+        File journalDir = createTempDir("bookie", "journal");
+        File ledgerDir = createTempDir("bookie", "ledger");
+
+        int port = PortManager.nextFreePort();
+        // start server, upgrade
+        Server420 s420 = new Server420(journalDir, ledgerDir, port);
+        s420.getConf().setLedgerManagerFactoryClassName("org.apache.bk_v4_2_0.bookkeeper.meta.HierarchicalLedgerManagerFactory");
+        s420.start();
+
+        Ledger420 l420 = Ledger420.newLedger();
+        l420.write100();
+        long oldLedgerId = l420.getId();
+        l420.close();
+        s420.stop();
+
+        // Start the current server
+        ServerCurrent scur = new ServerCurrent(journalDir, ledgerDir, port, true);
+        scur.getConf().setLedgerManagerFactoryClassName("org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
+        scur.getConf().setProperty(AbstractConfiguration.LEDGER_MANAGER_FACTORY_DISABLE_CLASS_CHECK, true);
+        scur.start();
+        // Munge the conf so we can test.
+        ClientConfiguration conf = new ClientConfiguration();
+        conf.setLedgerManagerFactoryClassName("org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
+        conf.setProperty(AbstractConfiguration.LEDGER_MANAGER_FACTORY_DISABLE_CLASS_CHECK, true);
+
+        // check that new client can read old ledgers on new server
+        LedgerCurrent oldledger = LedgerCurrent.openLedger(oldLedgerId, conf);
+        assertEquals("Failed to read entries!", 100, oldledger.readAll());
+        oldledger.close();
+        scur.stop();
+    }
 }


[2/2] bookkeeper git commit: BOOKKEEPER-552: 64 Bits Ledger ID Generation

Posted by mm...@apache.org.
BOOKKEEPER-552: 64 Bits Ledger ID Generation

This PR supersedes #112

Instead of moving LongHierarchicalLedgerManager to HierarchicalLedgerManager, LongHierarchicalLedgerManager is still a stand-alone manager. HierarchicalLedgerManager has been moved to LegacyHierarchicalLedgerManager, and a new HierarchicalLedgerManager class has been put in its place, which is backwards-compatible with the original (now legacy) HierarchicalLedgerManager.

This new HierarchicalLedgerManager leverages the new LongZkLedgerIdGenerator to generate Ids, and uses the LongHierarchicalLedgerManager to manage metadata for ledger IDs > 31 bits long. For shorter ledger ids, the LegacyHierarchicalLedgerManager is used, to keep backwards-compatibility.

Author: Kyle Nusbaum <kn...@yahoo-inc.com>

Reviewers: Enrico Olivelli <eo...@gmail.com>, Matteo Merli <mm...@apache.org>, Venkateswararao Jujjuri (JV) <None>

Closes #114 from knusbaum/BOOKKEEPER-552


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/057af8db
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/057af8db
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/057af8db

Branch: refs/heads/master
Commit: 057af8dbce6c08794eb8b46ca52ca13f222d9bbb
Parents: 9c79e07
Author: Kyle Nusbaum <kn...@yahoo-inc.com>
Authored: Tue May 2 14:04:57 2017 -0700
Committer: Matteo Merli <mm...@apache.org>
Committed: Tue May 2 14:04:57 2017 -0700

----------------------------------------------------------------------
 .../apache/bookkeeper/client/BKException.java   |  12 +
 .../bookkeeper/conf/AbstractConfiguration.java  |   3 +
 .../meta/AbstractHierarchicalLedgerManager.java | 213 +++++++++++
 .../meta/AbstractZkLedgerManager.java           |   3 +-
 .../bookkeeper/meta/FlatLedgerManager.java      |   6 +-
 .../meta/HierarchicalLedgerManager.java         | 375 +++----------------
 .../meta/HierarchicalLedgerManagerFactory.java  |  80 +---
 .../bookkeeper/meta/LedgerManagerFactory.java   |   5 +-
 .../meta/LegacyHierarchicalLedgerManager.java   | 281 ++++++++++++++
 .../LegacyHierarchicalLedgerManagerFactory.java | 100 +++++
 .../meta/LongHierarchicalLedgerManager.java     | 101 +++--
 .../meta/LongZkLedgerIdGenerator.java           | 333 ++++++++++++++++
 .../bookkeeper/meta/ZkLedgerIdGenerator.java    |  27 +-
 .../org/apache/bookkeeper/util/StringUtils.java |  11 +-
 .../bookkeeper/client/BookieRecoveryTest.java   |   6 +-
 .../meta/TestLongZkLedgerIdGenerator.java       | 145 +++++++
 .../MultiLedgerManagerMultiDigestTestCase.java  |   1 +
 .../test/MultiLedgerManagerTestCase.java        |   1 +
 .../bookkeeper/test/TestBackwardCompat.java     |  53 +++
 19 files changed, 1310 insertions(+), 446 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
index 2377c1c..aa3ec08 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
@@ -102,6 +102,8 @@ public abstract class BKException extends Exception {
             return new BKDuplicateEntryIdException();
         case Code.TimeoutException:
             return new BKTimeoutException();
+        case Code.LedgerIdOverflowException:
+            return new BKLedgerIdOverflowException();
         default:
             return new BKUnexpectedConditionException();
         }
@@ -142,6 +144,8 @@ public abstract class BKException extends Exception {
         int UnauthorizedAccessException = -102;
         int UnclosedFragmentException = -103;
         int WriteOnReadOnlyBookieException = -104;
+        //-105 reserved for TooManyRequestsException
+        int LedgerIdOverflowException = -106;
 
         // generic exception code used to propagate in replication pipeline
         int ReplicationException = -200;
@@ -210,6 +214,8 @@ public abstract class BKException extends Exception {
             return "Attempting to use an unclosed fragment; This is not safe";
         case Code.WriteOnReadOnlyBookieException:
             return "Attempting to write on ReadOnly bookie";
+        case Code.LedgerIdOverflowException:
+            return "Next ledgerID is too large.";
         case Code.ReplicationException:
             return "Errors in replication pipeline";
         case Code.ClientClosedException:
@@ -404,4 +410,10 @@ public abstract class BKException extends Exception {
             super(Code.TimeoutException);
         }
     }
+
+    public static class BKLedgerIdOverflowException extends BKException {
+        public BKLedgerIdOverflowException() {
+            super(Code.LedgerIdOverflowException);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index 07e5d08..c7c50cd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -76,6 +76,9 @@ public abstract class AbstractConfiguration extends CompositeConfiguration {
     // Zookeeper ACL settings
     protected final static String ZK_ENABLE_SECURITY = "zkEnableSecurity";
 
+    // Kluge for compatibility testing. Never set this outside tests.
+    public final static String LEDGER_MANAGER_FACTORY_DISABLE_CLASS_CHECK = "ledgerManagerFactoryDisableClassCheck";
+
     protected AbstractConfiguration() {
         super();
         if (READ_SYSTEM_PROPERTIES) {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
new file mode 100644
index 0000000..02359e0
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractHierarchicalLedgerManager.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.bookkeeper.util.StringUtils;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.KeeperException.Code;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractHierarchicalLedgerManager extends AbstractZkLedgerManager {
+    
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractHierarchicalLedgerManager.class);
+
+    /**
+     * Constructor
+     *
+     * @param conf
+     *          Configuration object
+     * @param zk
+     *          ZooKeeper Client Handle
+     */
+    public AbstractHierarchicalLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
+        super(conf, zk);
+    }
+    
+    /**
+     * Process hash nodes in a given path
+     */
+    void asyncProcessLevelNodes(
+        final String path, final Processor<String> processor,
+        final AsyncCallback.VoidCallback finalCb, final Object context,
+        final int successRc, final int failureRc) {
+        zk.sync(path, new AsyncCallback.VoidCallback() {
+            @Override
+            public void processResult(int rc, String path, Object ctx) {
+                if (rc != Code.OK.intValue()) {
+                    LOG.error("Error syncing path " + path + " when getting its chidren: ",
+                              KeeperException.create(KeeperException.Code.get(rc), path));
+                    finalCb.processResult(failureRc, null, context);
+                    return;
+                }
+
+                zk.getChildren(path, false, new AsyncCallback.ChildrenCallback() {
+                    @Override
+                    public void processResult(int rc, String path, Object ctx,
+                                              List<String> levelNodes) {
+                        if (rc != Code.OK.intValue()) {
+                            LOG.error("Error polling hash nodes of " + path,
+                                      KeeperException.create(KeeperException.Code.get(rc), path));
+                            finalCb.processResult(failureRc, null, context);
+                            return;
+                        }
+                        AsyncListProcessor<String> listProcessor =
+                                new AsyncListProcessor<String>(scheduler);
+                        // process its children
+                        listProcessor.process(levelNodes, processor, finalCb,
+                                              context, successRc, failureRc);
+                    }
+                }, null);
+            }
+        }, null);
+    }
+    
+    /**
+     * Process a list one element at a time, asynchronously. Processing stops immediately
+     * when an error occurs.
+     */
+    private static class AsyncListProcessor<T> {
+        // use this to prevent long stack chains from building up in callbacks
+        ScheduledExecutorService scheduler;
+
+        /**
+         * Constructor
+         *
+         * @param scheduler
+         *          Executor used to prevent long stack chains
+         */
+        public AsyncListProcessor(ScheduledExecutorService scheduler) {
+            this.scheduler = scheduler;
+        }
+
+        /**
+         * Process list of items
+         *
+         * @param data
+         *          List of data to process
+         * @param processor
+         *          Callback to process element of list when success
+         * @param finalCb
+         *          Final callback to be called after all elements in the list are processed
+         * @param context
+         *          Context of final callback
+         * @param successRc
+         *          RC passed to final callback on success
+         * @param failureRc
+         *          RC passed to final callback on failure
+         */
+        public void process(final List<T> data, final Processor<T> processor,
+                            final AsyncCallback.VoidCallback finalCb, final Object context,
+                            final int successRc, final int failureRc) {
+            if (data == null || data.size() == 0) {
+                finalCb.processResult(successRc, null, context);
+                return;
+            }
+            final int size = data.size();
+            final AtomicInteger current = new AtomicInteger(0);
+            AsyncCallback.VoidCallback stubCallback = new AsyncCallback.VoidCallback() {
+                @Override
+                public void processResult(int rc, String path, Object ctx) {
+                    if (rc != successRc) {
+                        // terminate immediately
+                        finalCb.processResult(failureRc, null, context);
+                        return;
+                    }
+                    // process next element
+                    int next = current.incrementAndGet();
+                    if (next >= size) { // reach the end of list
+                        finalCb.processResult(successRc, null, context);
+                        return;
+                    }
+                    final T dataToProcess = data.get(next);
+                    final AsyncCallback.VoidCallback stub = this;
+                    scheduler.submit(new Runnable() {
+                        @Override
+                        public final void run() {
+                            processor.process(dataToProcess, stub);
+                        }
+                    });
+                }
+            };
+            T firstElement = data.get(0);
+            processor.process(firstElement, stubCallback);
+        }
+    }
+    
+    // get ledger from all level nodes
+    long getLedgerId(String...levelNodes) throws IOException {
+        return StringUtils.stringToHierarchicalLedgerId(levelNodes);
+    }
+    
+    /**
+     * Get all ledger ids in the given zk path.
+     *
+     * @param ledgerNodes
+     *          List of ledgers in the given path
+     *          example:- {L1652, L1653, L1650}
+     * @param path
+     *          The zookeeper path of the ledger ids. The path should start with {@code ledgerRootPath}
+     *          example (with ledgerRootPath = /ledgers):- /ledgers/00/0053
+     */
+    @Override
+    protected NavigableSet<Long> ledgerListToSet(List<String> ledgerNodes, String path) {
+        NavigableSet<Long> zkActiveLedgers = new TreeSet<Long>();
+
+        if (!path.startsWith(ledgerRootPath)) {
+            LOG.warn("Ledger path [{}] is not a valid path name, it should start wth {}", path, ledgerRootPath);
+            return zkActiveLedgers;
+        }
+
+        long ledgerIdPrefix = 0;
+        char ch;
+        for (int i = ledgerRootPath.length() + 1; i < path.length(); i++) {
+            ch = path.charAt(i);
+            if (ch < '0' || ch > '9')
+                continue;
+            ledgerIdPrefix = ledgerIdPrefix * 10 + (ch - '0');
+        }
+
+        for (String ledgerNode : ledgerNodes) {
+            if (isSpecialZnode(ledgerNode)) {
+                continue;
+            }
+            long ledgerId = ledgerIdPrefix;
+            for (int i = 0; i < ledgerNode.length(); i++) {
+                ch = ledgerNode.charAt(i);
+                if (ch < '0' || ch > '9')
+                    continue;
+                ledgerId = ledgerId * 10 + (ch - '0');
+            }
+            zkActiveLedgers.add(ledgerId);
+        }
+        return zkActiveLedgers;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index 6db3375..f5a60f6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -38,6 +38,7 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataLis
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.util.BookKeeperConstants;
+import org.apache.bookkeeper.util.StringUtils;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.zookeeper.AsyncCallback;
@@ -468,7 +469,7 @@ abstract class AbstractZkLedgerManager implements LedgerManager, Watcher {
      *          Znode Name
      * @return true  if the znode is a special znode otherwise false
      */
-    protected boolean isSpecialZnode(String znode) {
+     protected static boolean isSpecialZnode(String znode) {
         if (BookKeeperConstants.AVAILABLE_NODE.equals(znode)
                 || BookKeeperConstants.COOKIE_NODE.equals(znode)
                 || BookKeeperConstants.LAYOUT_ZNODE.equals(znode)

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
index 3172247..36db62a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
@@ -87,9 +87,9 @@ class FlatLedgerManager extends AbstractZkLedgerManager {
         asyncProcessLedgersInSingleNode(ledgerRootPath, processor, finalCb, ctx, successRc, failureRc);
     }
 
-    @Override
-    protected boolean isSpecialZnode(String znode) {
-        return znode.startsWith(ZkLedgerIdGenerator.LEDGER_ID_GEN_PREFIX) || super.isSpecialZnode(znode);
+    
+    protected static boolean isSpecialZnode(String znode) {
+        return znode.startsWith(ZkLedgerIdGenerator.LEDGER_ID_GEN_PREFIX) || AbstractZkLedgerManager.isSpecialZnode(znode);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
index bed1627..309762b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
@@ -18,370 +18,105 @@
 package org.apache.bookkeeper.meta;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NavigableSet;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.util.StringUtils;
-import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.ZooKeeper;
 
 /**
- * Hierarchical Ledger Manager which manages ledger meta in zookeeper using 2-level hierarchical znodes.
+ * HierarchicalLedgerManager makes use of both LongHierarchicalLedgerManager and LegacyHierarchicalLedgerManager
+ * to extend the 31-bit ledger id range of the LegacyHierarchicalLedgerManager to that of the LongHierarchicalLedgerManager
+ * while remaining backwards-compatible with the legacy manager.
+ * 
+ * In order to achieve backwards-compatibility, the HierarchicalLedgerManager forwards requests relating to ledger IDs which
+ * are < Integer.MAX_VALUE to the LegacyHierarchicalLedgerManager. The new 5-part directory structure will not appear until a
+ * ledger with an ID >= Integer.MAX_VALUE is created.
  *
- * <p>
- * HierarchicalLedgerManager splits the generated id into 3 parts (2-4-4):
- * <pre>&lt;level1 (2 digits)&gt;&lt;level2 (4 digits)&gt;&lt;level3 (4 digits)&gt;</pre>
- * These 3 parts are used to form the actual ledger node path used to store ledger metadata:
- * <pre>(ledgersRootPath)/level1/level2/L(level3)</pre>
- * E.g Ledger 0000000001 is split into 3 parts <i>00</i>, <i>0000</i>, <i>0001</i>, which is stored in
- * <i>(ledgersRootPath)/00/0000/L0001</i>. So each znode could have at most 10000 ledgers, which avoids
- * errors during garbage collection due to lists of children that are too long.
+ * @see LongHierarchicalLedgerManager
+ * @see LegacyHierarchicalLedgerManager
  */
-class HierarchicalLedgerManager extends AbstractZkLedgerManager {
-
+class HierarchicalLedgerManager extends AbstractHierarchicalLedgerManager {
     static final Logger LOG = LoggerFactory.getLogger(HierarchicalLedgerManager.class);
 
-    static final String IDGEN_ZNODE = "idgen";
-    private static final String MAX_ID_SUFFIX = "9999";
-    private static final String MIN_ID_SUFFIX = "0000";
+    LegacyHierarchicalLedgerManager legacyLM;
+    LongHierarchicalLedgerManager longLM;
 
-    /**
-     * Constructor
-     *
-     * @param conf
-     *          Configuration object
-     * @param zk
-     *          ZooKeeper Client Handle
-     */
     public HierarchicalLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
         super(conf, zk);
+        legacyLM = new LegacyHierarchicalLedgerManager(conf, zk);
+        longLM = new LongHierarchicalLedgerManager (conf, zk);
     }
 
     @Override
-    public String getLedgerPath(long ledgerId) {
-        return ledgerRootPath + StringUtils.getHierarchicalLedgerPath(ledgerId);
-    }
-
-    @Override
-    public long getLedgerId(String pathName) throws IOException {
-        if (!pathName.startsWith(ledgerRootPath)) {
-            throw new IOException("it is not a valid hashed path name : " + pathName);
-        }
-        String hierarchicalPath = pathName.substring(ledgerRootPath.length() + 1);
-        return StringUtils.stringToHierarchicalLedgerId(hierarchicalPath);
-    }
-
-    // get ledger from all level nodes
-    long getLedgerId(String...levelNodes) throws IOException {
-        return StringUtils.stringToHierarchicalLedgerId(levelNodes);
-    }
-
-    //
-    // Active Ledger Manager
-    //
-
-    /**
-     * Get the smallest cache id in a specified node /level1/level2
-     *
-     * @param level1
-     *          1st level node name
-     * @param level2
-     *          2nd level node name
-     * @return the smallest ledger id
-     */
-    private long getStartLedgerIdByLevel(String level1, String level2) throws IOException {
-        return getLedgerId(level1, level2, MIN_ID_SUFFIX);
-    }
-
-    /**
-     * Get the largest cache id in a specified node /level1/level2
-     *
-     * @param level1
-     *          1st level node name
-     * @param level2
-     *          2nd level node name
-     * @return the largest ledger id
-     */
-    private long getEndLedgerIdByLevel(String level1, String level2) throws IOException {
-        return getLedgerId(level1, level2, MAX_ID_SUFFIX);
-    }
-
-    @Override
-    public void asyncProcessLedgers(final Processor<Long> processor,
-                                    final AsyncCallback.VoidCallback finalCb, final Object context,
-                                    final int successRc, final int failureRc) {
-        // process 1st level nodes
-        asyncProcessLevelNodes(ledgerRootPath, new Processor<String>() {
-            @Override
-            public void process(final String l1Node, final AsyncCallback.VoidCallback cb1) {
-                if (isSpecialZnode(l1Node)) {
-                    cb1.processResult(successRc, null, context);
-                    return;
-                }
-                final String l1NodePath = ledgerRootPath + "/" + l1Node;
-                // process level1 path, after all children of level1 process
-                // it callback to continue processing next level1 node
-                asyncProcessLevelNodes(l1NodePath, new Processor<String>() {
-                    @Override
-                    public void process(String l2Node, AsyncCallback.VoidCallback cb2) {
-                        // process level1/level2 path
-                        String l2NodePath = ledgerRootPath + "/" + l1Node + "/" + l2Node;
-                        // process each ledger
-                        // after all ledger are processed, cb2 will be call to continue processing next level2 node
-                        asyncProcessLedgersInSingleNode(l2NodePath, processor, cb2,
-                                                        context, successRc, failureRc);
-                    }
-                }, cb1, context, successRc, failureRc);
-            }
-        }, finalCb, context, successRc, failureRc);
-    }
+    public void asyncProcessLedgers(Processor<Long> processor, VoidCallback finalCb, Object context, int successRc,
+            int failureRc) {
+        // Process the old 31-bit id ledgers first.
+        legacyLM.asyncProcessLedgers(processor, new VoidCallback(){
 
-    /**
-     * Process hash nodes in a given path
-     */
-    void asyncProcessLevelNodes(
-        final String path, final Processor<String> processor,
-        final AsyncCallback.VoidCallback finalCb, final Object context,
-        final int successRc, final int failureRc) {
-        zk.sync(path, new AsyncCallback.VoidCallback() {
             @Override
             public void processResult(int rc, String path, Object ctx) {
-                if (rc != Code.OK.intValue()) {
-                    LOG.error("Error syncing path " + path + " when getting its chidren: ",
-                              KeeperException.create(KeeperException.Code.get(rc), path));
-                    finalCb.processResult(failureRc, null, context);
-                    return;
+                if(rc == failureRc) {
+                    // If it fails, return the failure code to the callback
+                    finalCb.processResult(rc, path, ctx);
+                }
+                else {
+                    // If it succeeds, proceed with our own recursive ledger processing for the 63-bit id ledgers
+                    longLM.asyncProcessLedgers(processor, finalCb, context, successRc, failureRc);
                 }
-
-                zk.getChildren(path, false, new AsyncCallback.ChildrenCallback() {
-                    @Override
-                    public void processResult(int rc, String path, Object ctx,
-                                              List<String> levelNodes) {
-                        if (rc != Code.OK.intValue()) {
-                            LOG.error("Error polling hash nodes of " + path,
-                                      KeeperException.create(KeeperException.Code.get(rc), path));
-                            finalCb.processResult(failureRc, null, context);
-                            return;
-                        }
-                        AsyncListProcessor<String> listProcessor =
-                                new AsyncListProcessor<String>(scheduler);
-                        // process its children
-                        listProcessor.process(levelNodes, processor, finalCb,
-                                              context, successRc, failureRc);
-                    }
-                }, null);
             }
-        }, null);
-    }
-
-    /**
-     * Process list one by one in asynchronize way. Process will be stopped immediately
-     * when error occurred.
-     */
-    private static class AsyncListProcessor<T> {
-        // use this to prevent long stack chains from building up in callbacks
-        ScheduledExecutorService scheduler;
 
-        /**
-         * Constructor
-         *
-         * @param scheduler
-         *          Executor used to prevent long stack chains
-         */
-        public AsyncListProcessor(ScheduledExecutorService scheduler) {
-            this.scheduler = scheduler;
-        }
+        }, context, successRc, failureRc);
+    }
 
-        /**
-         * Process list of items
-         *
-         * @param data
-         *          List of data to process
-         * @param processor
-         *          Callback to process element of list when success
-         * @param finalCb
-         *          Final callback to be called after all elements in the list are processed
-         * @param context
-         *          Context of final callback
-         * @param successRc
-         *          RC passed to final callback on success
-         * @param failureRc
-         *          RC passed to final callback on failure
-         */
-        public void process(final List<T> data, final Processor<T> processor,
-                            final AsyncCallback.VoidCallback finalCb, final Object context,
-                            final int successRc, final int failureRc) {
-            if (data == null || data.size() == 0) {
-                finalCb.processResult(successRc, null, context);
-                return;
-            }
-            final int size = data.size();
-            final AtomicInteger current = new AtomicInteger(0);
-            AsyncCallback.VoidCallback stubCallback = new AsyncCallback.VoidCallback() {
-                @Override
-                public void processResult(int rc, String path, Object ctx) {
-                    if (rc != successRc) {
-                        // terminal immediately
-                        finalCb.processResult(failureRc, null, context);
-                        return;
-                    }
-                    // process next element
-                    int next = current.incrementAndGet();
-                    if (next >= size) { // reach the end of list
-                        finalCb.processResult(successRc, null, context);
-                        return;
-                    }
-                    final T dataToProcess = data.get(next);
-                    final AsyncCallback.VoidCallback stub = this;
-                    scheduler.submit(new Runnable() {
-                        @Override
-                        public final void run() {
-                            processor.process(dataToProcess, stub);
-                        }
-                    });
-                }
-            };
-            T firstElement = data.get(0);
-            processor.process(firstElement, stubCallback);
-        }
+    @Override
+    protected String getLedgerPath(long ledgerId) {
+        return ledgerRootPath + StringUtils.getHybridHierarchicalLedgerPath(ledgerId);
     }
 
     @Override
-    protected boolean isSpecialZnode(String znode) {
-        return IDGEN_ZNODE.equals(znode) || super.isSpecialZnode(znode);
+    protected long getLedgerId(String ledgerPath) throws IOException {
+        // Strip the ledger root prefix, then parse the remaining hierarchical path into a ledger id.
+        if (!ledgerPath.startsWith(ledgerRootPath)) {
+            throw new IOException("it is not a valid hashed path name : " + ledgerPath);
+        }
+        String hierarchicalPath = ledgerPath.substring(ledgerRootPath.length() + 1);
+        return StringUtils.stringToLongHierarchicalLedgerId(hierarchicalPath);
     }
 
     @Override
     public LedgerRangeIterator getLedgerRanges() {
-        return new HierarchicalLedgerRangeIterator();
+        LedgerRangeIterator legacyLedgerRangeIterator = legacyLM.getLedgerRanges();
+        LedgerRangeIterator longLedgerRangeIterator = longLM.getLedgerRanges();
+        return new HierarchicalLedgerRangeIterator(legacyLedgerRangeIterator, longLedgerRangeIterator);
     }
 
-    /**
-     * Iterator through each metadata bucket with hierarchical mode
-     */
-    private class HierarchicalLedgerRangeIterator implements LedgerRangeIterator {
-        private Iterator<String> l1NodesIter = null;
-        private Iterator<String> l2NodesIter = null;
-        private String curL1Nodes = "";
-        private boolean iteratorDone = false;
-        private LedgerRange nextRange = null;
+    private static class HierarchicalLedgerRangeIterator implements LedgerRangeIterator {
 
-        /**
-         * iterate next level1 znode
-         *
-         * @return false if have visited all level1 nodes
-         * @throws InterruptedException/KeeperException if error occurs reading zookeeper children
-         */
-        private boolean nextL1Node() throws KeeperException, InterruptedException {
-            l2NodesIter = null;
-            while (l2NodesIter == null) {
-                if (l1NodesIter.hasNext()) {
-                    curL1Nodes = l1NodesIter.next();
-                } else {
-                    return false;
-                }
-                if (isSpecialZnode(curL1Nodes)) {
-                    continue;
-                }
-                List<String> l2Nodes = zk.getChildren(ledgerRootPath + "/" + curL1Nodes, null);
-                Collections.sort(l2Nodes);
-                l2NodesIter = l2Nodes.iterator();
-                if (!l2NodesIter.hasNext()) {
-                    l2NodesIter = null;
-                    continue;
-                }
-            }
-            return true;
-        }
+        LedgerRangeIterator legacyLedgerRangeIterator;
+        LedgerRangeIterator longLedgerRangeIterator;
 
-        synchronized private void preload() throws IOException {
-            while (nextRange == null && !iteratorDone) {
-                boolean hasMoreElements = false;
-                try {
-                    if (l1NodesIter == null) {
-                        List<String> l1Nodes = zk.getChildren(ledgerRootPath, null);
-                        Collections.sort(l1Nodes);
-                        l1NodesIter = l1Nodes.iterator();
-                        hasMoreElements = nextL1Node();
-                    } else if (l2NodesIter == null || !l2NodesIter.hasNext()) {
-                        hasMoreElements = nextL1Node();
-                    } else {
-                        hasMoreElements = true;
-                    }
-                } catch (KeeperException ke) {
-                    throw new IOException("Error preloading next range", ke);
-                } catch (InterruptedException ie) {
-                    Thread.currentThread().interrupt();
-                    throw new IOException("Interrupted while preloading", ie);
-                }
-                if (hasMoreElements) {
-                    nextRange = getLedgerRangeByLevel(curL1Nodes, l2NodesIter.next());
-                    if (nextRange.size() == 0) {
-                        nextRange = null;
-                    }
-                } else {
-                    iteratorDone = true;
-                }
-            }
+        HierarchicalLedgerRangeIterator(LedgerRangeIterator legacyLedgerRangeIterator, LedgerRangeIterator longLedgerRangeIterator) {
+            this.legacyLedgerRangeIterator = legacyLedgerRangeIterator;
+            this.longLedgerRangeIterator = longLedgerRangeIterator;
         }
 
         @Override
-        synchronized public boolean hasNext() throws IOException {
-            preload();
-            return nextRange != null && !iteratorDone;
+        public boolean hasNext() throws IOException {
+            return legacyLedgerRangeIterator.hasNext() || longLedgerRangeIterator.hasNext();
         }
 
         @Override
-        synchronized public LedgerRange next() throws IOException {
-            if (!hasNext()) {
-                throw new NoSuchElementException();
+        public LedgerRange next() throws IOException {
+            if(legacyLedgerRangeIterator.hasNext()) {
+                return legacyLedgerRangeIterator.next();
             }
-            LedgerRange r = nextRange;
-            nextRange = null;
-            return r;
+            return longLedgerRangeIterator.next();
         }
 
-        /**
-         * Get a single node level1/level2
-         *
-         * @param level1
-         *          1st level node name
-         * @param level2
-         *          2nd level node name
-         * @throws IOException
-         */
-        LedgerRange getLedgerRangeByLevel(final String level1, final String level2)
-                throws IOException {
-            StringBuilder nodeBuilder = new StringBuilder();
-            nodeBuilder.append(ledgerRootPath).append("/")
-                       .append(level1).append("/").append(level2);
-            String nodePath = nodeBuilder.toString();
-            List<String> ledgerNodes = null;
-            try {
-                ledgerNodes = ZkUtils.getChildrenInSingleNode(zk, nodePath);
-            } catch (InterruptedException e) {
-                throw new IOException("Error when get child nodes from zk", e);
-            }
-            NavigableSet<Long> zkActiveLedgers = ledgerListToSet(ledgerNodes, nodePath);
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("All active ledgers from ZK for hash node "
-                          + level1 + "/" + level2 + " : " + zkActiveLedgers);
-            }
-
-            return new LedgerRange(zkActiveLedgers.subSet(getStartLedgerIdByLevel(level1, level2), true,
-                                                          getEndLedgerIdByLevel(level1, level2), true));
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
index 084d73d..a74a633 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.meta;
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -17,89 +15,25 @@ package org.apache.bookkeeper.meta;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.bookkeeper.meta;
 
-import java.io.IOException;
 import java.util.List;
 
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZKUtil;
-import org.apache.bookkeeper.replication.ReplicationException;
-import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
 
 /**
- * Hierarchical Ledger Manager Factory
+ * Hierarchical Ledger Manager Factory whose ledger ids come from a LongZkLedgerIdGenerator
  */
-public class HierarchicalLedgerManagerFactory extends LedgerManagerFactory {
+public class HierarchicalLedgerManagerFactory extends LegacyHierarchicalLedgerManagerFactory {
 
     public static final String NAME = "hierarchical";
-    public static final int CUR_VERSION = 1;
-
-    AbstractConfiguration conf;
-    ZooKeeper zk;
-
-    @Override
-    public int getCurrentVersion() {
-        return CUR_VERSION;
-    }
-
-    @Override
-    public LedgerManagerFactory initialize(final AbstractConfiguration conf,
-                                           final ZooKeeper zk,
-                                           final int factoryVersion)
-    throws IOException {
-        if (CUR_VERSION != factoryVersion) {
-            throw new IOException("Incompatible layout version found : "
-                                + factoryVersion);
-        }
-        this.conf = conf;
-        this.zk = zk;
-        return this;
-    }
-
-    @Override
-    public void uninitialize() throws IOException {
-        // since zookeeper instance is passed from outside
-        // we don't need to close it here
-    }
-
+    
     @Override
     public LedgerIdGenerator newLedgerIdGenerator() {
         List<ACL> zkAcls = ZkUtils.getACLs(conf);
-        return new ZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), HierarchicalLedgerManager.IDGEN_ZNODE, zkAcls);
-    }
-
-    @Override
-    public LedgerManager newLedgerManager() {
-        return new HierarchicalLedgerManager(conf, zk);
-    }
-
-    @Override
-    public LedgerUnderreplicationManager newLedgerUnderreplicationManager()
-            throws KeeperException, InterruptedException, ReplicationException.CompatibilityException{
-        return new ZkLedgerUnderreplicationManager(conf, zk);
-    }
-
-    @Override
-    public void format(AbstractConfiguration conf, ZooKeeper zk)
-            throws InterruptedException, KeeperException, IOException {
-        HierarchicalLedgerManager ledgerManager = (HierarchicalLedgerManager) newLedgerManager();
-        try {
-            String ledgersRootPath = conf.getZkLedgersRootPath();
-            List<String> children = zk.getChildren(ledgersRootPath, false);
-            for (String child : children) {
-                if (ledgerManager.isSpecialZnode(child)) {
-                    continue;
-                }
-                ZKUtil.deleteRecursive(zk, ledgersRootPath + "/" + child);
-            }
-        } finally {
-            ledgerManager.close();
-        }
-        // Delete and recreate the LAYOUT information.
-        super.format(conf, zk);
+        ZkLedgerIdGenerator subIdGenerator = new ZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), LegacyHierarchicalLedgerManager.IDGEN_ZNODE, zkAcls);
+        return new LongZkLedgerIdGenerator(zk, conf.getZkLedgersRootPath(), LongHierarchicalLedgerManager.IDGEN_ZNODE, subIdGenerator, zkAcls);
     }
-
+    
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
index 76d1572..e7cfc4c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
@@ -164,7 +164,8 @@ public abstract class LedgerManagerFactory {
 
         // handle V2 layout case
         if (factoryClass != null &&
-            !layout.getManagerFactoryClass().equals(factoryClass.getName())) {
+            !layout.getManagerFactoryClass().equals(factoryClass.getName()) &&
+            conf.getProperty(AbstractConfiguration.LEDGER_MANAGER_FACTORY_DISABLE_CLASS_CHECK) == null) { // Disable should ONLY happen during compatibility testing.
 
             throw new IOException("Configured layout " + factoryClass.getName()
                                 + " does not match existing layout "  + layout.getManagerFactoryClass());
@@ -210,6 +211,8 @@ public abstract class LedgerManagerFactory {
                     factoryClass = FlatLedgerManagerFactory.class;
                 } else if (HierarchicalLedgerManagerFactory.NAME.equals(lmType)) {
                     factoryClass = HierarchicalLedgerManagerFactory.class;
+                } else if (LongHierarchicalLedgerManagerFactory.NAME.equals(lmType)) {
+                    factoryClass = LongHierarchicalLedgerManagerFactory.class;
                 } else {
                     throw new IOException("Unknown ledger manager type: "
                             + lmType);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
new file mode 100644
index 0000000..8be23b2
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManager.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.TreeSet;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.bookkeeper.util.StringUtils;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Hierarchical Ledger Manager which manages ledger meta in zookeeper using 2-level hierarchical znodes.
+ *
+ * <p>
+ * LegacyHierarchicalLedgerManager splits the generated id into 3 parts (2-4-4):
+ * <pre>&lt;level1 (2 digits)&gt;&lt;level2 (4 digits)&gt;&lt;level3 (4 digits)&gt;</pre>
+ * These 3 parts are used to form the actual ledger node path used to store ledger metadata:
+ * <pre>(ledgersRootPath)/level1/level2/L(level3)</pre>
+ * E.g. ledger 0000000001 is split into 3 parts <i>00</i>, <i>0000</i>, <i>0001</i>, and is stored in
+ * <i>(ledgersRootPath)/00/0000/L0001</i>. So each znode could have at most 10000 ledgers, which avoids
+ * errors during garbage collection due to lists of children that are too long.
+ */
+class LegacyHierarchicalLedgerManager extends AbstractHierarchicalLedgerManager {
+
+    static final Logger LOG = LoggerFactory.getLogger(LegacyHierarchicalLedgerManager.class);
+
+    static final String IDGEN_ZNODE = "idgen";
+    private static final String MAX_ID_SUFFIX = "9999";
+    private static final String MIN_ID_SUFFIX = "0000";
+
+    private static final ThreadLocal<StringBuilder> threadLocalNodeBuilder = new ThreadLocal<StringBuilder>() {
+        @Override
+        protected StringBuilder initialValue() {
+            return new StringBuilder();
+        }
+    };
+    
+    /**
+     * Constructor
+     *
+     * @param conf
+     *          Configuration object
+     * @param zk
+     *          ZooKeeper Client Handle
+     */
+    public LegacyHierarchicalLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
+        super(conf, zk);
+    }
+
+    @Override
+    public String getLedgerPath(long ledgerId) {
+        return ledgerRootPath + StringUtils.getShortHierarchicalLedgerPath(ledgerId);
+    }
+
+    @Override
+    public long getLedgerId(String pathName) throws IOException {
+        if (!pathName.startsWith(ledgerRootPath)) {
+            throw new IOException("it is not a valid hashed path name : " + pathName);
+        }
+        String hierarchicalPath = pathName.substring(ledgerRootPath.length() + 1);
+        return StringUtils.stringToHierarchicalLedgerId(hierarchicalPath);
+    }
+
+    //
+    // Active Ledger Manager
+    //
+
+    /**
+     * Get the smallest ledger id in a specified node /level1/level2
+     *
+     * @param level1
+     *          1st level node name
+     * @param level2
+     *          2nd level node name
+     * @return the smallest ledger id
+     */
+    private long getStartLedgerIdByLevel(String level1, String level2) throws IOException {
+        return getLedgerId(level1, level2, MIN_ID_SUFFIX);
+    }
+
+    /**
+     * Get the largest ledger id in a specified node /level1/level2
+     *
+     * @param level1
+     *          1st level node name
+     * @param level2
+     *          2nd level node name
+     * @return the largest ledger id
+     */
+    private long getEndLedgerIdByLevel(String level1, String level2) throws IOException {
+        return getLedgerId(level1, level2, MAX_ID_SUFFIX);
+    }
+
+    @Override
+    public void asyncProcessLedgers(final Processor<Long> processor,
+                                    final AsyncCallback.VoidCallback finalCb, final Object context,
+                                    final int successRc, final int failureRc) {
+        // process 1st level nodes
+        asyncProcessLevelNodes(ledgerRootPath, new Processor<String>() {
+            @Override
+            public void process(final String l1Node, final AsyncCallback.VoidCallback cb1) {
+                if (isSpecialZnode(l1Node)) {
+                    cb1.processResult(successRc, null, context);
+                    return;
+                }
+                final String l1NodePath = ledgerRootPath + "/" + l1Node;
+                // process the level1 path; after all children of level1 are processed,
+                // cb1 is called back to continue processing the next level1 node
+                asyncProcessLevelNodes(l1NodePath, new Processor<String>() {
+                    @Override
+                    public void process(String l2Node, AsyncCallback.VoidCallback cb2) {
+                        // process level1/level2 path
+                        String l2NodePath = ledgerRootPath + "/" + l1Node + "/" + l2Node;
+                        // process each ledger
+                        // after all ledgers are processed, cb2 will be called to continue processing the next level2 node
+                        asyncProcessLedgersInSingleNode(l2NodePath, processor, cb2,
+                                                        context, successRc, failureRc);
+                    }
+                }, cb1, context, successRc, failureRc);
+            }
+        }, finalCb, context, successRc, failureRc);
+    }
+
+    protected static boolean isSpecialZnode(String znode) {
+        return IDGEN_ZNODE.equals(znode) || LongHierarchicalLedgerManager.IDGEN_ZNODE.equals(znode) || AbstractHierarchicalLedgerManager.isSpecialZnode(znode);
+    }
+
+    @Override
+    public LedgerRangeIterator getLedgerRanges() {
+        return new HierarchicalLedgerRangeIterator();
+    }
+
+    /**
+     * Iterator through each metadata bucket with hierarchical mode
+     */
+    private class HierarchicalLedgerRangeIterator implements LedgerRangeIterator {
+        private Iterator<String> l1NodesIter = null;
+        private Iterator<String> l2NodesIter = null;
+        private String curL1Nodes = "";
+        private boolean iteratorDone = false;
+        private LedgerRange nextRange = null;
+
+        /**
+         * iterate next level1 znode
+         *
+         * @return false if all level1 nodes have been visited
+         * @throws InterruptedException/KeeperException if error occurs reading zookeeper children
+         */
+        private boolean nextL1Node() throws KeeperException, InterruptedException {
+            l2NodesIter = null;
+            while (l2NodesIter == null) {
+                if (l1NodesIter.hasNext()) {
+                    curL1Nodes = l1NodesIter.next();
+                } else {
+                    return false;
+                }
+                // Top level nodes are always exactly 2 digits long. (Don't pick up long hierarchical top level nodes)
+                if (isSpecialZnode(curL1Nodes) || curL1Nodes.length() > 2) {
+                    continue;
+                }
+                List<String> l2Nodes = zk.getChildren(ledgerRootPath + "/" + curL1Nodes, null);
+                Collections.sort(l2Nodes);
+                l2NodesIter = l2Nodes.iterator();
+                if (!l2NodesIter.hasNext()) {
+                    l2NodesIter = null;
+                    continue;
+                }
+            }
+            return true;
+        }
+
+        synchronized private void preload() throws IOException {
+            while (nextRange == null && !iteratorDone) {
+                boolean hasMoreElements = false;
+                try {
+                    if (l1NodesIter == null) {
+                        List<String> l1Nodes = zk.getChildren(ledgerRootPath, null);
+                        Collections.sort(l1Nodes);
+                        l1NodesIter = l1Nodes.iterator();
+                        hasMoreElements = nextL1Node();
+                    } else if (l2NodesIter == null || !l2NodesIter.hasNext()) {
+                        hasMoreElements = nextL1Node();
+                    } else {
+                        hasMoreElements = true;
+                    }
+                } catch (KeeperException ke) {
+                    throw new IOException("Error preloading next range", ke);
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    throw new IOException("Interrupted while preloading", ie);
+                }
+                if (hasMoreElements) {
+                    nextRange = getLedgerRangeByLevel(curL1Nodes, l2NodesIter.next());
+                    if (nextRange.size() == 0) {
+                        nextRange = null;
+                    }
+                } else {
+                    iteratorDone = true;
+                }
+            }
+        }
+
+        @Override
+        synchronized public boolean hasNext() throws IOException {
+            preload();
+            return nextRange != null && !iteratorDone;
+        }
+
+        @Override
+        synchronized public LedgerRange next() throws IOException {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            LedgerRange r = nextRange;
+            nextRange = null;
+            return r;
+        }
+
+        /**
+         * Get the range of ledgers found under a single node level1/level2
+         *
+         * @param level1
+         *          1st level node name
+         * @param level2
+         *          2nd level node name
+         * @throws IOException
+         */
+        LedgerRange getLedgerRangeByLevel(final String level1, final String level2)
+                throws IOException {
+            StringBuilder nodeBuilder = threadLocalNodeBuilder.get();
+            nodeBuilder.setLength(0);
+            nodeBuilder.append(ledgerRootPath).append("/")
+                       .append(level1).append("/").append(level2);
+            String nodePath = nodeBuilder.toString();
+            List<String> ledgerNodes = null;
+            try {
+                ledgerNodes = ZkUtils.getChildrenInSingleNode(zk, nodePath);
+            } catch (InterruptedException e) {
+                throw new IOException("Error when get child nodes from zk", e);
+            }
+            NavigableSet<Long> zkActiveLedgers = ledgerListToSet(ledgerNodes, nodePath);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("All active ledgers from ZK for hash node "
+                          + level1 + "/" + level2 + " : " + zkActiveLedgers);
+            }
+
+            return new LedgerRange(zkActiveLedgers.subSet(getStartLedgerIdByLevel(level1, level2), true,
+                                                          getEndLedgerIdByLevel(level1, level2), true));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
new file mode 100644
index 0000000..1ac4038
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
@@ -0,0 +1,100 @@
+package org.apache.bookkeeper.meta;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZKUtil;
+import org.apache.bookkeeper.replication.ReplicationException;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+
+/**
+ * Ledger manager factory for the legacy hierarchical ledger layout.
+ */
+public class LegacyHierarchicalLedgerManagerFactory extends LedgerManagerFactory {
+
+    public static final String NAME = "legacyhierarchical";
+    public static final int CUR_VERSION = 1;
+
+    AbstractConfiguration conf;
+    ZooKeeper zk;
+
+    @Override
+    public int getCurrentVersion() {
+        return CUR_VERSION;
+    }
+
+    /** Remembers {@code conf} and {@code zk}; rejects any version other than {@link #CUR_VERSION}. */
+    @Override
+    public LedgerManagerFactory initialize(final AbstractConfiguration conf, final ZooKeeper zk,
+                                           final int factoryVersion) throws IOException {
+        if (factoryVersion != CUR_VERSION) {
+            throw new IOException("Incompatible layout version found : " + factoryVersion);
+        }
+        this.conf = conf;
+        this.zk = zk;
+        return this;
+    }
+
+    @Override
+    public void uninitialize() throws IOException {
+        // Nothing to release: the ZooKeeper instance is handed in from outside,
+        // so it is not closed here.
+    }
+
+    /** Builds a {@link ZkLedgerIdGenerator} from the configured ACLs and ledgers root path. */
+    @Override
+    public LedgerIdGenerator newLedgerIdGenerator() {
+        final List<ACL> acls = ZkUtils.getACLs(conf);
+        final String rootPath = conf.getZkLedgersRootPath();
+        return new ZkLedgerIdGenerator(zk, rootPath, LegacyHierarchicalLedgerManager.IDGEN_ZNODE, acls);
+    }
+
+    @Override
+    public LedgerManager newLedgerManager() {
+        // NOTE(review): the id generator is keyed on LegacyHierarchicalLedgerManager, yet the manager
+        // created here is HierarchicalLedgerManager -- confirm this mix is intended.
+        return new HierarchicalLedgerManager(conf, zk);
+    }
+
+    @Override
+    public LedgerUnderreplicationManager newLedgerUnderreplicationManager()
+            throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
+        return new ZkLedgerUnderreplicationManager(conf, zk);
+    }
+
+    /** Recursively deletes every non-special child of the ledgers root path, then resets the layout. */
+    @Override
+    public void format(AbstractConfiguration conf, ZooKeeper zk)
+            throws InterruptedException, KeeperException, IOException {
+        final String rootPath = conf.getZkLedgersRootPath();
+        for (String znode : zk.getChildren(rootPath, false)) {
+            if (!HierarchicalLedgerManager.isSpecialZnode(znode)) {
+                ZKUtil.deleteRecursive(zk, rootPath + "/" + znode);
+            }
+        }
+        // Delete and recreate the LAYOUT information.
+        super.format(conf, zk);
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/057af8db/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
index 990297f..f8f7546 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LongHierarchicalLedgerManager.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.NoSuchElementException;
+import java.util.TreeSet;
 
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
@@ -37,33 +38,36 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * LongHierarchical Ledger Manager which manages ledger meta in zookeeper using 4-level hierarchical znodes.
+ * LongHierarchical Ledger Manager which manages ledger meta in zookeeper using 5-level hierarchical znodes.
  *
  * <p>
  * LongHierarchicalLedgerManager splits the generated id into 5 parts (3-4-4-4-4):
  *
  * <pre>
- * &lt;level1 (3 digits)&gt;&lt;level2 (4 digits)&gt;&lt;level3 (4 digits)&gt;&lt;level4 (4 digits)&gt;
- * &lt;level5 (4 digits)&gt;
+ * &lt;level0 (3 digits)&gt;&lt;level1 (4 digits)&gt;&lt;level2 (4 digits)&gt;&lt;level3 (4 digits)&gt;
+ * &lt;level4 (4 digits)&gt;
  * </pre>
  *
  * These 5 parts are used to form the actual ledger node path used to store ledger metadata:
  *
  * <pre>
- * (ledgersRootPath) / level1 / level2 / level3 / level4 / L(level5)
+ * (ledgersRootPath) / level0 / level1 / level2 / level3 / L(level4)
  * </pre>
  *
  * E.g Ledger 0000000000000000001 is split into 5 parts <i>000</i>, <i>0000</i>, <i>0000</i>, <i>0000</i>, <i>0001</i>,
  * which is stored in <i>(ledgersRootPath)/000/0000/0000/0000/L0001</i>. So each znode could have at most 10000 ledgers,
  * which avoids errors during garbage collection due to lists of children that are too long.
+ *
  */
-class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
+class LongHierarchicalLedgerManager extends AbstractHierarchicalLedgerManager {
 
     static final Logger LOG = LoggerFactory.getLogger(LongHierarchicalLedgerManager.class);
 
+    static final String IDGEN_ZNODE = "idgen-long";
     private static final String MAX_ID_SUFFIX = "9999";
     private static final String MIN_ID_SUFFIX = "0000";
 
+
     /**
      * Constructor
      *
@@ -77,11 +81,6 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
     }
 
     @Override
-    public String getLedgerPath(long ledgerId) {
-        return ledgerRootPath + StringUtils.getLongHierarchicalLedgerPath(ledgerId);
-    }
-
-    @Override
     public long getLedgerId(String pathName) throws IOException {
         if (!pathName.startsWith(ledgerRootPath)) {
             throw new IOException("it is not a valid hashed path name : " + pathName);
@@ -89,54 +88,68 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
         String hierarchicalPath = pathName.substring(ledgerRootPath.length() + 1);
         return StringUtils.stringToLongHierarchicalLedgerId(hierarchicalPath);
     }
+    @Override
+    public String getLedgerPath(long ledgerId) {
+        final String relativePath = StringUtils.getLongHierarchicalLedgerPath(ledgerId);
+        return ledgerRootPath + relativePath; // znode path = ledger root path + hierarchical path
+    }
 
     //
     // Active Ledger Manager
     //
 
     /**
-     * Get the smallest cache id in a specified node /level1/level2/level3/level4
+     * Get the smallest ledger id that can be stored under the node /level0/level1/level2/level3
      *
-     * @param level1
+     * @param level0
      *            1st level node name
-     * @param level2
+     * @param level1
      *            2nd level node name
-     * @param level3
+     * @param level2
      *            3rd level node name
-     * @param level4
+     * @param level3
      *            4th level node name
      * @return the smallest ledger id
      */
-    private long getStartLedgerIdByLevel(String level1, String level2, String level3, String level4)
+    private long getStartLedgerIdByLevel(String level0, String level1, String level2, String level3)
             throws IOException {
-        return getLedgerId(level1, level2, level3, level4, MIN_ID_SUFFIX);
+        return getLedgerId(level0, level1, level2, level3, MIN_ID_SUFFIX);
     }
 
     /**
-     * Get the largest cache id in a specified node /level1/level2/level3/level4
+     * Get the largest ledger id that can be stored under the node /level0/level1/level2/level3
      *
-     * @param level1
+     * @param level0
      *            1st level node name
-     * @param level2
+     * @param level1
      *            2nd level node name
-     * @param level3
+     * @param level2
      *            3rd level node name
-     * @param level4
+     * @param level3
      *            4th level node name
      * @return the largest ledger id
      */
-    private long getEndLedgerIdByLevel(String level1, String level2, String level3, String level4) throws IOException {
-        return getLedgerId(level1, level2, level3, level4, MAX_ID_SUFFIX);
+    private long getEndLedgerIdByLevel(String level0, String level1, String level2, String level3) throws IOException {
+        return getLedgerId(level0, level1, level2, level3, MAX_ID_SUFFIX);
     }
 
     @Override
     public void asyncProcessLedgers(final Processor<Long> processor, final AsyncCallback.VoidCallback finalCb,
             final Object context, final int successRc, final int failureRc) {
+        // Process the znodes under the ledger root with a level-0 RecursiveProcessor wrapping
+        // `processor`; finalCb and the two result codes are passed straight through.
         asyncProcessLevelNodes(ledgerRootPath,
                 new RecursiveProcessor(0, ledgerRootPath, processor, context, successRc, failureRc), finalCb, context,
                 successRc, failureRc);
     }
 
+    protected static boolean isSpecialZnode(String znode) {
+        // Long-hierarchical (3-4-4-4-4) node names have at least 3 characters, so anything shorter,
+        // e.g. an old-style (2-4-4) node, is skipped along with the legacy special znodes.
+        final boolean legacySpecial = LegacyHierarchicalLedgerManager.isSpecialZnode(znode);
+        return legacySpecial || znode.length() < 3;
+    }
+    
     private class RecursiveProcessor implements Processor<String> {
         private final int level;
         private final String path;
@@ -167,7 +180,7 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
                         context, successRc, failureRc);
             } else {
                 // process each ledger after all ledger are processed, cb will be call to continue processing next
-                // level5 node
+                // level4 node
                 asyncProcessLedgersInSingleNode(nodePath, processor, cb, context, successRc, failureRc);
             }
         }
@@ -194,7 +207,7 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
             curLevelNodes = new ArrayList<String>(Collections.nCopies(4, (String) null));
         }
 
-        private void initialize(String path, int level) throws KeeperException, InterruptedException, IOException {
+        synchronized private void initialize(String path, int level) throws KeeperException, InterruptedException, IOException {
             List<String> levelNodes = zk.getChildren(path, null);
             Collections.sort(levelNodes);
             if (level == 0) {
@@ -217,6 +230,9 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
             }
             String curLNode = curLevelNodes.get(level);
             if (curLNode != null) {
+                // Traverse down through levels 0-3
+                // The nextRange becomes a listing of the children
+                // in the level4 directory.
                 if (level != 3) {
                     String nextLevelPath = path + "/" + curLNode;
                     initialize(nextLevelPath, level + 1);
@@ -229,7 +245,13 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
             }
         }
 
-        private boolean moveToNext(int level) throws KeeperException, InterruptedException {
+        private void clearHigherLevels(int level) {
+            for (int deeper = level + 1; deeper < 4; deeper++) {
+                curLevelNodes.set(deeper, null); // reset the current node of each deeper level
+            }
+        }
+
+        synchronized private boolean moveToNext(int level) throws KeeperException, InterruptedException {
             Iterator<String> curLevelNodesIter = levelNodesIter.get(level);
             boolean movedToNextNode = false;
             if (level == 0) {
@@ -239,6 +261,7 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
                         continue;
                     } else {
                         curLevelNodes.set(level, nextNode);
+                        clearHigherLevels(level);
                         movedToNextNode = true;
                         break;
                     }
@@ -247,6 +270,7 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
                 if (curLevelNodesIter.hasNext()) {
                     String nextNode = curLevelNodesIter.next();
                     curLevelNodes.set(level, nextNode);
+                    clearHigherLevels(level);
                     movedToNextNode = true;
                 } else {
                     movedToNextNode = moveToNext(level - 1);
@@ -261,6 +285,7 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
                         levelNodesIter.set(level, newCurLevelNodesIter);
                         if (newCurLevelNodesIter.hasNext()) {
                             curLevelNodes.set(level, newCurLevelNodesIter.next());
+                            clearHigherLevels(level);
                             movedToNextNode = true;
                         }
                     }
@@ -306,15 +331,15 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
             return r;
         }
 
-        LedgerRange getLedgerRangeByLevel(List<String> curLevelNodes) throws IOException {
-            String level1 = curLevelNodes.get(0);
-            String level2 = curLevelNodes.get(1);
-            String level3 = curLevelNodes.get(2);
-            String level4 = curLevelNodes.get(3);
+        private LedgerRange getLedgerRangeByLevel(List<String> curLevelNodes) throws IOException {
+            String level0 = curLevelNodes.get(0);
+            String level1 = curLevelNodes.get(1);
+            String level2 = curLevelNodes.get(2);
+            String level3 = curLevelNodes.get(3);
 
             StringBuilder nodeBuilder = new StringBuilder();
-            nodeBuilder.append(ledgerRootPath).append("/").append(level1).append("/").append(level2).append("/")
-                    .append(level3).append("/").append(level4);
+            nodeBuilder.append(ledgerRootPath).append("/").append(level0).append("/").append(level1).append("/")
+                    .append(level2).append("/").append(level3);
             String nodePath = nodeBuilder.toString();
             List<String> ledgerNodes = null;
             try {
@@ -324,11 +349,11 @@ class LongHierarchicalLedgerManager extends HierarchicalLedgerManager {
             }
             NavigableSet<Long> zkActiveLedgers = ledgerListToSet(ledgerNodes, nodePath);
             if (LOG.isDebugEnabled()) {
-                LOG.debug("All active ledgers from ZK for hash node " + level1 + "/" + level2 + "/" + level3 + "/"
-                        + level4 + " : " + zkActiveLedgers);
+                LOG.debug("All active ledgers from ZK for hash node " + level0 + "/" + level1 + "/" + level2 + "/"
+                        + level3 + " : " + zkActiveLedgers);
             }
-            return new LedgerRange(zkActiveLedgers.subSet(getStartLedgerIdByLevel(level1, level2, level3, level4), true,
-                    getEndLedgerIdByLevel(level1, level2, level3, level4), true));
+            return new LedgerRange(zkActiveLedgers.subSet(getStartLedgerIdByLevel(level0, level1, level2, level3), true,
+                    getEndLedgerIdByLevel(level0, level1, level2, level3), true));
         }
     }
 }