You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2011/11/28 19:25:27 UTC

svn commit: r1207495 [1/3] - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/conf/ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/main/java/org/apac...

Author: ivank
Date: Mon Nov 28 18:25:18 2011
New Revision: 1207495

URL: http://svn.apache.org/viewvc?rev=1207495&view=rev
Log:
BOOKKEEPER-39: Bookie server failed to restart because of too many ledgers (more than ~50,000 ledgers) (Sijie via ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerLayout.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/HierarchicalAsyncLedgerOpsTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/HierarchicalBookieFailureTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/HierarchicalBookieReadWriteTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/HierarchicalBookieRecoveryTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/HierarchicalLedgerDeleteTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerLayoutTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerManager.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/LocalBookKeeper.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Main.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCacheTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BaseTestCase.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ConcurrentLedgerTest.java
    zookeeper/bookkeeper/trunk/doc/bookkeeperConfig.textile

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1207495&r1=1207494&r2=1207495&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Nov 28 18:25:18 2011
@@ -80,6 +80,8 @@ BUGFIXES:
 
   BOOKKEEPER-114: add a shutdown hook to shut down bookie server safely. (Sijie via ivank)
 
+  BOOKKEEPER-39: Bookie server failed to restart because of too many ledgers (more than ~50,000 ledgers) (Sijie via ivank)
+
  hedwig-server/
 
   BOOKKEEPER-43: NullPointException when releasing topic (Sijie Guo via breed)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf?rev=1207495&r1=1207494&r2=1207495&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/conf/bk_server.conf Mon Nov 28 18:25:18 2011
@@ -38,6 +38,16 @@ journalDirectory=/tmp/bk-txn
 # It is possible to run with a single disk, but performance will be significantly lower.
 ledgerDirectories=/tmp/bk-data
 
+# Ledger Manager Class
+# What kind of ledger manager is used to manage how ledgers are stored, managed
+# and garbage collected. Try to read 'BookKeeper Overview' for detail info.
+# ledgerManagerType=flat
+
+# Root zookeeper path to store ledger metadata
+# This parameter is used by zookeeper-based ledger manager as a root znode to
+# store all ledgers.
+# zkLedgersRootPath=/ledgers
+
 # Max file size of entry logger, in bytes
 # A new entry log file will be created when the old one reaches the file size limitation
 # logSizeLimit=2147483648

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1207495&r1=1207494&r2=1207495&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Mon Nov 28 18:25:18 2011
@@ -43,20 +43,20 @@ import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooDefs.Ids;
 
-
-
 /**
  * Implements a bookie.
  *
@@ -78,6 +78,7 @@ public class Bookie extends Thread {
     final ServerConfiguration conf;
 
     final SyncThread syncThread;
+    final LedgerManager ledgerManager;
 
     /**
      * Current directory layout version. Increment this 
@@ -242,23 +243,28 @@ public class Bookie extends Thread {
         }
     }
 
-    public Bookie(ServerConfiguration conf) throws IOException {
+    public Bookie(ServerConfiguration conf) 
+            throws IOException, KeeperException, InterruptedException {
+        this.conf = conf;
         this.journalDirectory = conf.getJournalDir();
         this.ledgerDirectories = conf.getLedgerDirs();
-        this.conf = conf;
+        this.maxJournalSize = conf.getMaxJournalSize() * MB;
+        this.maxBackupJournals = conf.getMaxBackupJournals();
 
+        // check directory layouts
         checkDirectoryLayoutVersion(journalDirectory);
         for (File dir : ledgerDirectories) {
             checkDirectoryLayoutVersion(dir);
         }
 
-        this.maxJournalSize = conf.getMaxJournalSize() * MB;
-        this.maxBackupJournals = conf.getMaxBackupJournals();
+        // instantiate zookeeper client to initialize ledger manager
+        ZooKeeper newZk = instantiateZookeeperClient(conf.getZkServers());
+        ledgerManager = LedgerManagerFactory.newLedgerManager(conf, newZk);
 
         syncThread = new SyncThread(conf);
         entryLogger = new EntryLogger(conf, this);
-        ledgerCache = new LedgerCache(conf);
-    
+        ledgerCache = new LedgerCache(conf, ledgerManager);
+
         lastLogMark.readLog();
         if (LOG.isDebugEnabled()) {
             LOG.debug("Last Log Mark : " + lastLogMark);
@@ -332,7 +338,12 @@ public class Bookie extends Thread {
                 }
             }
         }
-        instantiateZookeeperClient(conf.getBookiePort(), conf.getZkServers());
+        // pass zookeeper instance here
+        // since GarbageCollector thread should only start after journal
+        // finished replay
+        this.zk = newZk;
+        // make the bookie available
+        registerBookie(conf.getBookiePort());
         setDaemon(true);
         LOG.debug("I'm starting a bookie with journal directory " + journalDirectory.getName());
         start();
@@ -375,20 +386,29 @@ public class Bookie extends Thread {
         Collections.sort(logs);
         return logs;
     }
-    
+
     /**
      * Instantiate the ZooKeeper client for the Bookie.
      */
-    private void instantiateZookeeperClient(int port, String zkServers) throws IOException {
+    private ZooKeeper instantiateZookeeperClient(String zkServers) throws IOException {
         if (zkServers == null) {
             LOG.warn("No ZK servers passed to Bookie constructor so BookKeeper clients won't know about this server!");
-            zk = null;
             isZkExpired = false;
-            return;
+            return null;
         }
         int zkTimeout = conf.getZkTimeout();
         // Create the ZooKeeper client instance
-        zk = newZookeeper(zkServers, zkTimeout);
+        return newZookeeper(zkServers, zkTimeout);
+    }
+
+    /**
+     * Register as an available bookie
+     */
+    private void registerBookie(int port) throws IOException {
+        if (null == zk) {
+            // zookeeper instance is null, means not register itself to zk
+            return;
+        }
         // Create the ZK ephemeral node for this Bookie.
         try {
             zk.create(BOOKIE_REGISTRATION_PATH + InetAddress.getLocalHost().getHostAddress() + ":" + port, new byte[0],
@@ -853,6 +873,8 @@ public class Bookie extends Thread {
         }
         // Shutdown the EntryLogger which has the GarbageCollector Thread running
         entryLogger.shutdown();
+        // close Ledger Manager
+        ledgerManager.close();
         // setting running to false here, so watch thread in bookie server know it only after bookie shut down
         running = false;
     }
@@ -983,8 +1005,8 @@ public class Bookie extends Thread {
      * @throws IOException
      * @throws InterruptedException
      */
-    public static void main(String[] args) throws IOException,
-        InterruptedException, BookieException {
+    public static void main(String[] args) 
+            throws IOException, InterruptedException, BookieException, KeeperException {
         Bookie b = new Bookie(new ServerConfiguration());
         CounterCallback cb = new CounterCallback();
         long start = System.currentTimeMillis();

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java?rev=1207495&r1=1207494&r2=1207495&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java Mon Nov 28 18:25:18 2011
@@ -35,7 +35,6 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -43,9 +42,7 @@ import java.util.concurrent.ConcurrentMa
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
+import org.apache.bookkeeper.meta.LedgerManager;
 
 /**
  * This class manages the writing of the bookkeeper entries. All the new
@@ -77,11 +74,6 @@ public class EntryLogger {
     // this indicates that a write has happened since the last flush
     private volatile boolean somethingWritten = false;
 
-    // ZK ledgers related String constants
-    static final String LEDGERS_PATH = "/ledgers";
-    static final String LEDGER_NODE_PREFIX = "L";
-    static final String AVAILABLE_NODE = "available";
-
     // Maps entry log files to the set of ledgers that comprise the file.
     private ConcurrentMap<Long, ConcurrentHashMap<Long, Boolean>> entryLogs2LedgersMap = new ConcurrentHashMap<Long, ConcurrentHashMap<Long, Boolean>>();
     // This is the thread that garbage collects the entry logs that do not
@@ -146,105 +138,73 @@ public class EntryLogger {
                     }
                 }
                 // Initialization check. No need to run any logic if we are still starting up.
-                if (bookie.zk == null || entryLogs2LedgersMap.isEmpty() || bookie.ledgerCache == null
-                        || bookie.ledgerCache.activeLedgers == null) {
+                if (bookie.zk == null || entryLogs2LedgersMap.isEmpty() ||
+                    bookie.ledgerCache == null) {
                     continue;
                 }
-                // First sync ZK to make sure we're reading the latest active/available ledger nodes.
-                bookie.zk.sync(LEDGERS_PATH, new AsyncCallback.VoidCallback() {
-                    @Override
-                    public void processResult(int rc, String path, Object ctx) {
-                        if (rc != Code.OK.intValue()) {
-                            LOG.error("ZK error syncing the ledgers node when getting children: ", KeeperException
-                                      .create(KeeperException.Code.get(rc), path));
-                            return;
-                        }
-                        // Sync has completed successfully so now we can poll ZK
-                        // and read in the latest set of active ledger nodes.
-                        List<String> ledgerNodes;
+
+                // gc inactive/deleted ledgers
+                doGcLedgers();
+
+                // gc entry logs
+                doGcEntryLogs();
+            }
+        }
+
+        /**
+         * Do garbage collection ledger index files
+         */
+        private void doGcLedgers() {
+            bookie.ledgerCache.activeLedgerManager.garbageCollectLedgers(
+            new LedgerManager.GarbageCollector() {
+                @Override
+                public void gc(long ledgerId) {
+                    try {
+                        bookie.ledgerCache.deleteLedger(ledgerId);
+                    } catch (IOException e) {
+                        LOG.error("Exception when deleting the ledger index file on the Bookie: ", e);
+                    }
+                }
+            });
+        }
+
+        /**
+         * Garbage collect those entry loggers which are not associated with any active ledgers
+         */
+        private void doGcEntryLogs() {
+            // Loop through all of the entry logs and remove the non-active ledgers.
+            for (Long entryLogId : entryLogs2LedgersMap.keySet()) {
+                ConcurrentHashMap<Long, Boolean> entryLogLedgers = entryLogs2LedgersMap.get(entryLogId);
+                for (Long entryLogLedger : entryLogLedgers.keySet()) {
+                    // Remove the entry log ledger from the set if it isn't active.
+                    if (!bookie.ledgerCache.activeLedgerManager.containsActiveLedger(entryLogLedger)) {
+                        entryLogLedgers.remove(entryLogLedger);
+                    }
+                }
+                if (entryLogLedgers.isEmpty()) {
+                    // This means the entry log is not associated with any active ledgers anymore.
+                    // We can remove this entry log file now.
+                    LOG.info("Deleting entryLogId " + entryLogId + " as it has no active ledgers!");
+                    BufferedChannel bc = channels.remove(entryLogId);
+                    if (null != bc) {
+                        // close its underlying file channel, so it could be deleted really
                         try {
-                            ledgerNodes = bookie.zk.getChildren(LEDGERS_PATH, null);
-                        } catch (Exception e) {
-                            LOG.error("Error polling ZK for the available ledger nodes: ", e);
-                            // We should probably wait a certain amount of time before retrying in case of temporary issues.
-                            return;
-                        }
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Retrieved current set of ledger nodes: " + ledgerNodes);
-                        }
-                        // Convert the ZK retrieved ledger nodes to a HashSet for easier comparisons.
-                        HashSet<Long> allActiveLedgers = new HashSet<Long>(ledgerNodes.size(), 1.0f);
-                        for (String ledgerNode : ledgerNodes) {
-                            try {
-                                // The available node is also stored in this path so ignore that.
-                                // That node is the path for the set of available Bookie Servers.
-                                if (ledgerNode.equals(AVAILABLE_NODE))
-                                    continue;
-                                String parts[] = ledgerNode.split(LEDGER_NODE_PREFIX);
-                                allActiveLedgers.add(Long.parseLong(parts[parts.length - 1]));
-                            } catch (NumberFormatException e) {
-                                LOG.error("Error extracting ledgerId from ZK ledger node: " + ledgerNode);
-                                // This is a pretty bad error as it indicates a ledger node in ZK
-                                // has an incorrect format. For now just continue and consider
-                                // this as a non-existent ledger.
-                                continue;
-                            }
-                        }
-                        ConcurrentMap<Long, Boolean> curActiveLedgers = bookie.ledgerCache.activeLedgers;
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("All active ledgers from ZK: " + allActiveLedgers);
-                            LOG.debug("Current active ledgers from Bookie: " + curActiveLedgers.keySet());
-                        }
-                        // Remove any active ledgers that don't exist in ZK.
-                        for (Long ledger : curActiveLedgers.keySet()) {
-                            if (!allActiveLedgers.contains(ledger)) {
-                                // Remove it from the current active ledgers set and also from all
-                                // LedgerCache data references to the ledger, i.e. the physical ledger index file.
-                                LOG.info("Removing a non-active/deleted ledger: " + ledger);
-                                curActiveLedgers.remove(ledger);
-                                try {
-                                    bookie.ledgerCache.deleteLedger(ledger);
-                                } catch (IOException e) {
-                                    LOG.error("Exception when deleting the ledger index file on the Bookie: ", e);
-                                }
-                            }
+                            bc.getFileChannel().close();
+                        } catch (IOException ie) {
+                            LOG.warn("Exception while closing garbage collected entryLog file : ", ie);
                         }
-                        // Loop through all of the entry logs and remove the non-active ledgers.
-                        for (Long entryLogId : entryLogs2LedgersMap.keySet()) {
-                            ConcurrentHashMap<Long, Boolean> entryLogLedgers = entryLogs2LedgersMap.get(entryLogId);
-                            for (Long entryLogLedger : entryLogLedgers.keySet()) {
-                                // Remove the entry log ledger from the set if it isn't active.
-                                if (!bookie.ledgerCache.activeLedgers.containsKey(entryLogLedger)) {
-                                    entryLogLedgers.remove(entryLogLedger);
-                                }
-                            }
-                            if (entryLogLedgers.isEmpty()) {
-                                // This means the entry log is not associated with any active ledgers anymore.
-                                // We can remove this entry log file now.
-                                LOG.info("Deleting entryLogId " + entryLogId + " as it has no active ledgers!");
-                                BufferedChannel bc = channels.remove(entryLogId);
-                                if (null != bc) {
-                                    // close its underlying file channel, so it could be deleted really
-                                    try {
-                                        bc.getFileChannel().close();
-                                    } catch (IOException ie) {
-                                        LOG.warn("Exception while closing garbage colected entryLog file : ", ie);
-                                    }
-                                }
-                                File entryLogFile;
-                                try {
-                                    entryLogFile = findFile(entryLogId);
-                                } catch (FileNotFoundException e) {
-                                    LOG.error("Trying to delete an entryLog file that could not be found: "
-                                              + entryLogId + ".log");
-                                    continue;
-                                }
-                                entryLogFile.delete();
-                                entryLogs2LedgersMap.remove(entryLogId);
-                            }
-                        }
-                    };
-                }, null);
+                    }
+                    File entryLogFile;
+                    try {
+                        entryLogFile = findFile(entryLogId);
+                    } catch (FileNotFoundException e) {
+                        LOG.error("Trying to delete an entryLog file that could not be found: "
+                                + entryLogId + ".log");
+                        continue;
+                    }
+                    entryLogFile.delete();
+                    entryLogs2LedgersMap.remove(entryLogId);
+                }
             }
         }
     }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java?rev=1207495&r1=1207494&r2=1207495&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java Mon Nov 28 18:25:18 2011
@@ -33,11 +33,9 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
+import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,7 +49,7 @@ public class LedgerCache {
 
     final File ledgerDirectories[];
 
-    public LedgerCache(ServerConfiguration conf) {
+    public LedgerCache(ServerConfiguration conf, LedgerManager alm) {
         this.ledgerDirectories = conf.getLedgerDirs();
         this.openFileLimit = conf.getOpenFileLimit();
         this.pageSize = conf.getPageSize();
@@ -65,6 +63,7 @@ public class LedgerCache {
         }
         LOG.info("maxMemory = " + Runtime.getRuntime().maxMemory());
         LOG.info("openFileLimit is " + openFileLimit + ", pageSize is " + pageSize + ", pageLimit is " + pageLimit);
+        activeLedgerManager = alm;
         // Retrieve all of the active ledgers.
         getActiveLedgers();
     }
@@ -82,8 +81,9 @@ public class LedgerCache {
 
     LinkedList<Long> openLedgers = new LinkedList<Long>();
 
-    // Stores the set of active (non-deleted) ledgers.
-    ConcurrentMap<Long, Boolean> activeLedgers = new ConcurrentHashMap<Long, Boolean>();
+    // Manage all active ledgers in LedgerManager
+    // so LedgerManager has knowledge to garbage collect inactive/deleted ledgers
+    final LedgerManager activeLedgerManager;
 
     final int openFileLimit;
     final int pageSize;
@@ -237,7 +237,7 @@ public class LedgerCache {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("New ledger index file created for ledgerId: " + ledger);
                     }
-                    activeLedgers.put(ledger, true);
+                    activeLedgerManager.addActiveLedger(ledger, true);
                 }
                 if (openLedgers.size() > openFileLimit) {
                     fileInfoCache.remove(openLedgers.removeFirst()).close();
@@ -505,16 +505,13 @@ public class LedgerCache {
                                 // We've found a ledger index file. The file name is the
                                 // HexString representation of the ledgerId.
                                 String ledgerIdInHex = index.getName().substring(0, index.getName().length() - 4);
-                                activeLedgers.put(Long.parseLong(ledgerIdInHex, 16), true);
+                                activeLedgerManager.addActiveLedger(Long.parseLong(ledgerIdInHex, 16), true);
                             }
                         }
                     }
                 }
             }
         }
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Active ledgers found: " + activeLedgers);
-        }
     }
 
     /**
@@ -529,8 +526,8 @@ public class LedgerCache {
         fi.getFile().delete();
         fi.close();
 
-        // Remove it from the activeLedgers set
-        activeLedgers.remove(ledgerId);
+        // Remove it from the active ledger manager
+        activeLedgerManager.removeActiveLedger(ledgerId);
 
         // Now remove it from all the other lists and maps.
         // These data structures need to be synchronized first before removing entries.

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java?rev=1207495&r1=1207494&r2=1207495&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java Mon Nov 28 18:25:18 2011
@@ -23,16 +23,15 @@ package org.apache.bookkeeper.client;
 
 import java.io.IOException;
 import java.util.concurrent.Executors;
-import java.util.EnumSet;
-import java.util.Set;
 
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.proto.BookieClient;
-import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,9 +77,11 @@ public class BookKeeper {
     OrderedSafeExecutor mainWorkerPool = new OrderedSafeExecutor(Runtime
             .getRuntime().availableProcessors());
 
+    // Ledger manager responsible for how to store ledger meta data
+    final LedgerManager ledgerManager;
+
     ClientConfiguration conf;
 
-    
     /**
      * Create a bookkeeper client. A zookeeper client and a client socket factory
      * will be instantiated as part of this constructor.
@@ -137,10 +138,12 @@ public class BookKeeper {
      * @param zk
      *          Zookeeper client instance connected to the zookeeper with which
      *          the bookies have registered
+     * @throws IOException
      * @throws InterruptedException
      * @throws KeeperException
      */
-    public BookKeeper(ClientConfiguration conf, ZooKeeper zk) throws InterruptedException, KeeperException {
+    public BookKeeper(ClientConfiguration conf, ZooKeeper zk)
+        throws IOException, InterruptedException, KeeperException {
         this(conf, zk, new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
                 Executors.newCachedThreadPool()));
         ownChannelFactory = true;
@@ -158,11 +161,12 @@ public class BookKeeper {
      *          the bookies have registered
      * @param channelFactory
      *          A factory that will be used to create connections to the bookies
+     * @throws IOException
      * @throws InterruptedException
      * @throws KeeperException
      */
     public BookKeeper(ClientConfiguration conf, ZooKeeper zk, ClientSocketChannelFactory channelFactory)
-            throws InterruptedException, KeeperException {
+            throws IOException, InterruptedException, KeeperException {
         if (zk == null || channelFactory == null) {
             throw new NullPointerException();
         }
@@ -172,6 +176,12 @@ public class BookKeeper {
         bookieWatcher = new BookieWatcher(this);
         bookieWatcher.readBookiesBlocking();
         bookieClient = new BookieClient(conf, channelFactory, mainWorkerPool);
+        // intialize ledger meta manager
+        ledgerManager = LedgerManagerFactory.newLedgerManager(conf, zk);
+    }
+
+    LedgerManager getLedgerManager() {
+        return ledgerManager;
     }
 
     /**
@@ -463,6 +473,7 @@ public class BookKeeper {
      */
     public void close() throws InterruptedException, BKException {
         bookieClient.close();
+        ledgerManager.close();
         bookieWatcher.halt();
         if (ownChannelFactory) {
             channelFactory.releaseExternalResources();

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java?rev=1207495&r1=1207494&r2=1207495&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java Mon Nov 28 18:25:18 2011
@@ -21,9 +21,6 @@ package org.apache.bookkeeper.client;
  *
  */
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -33,14 +30,14 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.slf4j.Logger;
@@ -65,10 +62,7 @@ public class BookKeeperAdmin {
     // ZK client instance
     private ZooKeeper zk;
     // ZK ledgers related String constants
-    static final String LEDGERS_PATH = "/ledgers";
-    static final String LEDGER_NODE_PREFIX = "L";
-    static final String AVAILABLE_NODE = "available";
-    static final String BOOKIES_PATH = LEDGERS_PATH + "/" + AVAILABLE_NODE;
+    static final String BOOKIES_PATH = BookieWatcher.BOOKIE_REGISTRATION_PATH;
 
     // BookKeeper client instance
     private BookKeeper bkc;
@@ -161,52 +155,6 @@ public class BookKeeperAdmin {
     }
 
     /**
-     * This is a multi callback object for bookie recovery that waits for all of
-     * the multiple async operations to complete. If any fail, then we invoke
-     * the final callback with a BK LedgerRecoveryException.
-     */
-    class MultiCallback implements AsyncCallback.VoidCallback {
-        // Number of expected callbacks
-        final int expected;
-        // Final callback and the corresponding context to invoke
-        final AsyncCallback.VoidCallback cb;
-        final Object context;
-        // This keeps track of how many operations have completed
-        final AtomicInteger done = new AtomicInteger();
-        // List of the exceptions from operations that completed unsuccessfully
-        final LinkedBlockingQueue<Integer> exceptions = new LinkedBlockingQueue<Integer>();
-
-        MultiCallback(int expected, AsyncCallback.VoidCallback cb, Object context) {
-            this.expected = expected;
-            this.cb = cb;
-            this.context = context;
-            if (expected == 0) {
-                cb.processResult(Code.OK.intValue(), null, context);
-            }
-        }
-
-        private void tick() {
-            if (done.incrementAndGet() == expected) {
-                if (exceptions.isEmpty()) {
-                    cb.processResult(Code.OK.intValue(), null, context);
-                } else {
-                    cb.processResult(BKException.Code.LedgerRecoveryException, null, context);
-                }
-            }
-        }
-
-        @Override
-        public void processResult(int rc, String path, Object ctx) {
-            if (rc != Code.OK.intValue()) {
-                LOG.error("BK error recovering ledger data", BKException.create(rc));
-                exceptions.add(rc);
-            }
-            tick();
-        }
-
-    }
-
-    /**
      * Method to get the input ledger's digest type. For now, this is just a
      * placeholder function since there is no way we can get this information
      * easily. In the future, BookKeeper should store this ledger metadata
@@ -316,8 +264,8 @@ public class BookKeeperAdmin {
      */
     public void asyncRecoverBookieData(final InetSocketAddress bookieSrc, final InetSocketAddress bookieDest,
                                        final RecoverCallback cb, final Object context) {
-        // Sync ZK to make sure we're reading the latest bookie/ledger data.
-        zk.sync(LEDGERS_PATH, new AsyncCallback.VoidCallback() {
+        // Sync ZK to make sure we're reading the latest bookie data.
+        zk.sync(BOOKIES_PATH, new AsyncCallback.VoidCallback() {
             @Override
             public void processResult(int rc, String path, Object ctx) {
                 if (rc != Code.OK.intValue()) {
@@ -407,36 +355,30 @@ public class BookKeeperAdmin {
      */
     private void getActiveLedgers(final InetSocketAddress bookieSrc, final InetSocketAddress bookieDest,
                                   final RecoverCallback cb, final Object context, final List<InetSocketAddress> availableBookies) {
-        zk.getChildren(LEDGERS_PATH, null, new AsyncCallback.ChildrenCallback() {
-            @Override
-            public void processResult(int rc, String path, Object ctx, List<String> children) {
-                if (rc != Code.OK.intValue()) {
-                    LOG.error("ZK error getting ledger nodes: ", KeeperException.create(KeeperException.Code.get(rc),
-                              path));
-                    cb.recoverComplete(BKException.Code.ZKException, context);
-                    return;
-                }
-                // Wrapper class around the RecoverCallback so it can be used
-                // as the final VoidCallback to invoke within the MultiCallback.
-                class RecoverCallbackWrapper implements AsyncCallback.VoidCallback {
-                    final RecoverCallback cb;
+        // Wrapper class around the RecoverCallback so it can be used
+        // as the final VoidCallback to process ledgers
+        class RecoverCallbackWrapper implements AsyncCallback.VoidCallback {
+            final RecoverCallback cb;
 
-                    RecoverCallbackWrapper(RecoverCallback cb) {
-                        this.cb = cb;
-                    }
+            RecoverCallbackWrapper(RecoverCallback cb) {
+                this.cb = cb;
+            }
 
-                    @Override
-                    public void processResult(int rc, String path, Object ctx) {
-                        cb.recoverComplete(rc, ctx);
-                    }
-                }
-                // Recover each of the ledgers asynchronously
-                MultiCallback ledgerMcb = new MultiCallback(children.size(), new RecoverCallbackWrapper(cb), context);
-                for (final String ledgerNode : children) {
-                    recoverLedger(bookieSrc, ledgerNode, ledgerMcb, availableBookies);
-                }
+            @Override
+            public void processResult(int rc, String path, Object ctx) {
+                cb.recoverComplete(rc, ctx);
             }
-        }, null);
+        }
+
+        Processor<Long> ledgerProcessor = new Processor<Long>() {
+            @Override
+            public void process(Long ledgerId, AsyncCallback.VoidCallback iterCallback) {
+                recoverLedger(bookieSrc, ledgerId, iterCallback, availableBookies);
+            }
+        };
+        bkc.getLedgerManager().asyncProcessLedgers(
+            ledgerProcessor, new RecoverCallbackWrapper(cb),
+            context, BKException.Code.OK, BKException.Code.LedgerRecoveryException);
     }
 
     /**
@@ -462,11 +404,10 @@ public class BookKeeperAdmin {
      * @param bookieSrc
      *            Source bookie that had a failure. We want to replicate the
      *            ledger fragments that were stored there.
-     * @param ledgerNode
-     *            Ledger Node name as retrieved from ZooKeeper we want to
-     *            recover.
-     * @param ledgerMcb
-     *            MultiCallback to invoke once we've recovered the current
+     * @param lId
+     *            Ledger id we want to recover.
+     * @param ledgerIterCb
+     *            IterationCallback to invoke once we've recovered the current
      *            ledger.
      * @param availableBookies
      *            List of Bookie Servers that are available to use for
@@ -474,30 +415,10 @@ public class BookKeeperAdmin {
      *            single bookie server if the user explicitly chose a bookie
      *            server to replicate data to.
      */
-    private void recoverLedger(final InetSocketAddress bookieSrc, final String ledgerNode,
-                               final MultiCallback ledgerMcb, final List<InetSocketAddress> availableBookies) {
-        /*
-         * The available node is also stored in this path so ignore that. That
-         * node is the path for the set of available Bookie Servers.
-         */
-        if (ledgerNode.equals(AVAILABLE_NODE)) {
-            ledgerMcb.processResult(BKException.Code.OK, null, null);
-            return;
-        }
-        // Parse out the ledgerId from the ZK ledger node.
-        String parts[] = ledgerNode.split(LEDGER_NODE_PREFIX);
-        if (parts.length < 2) {
-            LOG.error("Ledger Node retrieved from ZK has invalid name format: " + ledgerNode);
-            ledgerMcb.processResult(BKException.Code.ZKException, null, null);
-            return;
-        }
-        final long lId;
-        try {
-            lId = Long.parseLong(parts[parts.length - 1]);
-        } catch (NumberFormatException e) {
-            LOG.error("Error retrieving ledgerId from ledgerNode: " + ledgerNode, e);
-            ledgerMcb.processResult(BKException.Code.ZKException, null, null);
-            return;
+    private void recoverLedger(final InetSocketAddress bookieSrc, final long lId,
+                               final AsyncCallback.VoidCallback ledgerIterCb, final List<InetSocketAddress> availableBookies) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Recovering ledger : " + lId);
         }
         /*
          * For the current ledger, open it to retrieve the LedgerHandle. This
@@ -512,7 +433,7 @@ public class BookKeeperAdmin {
             public void openComplete(int rc, final LedgerHandle lh, Object ctx) {
                 if (rc != Code.OK.intValue()) {
                     LOG.error("BK error opening ledger: " + lId, BKException.create(rc));
-                    ledgerMcb.processResult(rc, null, null);
+                    ledgerIterCb.processResult(rc, null, null);
                     return;
                 }
                 /*
@@ -550,17 +471,17 @@ public class BookKeeperAdmin {
                  * multiCallback and return.
                  */
                 if (ledgerFragmentsToRecover.size() == 0) {
-                    ledgerMcb.processResult(BKException.Code.OK, null, null);
+                    ledgerIterCb.processResult(BKException.Code.OK, null, null);
                     return;
                 }
 
                 /*
                  * Multicallback for ledger. Once all fragments for the ledger have been recovered
-                 * trigger the ledgerMcb 
+                 * trigger the ledgerIterCb
                  */
-                MultiCallback ledgerFragmentsMcb 
-                    = new MultiCallback(ledgerFragmentsToRecover.size(), ledgerMcb, null);
-
+                MultiCallback ledgerFragmentsMcb
+                    = new MultiCallback(ledgerFragmentsToRecover.size(), ledgerIterCb, null,
+                                        BKException.Code.OK, BKException.Code.LedgerRecoveryException);
                 /*
                  * Now recover all of the necessary ledger fragments
                  * asynchronously using a MultiCallback for every fragment.
@@ -657,7 +578,9 @@ public class BookKeeperAdmin {
          * Now asynchronously replicate all of the entries for the ledger
          * fragment that were on the dead bookie.
          */
-        MultiCallback ledgerFragmentEntryMcb = new MultiCallback(entriesToReplicate.size(), cb, null);
+        MultiCallback ledgerFragmentEntryMcb =
+            new MultiCallback(entriesToReplicate.size(), cb, null,
+                              BKException.Code.OK, BKException.Code.LedgerRecoveryException);
         for (final Long entryId : entriesToReplicate) {
             recoverLedgerFragmentEntry(entryId, lh, ledgerFragmentEntryMcb, newBookie);
         }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java?rev=1207495&r1=1207494&r2=1207495&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java Mon Nov 28 18:25:18 2011
@@ -28,21 +28,18 @@ import java.util.ArrayList;
 import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
-import org.apache.bookkeeper.util.StringUtils;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.AsyncCallback.StatCallback;
-import org.apache.zookeeper.AsyncCallback.StringCallback;
-import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.data.Stat;
 
 /**
  * Encapsulates asynchronous ledger create operation
  *
  */
-class LedgerCreateOp implements StringCallback, StatCallback {
+class LedgerCreateOp implements GenericCallback<String>, StatCallback {
 
     static final Logger LOG = LoggerFactory.getLogger(LedgerCreateOp.class);
 
@@ -86,27 +83,18 @@ class LedgerCreateOp implements StringCa
      * Initiates the operation
      */
     public void initiate() {
-        /*
-         * Create ledger node on ZK. We get the id from the sequence number on
-         * the node.
-         */
-
-        bk.getZkHandle().create(StringUtils.prefix, new byte[0], Ids.OPEN_ACL_UNSAFE,
-                                CreateMode.PERSISTENT_SEQUENTIAL, this, null);
-
-        // calls the children callback method below
+        bk.getLedgerManager().newLedgerPath(this);
     }
 
-
     /**
-     * Implements ZooKeeper string callback.
-     *
-     * @see org.apache.zookeeper.AsyncCallback.StringCallback#processResult(int, java.lang.String, java.lang.Object, java.lang.String)
+     * Callback when created ledger path.
      */
-    public void processResult(int rc, String path, Object ctx, String name) {
+    @Override
+    public void operationComplete(int rc, String ledgerPath) {
 
         if (rc != KeeperException.Code.OK.intValue()) {
-            LOG.error("Could not create node for ledger", KeeperException.create(KeeperException.Code.get(rc), path));
+            LOG.error("Could not create node for ledger",
+                      KeeperException.create(KeeperException.Code.get(rc), ledgerPath));
             cb.createComplete(BKException.Code.ZKException, null, this.ctx);
             return;
         }
@@ -116,9 +104,9 @@ class LedgerCreateOp implements StringCa
          */
         long ledgerId;
         try {
-            ledgerId = StringUtils.getLedgerId(name);
+            ledgerId = bk.getLedgerManager().getLedgerId(ledgerPath);
         } catch (IOException e) {
-            LOG.error("Could not extract ledger-id from path:" + path, e);
+            LOG.error("Could not extract ledger-id from path:" + ledgerPath, e);
             cb.createComplete(BKException.Code.ZKException, null, this.ctx);
             return;
         }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java?rev=1207495&r1=1207494&r2=1207495&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java Mon Nov 28 18:25:18 2011
@@ -22,7 +22,6 @@
 package org.apache.bookkeeper.client;
 
 import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
-import org.apache.bookkeeper.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.AsyncCallback.VoidCallback;
@@ -65,7 +64,8 @@ class LedgerDeleteOp implements VoidCall
     public void initiate() {
         // Asynchronously delete the ledger node in ZK.
         // When this completes, it will invoke the callback method below.
-        bk.getZkHandle().delete(StringUtils.getLedgerNodePath(ledgerId), -1, this, null);
+
+        bk.getZkHandle().delete(bk.getLedgerManager().getLedgerPath(ledgerId), -1, this, null);
     }
 
     /**

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=1207495&r1=1207494&r2=1207495&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Mon Nov 28 18:25:18 2011
@@ -29,7 +29,6 @@ import java.util.Enumeration;
 import java.util.Queue;
 import java.util.concurrent.Semaphore;
 
-import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
@@ -40,7 +39,6 @@ import org.apache.bookkeeper.client.Book
 import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.util.SafeRunnable;
-import org.apache.bookkeeper.util.StringUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -196,9 +194,9 @@ public class LedgerHandle {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Writing metadata to ZooKeeper: " + this.ledgerId + ", " + metadata.getZnodeVersion());
         }
-        
-        bk.getZkHandle().setData(StringUtils.getLedgerNodePath(ledgerId),
-                                 metadata.serialize(), metadata.getZnodeVersion(), 
+
+        bk.getZkHandle().setData(bk.getLedgerManager().getLedgerPath(ledgerId),
+                                 metadata.serialize(), metadata.getZnodeVersion(),
                                  callback, ctx);
     }
 
@@ -609,11 +607,11 @@ public class LedgerHandle {
         }, null);
 
     }
-    
+
     void rereadMetadata(final GenericCallback<Void> cb) {
-        bk.getZkHandle().getData(StringUtils.getLedgerNodePath(ledgerId), false, 
+        bk.getZkHandle().getData(bk.getLedgerManager().getLedgerPath(ledgerId), false,
                 new DataCallback() {
-                    public void processResult(int rc, String path, 
+                    public void processResult(int rc, String path,
                                               Object ctx, byte[] data, Stat stat) {
                         if (rc != KeeperException.Code.OK.intValue()) {
                             LOG.error("Error reading metadata from ledger, code =" + rc);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java?rev=1207495&r1=1207494&r2=1207495&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java Mon Nov 28 18:25:18 2011
@@ -26,7 +26,6 @@ import java.security.GeneralSecurityExce
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
-import org.apache.bookkeeper.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.KeeperException;
@@ -80,7 +79,7 @@ class LedgerOpenOp implements DataCallba
          * Asynchronously read the ledger metadata node.
          */
 
-        bk.getZkHandle().getData(StringUtils.getLedgerNodePath(ledgerId), false, this, ctx);
+        bk.getZkHandle().getData(bk.getLedgerManager().getLedgerPath(ledgerId), false, this, ctx);
     }
 
     /**

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java?rev=1207495&r1=1207494&r2=1207495&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java Mon Nov 28 18:25:18 2011
@@ -25,10 +25,16 @@ import org.apache.commons.configuration.
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.configuration.SystemConfiguration;
 
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+
 /**
  * Abstract configuration
  */
-public abstract class AbstractConfiguration extends CompositeConfiguration { 
+public abstract class AbstractConfiguration extends CompositeConfiguration {
+
+    // Ledger Manager
+    protected final static String LEDGER_MANAGER_TYPE = "ledgerManagerType";
+    protected final static String ZK_LEDGERS_ROOT_PATH = "zkLedgersRootPath";
 
     protected AbstractConfiguration() {
         super();
@@ -58,4 +64,42 @@ public abstract class AbstractConfigurat
         addConfiguration(baseConf); 
     }
 
+    /**
+     * Set Ledger Manager Type.
+     *
+     * @param lmType
+     *          Ledger Manager Type
+     * @return void
+     */
+    public void setLedgerManagerType(String lmType) {
+        setProperty(LEDGER_MANAGER_TYPE, lmType); 
+    }
+
+    /**
+     * Get Ledger Manager Type.
+     *
+     * @return ledger manager type
+     * @throws ConfigurationException
+     */
+    public String getLedgerManagerType() {
+        return getString(LEDGER_MANAGER_TYPE);
+    }
+
+    /**
+     * Set Zk Ledgers Root Path.
+     *
+     * @param zkLedgersPath zk ledgers root path
+     */
+    public void setZkLedgersRootPath(String zkLedgersPath) {
+        setProperty(ZK_LEDGERS_ROOT_PATH, zkLedgersPath);
+    }
+
+    /**
+     * Get Zk Ledgers Root Path.
+     *
+     * @return zk ledgers root path
+     */
+    public String getZkLedgersRootPath() {
+        return getString(ZK_LEDGERS_ROOT_PATH, "/ledgers");
+    }
 }

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java?rev=1207495&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java Mon Nov 28 18:25:18 2011
@@ -0,0 +1,251 @@
+package org.apache.bookkeeper.meta;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * Abstract ledger manager based on zookeeper, which provides common methods such as query zk nodes.
+ */
+abstract class AbstractZkLedgerManager implements LedgerManager {
+
+    static Logger LOG = Logger.getLogger(AbstractZkLedgerManager.class);
+
+    // Ledger Node Prefix
+    static public final String LEDGER_NODE_PREFIX = "L";
+    static final String AVAILABLE_NODE = "available";
+
+    protected final AbstractConfiguration conf;
+    protected final ZooKeeper zk;
+    protected final String ledgerRootPath;
+
+    /**
+     * ZooKeeper-based Ledger Manager Constructor
+     *
+     * @param conf
+     *          Configuration object
+     * @param zk
+     *          ZooKeeper Client Handle
+     * @param ledgerRootPath
+     *          ZooKeeper Path to store ledger metadata
+     */
+    protected AbstractZkLedgerManager(AbstractConfiguration conf, ZooKeeper zk,
+                                      String ledgerRootPath) {
+        this.conf = conf;
+        this.zk = zk;
+        this.ledgerRootPath = ledgerRootPath;
+    }
+
+    /**
+     * Get all the ledgers in a single zk node
+     *
+     * @param nodePath
+     *          Zookeeper node path
+     * @param getLedgersCallback
+     *          callback function to process ledgers in a single node
+     */
+    protected void asyncGetLedgersInSingleNode(final String nodePath, final GenericCallback<HashSet<Long>> getLedgersCallback) {
+        // First sync ZK to make sure we're reading the latest active/available ledger nodes.
+        zk.sync(nodePath, new AsyncCallback.VoidCallback() {
+            @Override
+            public void processResult(int rc, String path, Object ctx) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Sync node path " + path + " return : " + rc);
+                }
+                if (rc != Code.OK.intValue()) {
+                    LOG.error("ZK error syncing the ledgers node when getting children: ", KeeperException
+                            .create(KeeperException.Code.get(rc), path));
+                    getLedgersCallback.operationComplete(rc, null);
+                    return;
+                }
+                // Sync has completed successfully so now we can poll ZK
+                // and read in the latest set of active ledger nodes.
+                doAsyncGetLedgersInSingleNode(nodePath, getLedgersCallback);
+            }
+        }, null);
+    }
+
+    private void doAsyncGetLedgersInSingleNode(final String nodePath,
+                                               final GenericCallback<HashSet<Long>> getLedgersCallback) {
+        zk.getChildren(nodePath, false, new AsyncCallback.ChildrenCallback() {
+            @Override
+            public void processResult(int rc, String path, Object ctx, List<String> ledgerNodes) {
+                if (rc != Code.OK.intValue()) {
+                    LOG.error("Error polling ZK for the available ledger nodes: ", KeeperException
+                            .create(KeeperException.Code.get(rc), path));
+                    getLedgersCallback.operationComplete(rc, null);
+                    return;
+                }
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Retrieved current set of ledger nodes: " + ledgerNodes);
+                }
+                // Convert the ZK retrieved ledger nodes to a HashSet for easier comparisons.
+                HashSet<Long> allActiveLedgers = new HashSet<Long>(ledgerNodes.size(), 1.0f);
+                for (String ledgerNode : ledgerNodes) {
+                    if (isSpecialZnode(ledgerNode)) {
+                        continue;
+                    }
+                    try {
+                        // convert the node path to ledger id according to different ledger manager implementation
+                        allActiveLedgers.add(getLedgerId(path + "/" + ledgerNode));
+                    } catch (IOException ie) {
+                        LOG.warn("Error extracting ledgerId from ZK ledger node: " + ledgerNode);
+                        // This is a pretty bad error as it indicates a ledger node in ZK
+                        // has an incorrect format. For now just continue and consider
+                        // this as a non-existent ledger.
+                        continue;
+                    }
+                }
+
+                getLedgersCallback.operationComplete(rc, allActiveLedgers);
+
+            }
+        }, null);
+    }
+
+    private class GetLedgersCtx {
+        int rc;
+        HashSet<Long> ledgers = null;
+    }
+
+    /**
+     * Get all the ledgers in a single zk node
+     *
+     * @param nodePath
+     *          Zookeeper node path
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    protected HashSet<Long> getLedgersInSingleNode(final String nodePath)
+        throws IOException, InterruptedException {
+        final GetLedgersCtx ctx = new GetLedgersCtx();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Try to get ledgers of node : " + nodePath);
+        }
+        synchronized (ctx) {
+            asyncGetLedgersInSingleNode(nodePath, new GenericCallback<HashSet<Long>>() {
+                @Override
+                public void operationComplete(int rc, HashSet<Long> zkActiveLedgers) {
+                    synchronized (ctx) {
+                        if (Code.OK.intValue() == rc) {
+                            ctx.ledgers = zkActiveLedgers;
+                        }
+                        ctx.rc = rc;
+                        ctx.notifyAll();
+                    }
+                }
+            });
+            ctx.wait();
+        }
+        if (Code.OK.intValue() != ctx.rc && null != ctx.ledgers) {
+            throw new IOException("Error on getting ledgers from node " + nodePath);
+        }
+        return ctx.ledgers;
+    }
+
+    /**
+     * Process ledgers in a single zk node.
+     *
+     * <p>
+     * for each ledger found in this zk node, processor#process(ledgerId) will be triggerred
+     * to process a specific ledger. after all ledgers has been processed, the finalCb will
+     * be called with provided context object. The RC passed to finalCb is decided by :
+     * <ul>
+     * <li> All ledgers are processed successfully, successRc will be passed.
+     * <li> Either ledger is processed failed, failureRc will be passed.
+     * </ul>
+     * </p>
+     *
+     * @param path
+     *          Zk node path to store ledgers
+     * @param processor
+     *          Processor provided to process ledger
+     * @param finalCb
+     *          Callback object when all ledgers are processed
+     * @param ctx
+     *          Context object passed to finalCb
+     * @param successRc
+     *          RC passed to finalCb when all ledgers are processed successfully
+     * @param failureRc
+     *          RC passed to finalCb when either ledger is processed failed
+     */
+    protected void asyncProcessLedgersInSingleNode(
+            final String path, final Processor<Long> processor,
+            final AsyncCallback.VoidCallback finalCb, final Object ctx,
+            final int successRc, final int failureRc) {
+        asyncGetLedgersInSingleNode(path, new GenericCallback<HashSet<Long>>() {
+            @Override
+            public void operationComplete(int rc, HashSet<Long> zkActiveLedgers) {
+                if (Code.OK.intValue() != rc) {
+                    finalCb.processResult(failureRc, null, ctx);
+                    return;
+                }
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Processing ledgers : " + zkActiveLedgers);
+                }
+
+                // no ledgers found, return directly
+                if (zkActiveLedgers.size() == 0) {
+                    finalCb.processResult(successRc, null, ctx);
+                    return;
+                }
+
+                MultiCallback mcb = new MultiCallback(zkActiveLedgers.size(), finalCb, ctx,
+                                                      successRc, failureRc);
+                // start loop over all ledgers
+                for (Long ledger : zkActiveLedgers) {
+                    processor.process(ledger, mcb);
+                }
+            }
+        });
+    }
+
+    /**
+     * Whether the znode a special znode
+     *
+     * @param znode
+     *          Znode Name
+     * @return true  if the znode is a special znode otherwise false
+     */
+    protected boolean isSpecialZnode(String znode) {
+        if (AVAILABLE_NODE.equals(znode) 
+            || LedgerLayout.LAYOUT_ZNODE.equals(znode)) {
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public void close() {
+    }
+}

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java?rev=1207495&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java Mon Nov 28 18:25:18 2011
@@ -0,0 +1,193 @@
+package org.apache.bookkeeper.meta;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.HashSet;
+
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.bookkeeper.util.StringUtils;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manage all ledgers in a single zk node.
+ *
+ * <p>
+ * All ledgers' metadata are put in a single zk node, created using zk sequential node.
+ * Each ledger node is prefixed with 'L'.
+ * </p>
+ * <p>
+ * All actived ledgers found in bookie server side is managed in a hash map.
+ * </p>
+ * <p>
+ * Garbage collection in FlatLedgerManager is procssed as below:
+ * <ul>
+ * <li> fetch all existed ledgers from zookeeper, said <b>zkActiveLedgers</b>
+ * <li> fetch all active ledgers from bookie server, said <b>bkActiveLedgers</b>
+ * <li> loop over <b>bkActiveLedgers</b> to find those ledgers aren't existed in
+ * <b>zkActiveLedgers</b>, do garbage collection on them.
+ * </ul>
+ * </p>
+ */
+class FlatLedgerManager extends AbstractZkLedgerManager {
+
+    static final Logger LOG = LoggerFactory.getLogger(FlatLedgerManager.class);
+    public static final String NAME = "flat";
+    public static final int CUR_VERSION = 1;
+
+    // path prefix to store ledger znodes
+    private final String ledgerPrefix;
+    // hash map to store all active ledger ids
+    private ConcurrentMap<Long, Boolean> activeLedgers;
+
+    /**
+     * Constructor
+     *
+     * @param conf
+     *          Configuration object
+     * @param zk
+     *          ZooKeeper Client Handle
+     * @param ledgerRootPath
+     *          ZooKeeper Path to store ledger metadata
+     * @throws IOException when version is not compatible
+     */
+    public FlatLedgerManager(AbstractConfiguration conf, ZooKeeper zk,
+                             String ledgerRootPath, int layoutVersion)
+        throws IOException {
+        super(conf, zk, ledgerRootPath);
+
+        if (layoutVersion != CUR_VERSION) {
+            throw new IOException("Incompatible layout version found : " 
+                                  + layoutVersion);
+        }
+
+        ledgerPrefix = ledgerRootPath + "/" + LEDGER_NODE_PREFIX;
+        activeLedgers = new ConcurrentHashMap<Long, Boolean>();
+    }
+
+    @Override
+    public void newLedgerPath(final GenericCallback<String> cb) {
+        StringCallback scb = new StringCallback() {
+            @Override
+            public void processResult(int rc, String path, Object ctx,
+                    String name) {
+                if (Code.OK.intValue() != rc) {
+                    cb.operationComplete(rc, null);
+                } else {
+                    cb.operationComplete(rc, name);
+                }
+            }
+        };
+        ZkUtils.createFullPathOptimistic(zk, ledgerPrefix, new byte[0],
+            Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL, scb, null);
+    }
+
+    @Override
+    public String getLedgerPath(long ledgerId) {
+        StringBuilder sb = new StringBuilder();
+        sb.append(ledgerPrefix)
+          .append(StringUtils.getZKStringId(ledgerId));
+        return sb.toString();
+    }
+
+    @Override
+    public long getLedgerId(String nodeName) throws IOException {
+        long ledgerId;
+        try {
+            String parts[] = nodeName.split(ledgerPrefix);
+            ledgerId = Long.parseLong(parts[parts.length - 1]);
+        } catch (NumberFormatException e) {
+            throw new IOException(e);
+        }
+        return ledgerId;
+    }
+
+    @Override
+    public void asyncProcessLedgers(final Processor<Long> processor,
+                                    final AsyncCallback.VoidCallback finalCb, final Object ctx,
+                                    final int successRc, final int failureRc) {
+        asyncProcessLedgersInSingleNode(ledgerRootPath, processor, finalCb, ctx, successRc, failureRc);
+    }
+
+    @Override
+    public void addActiveLedger(long ledgerId, boolean active) {
+        activeLedgers.put(ledgerId, active);
+    }
+
+    @Override
+    public void removeActiveLedger(long ledgerId) {
+        activeLedgers.remove(ledgerId);
+    }
+
+    @Override
+    public boolean containsActiveLedger(long ledgerId) {
+        return activeLedgers.containsKey(ledgerId);
+    }
+
+    @Override
+    public void garbageCollectLedgers(GarbageCollector gc) {
+        try {
+            HashSet<Long> zkActiveLedgers = getLedgersInSingleNode(ledgerRootPath);
+            ConcurrentMap<Long, Boolean> bkActiveLedgers = activeLedgers;
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("All active ledgers from ZK: " + zkActiveLedgers);
+                LOG.debug("Current active ledgers from Bookie: " + bkActiveLedgers.keySet());
+            }
+            doGc(gc, bkActiveLedgers, zkActiveLedgers);
+        } catch (IOException ie) {
+            LOG.warn("Error during garbage collecting ledgers from " + ledgerRootPath, ie);
+        } catch (InterruptedException inte) {
+            LOG.warn("Interrupted during garbage collecting ledgers from " + ledgerRootPath, inte);
+        }
+    }
+
+    /**
+     * Do garbage collecting comparing hosted ledgers and zk ledgers
+     *
+     * @param gc
+     *          Garbage collector to do garbage collection when found inactive/deleted ledgers
+     * @param bkActiveLedgers
+     *          Active ledgers hosted in bookie server
+     * @param zkAllLedgers
+     *          All ledgers stored in zookeeper
+     */
+    void doGc(GarbageCollector gc, ConcurrentMap<Long, Boolean> bkActiveLedgers, HashSet<Long> zkAllLedgers) {
+        // remove any active ledgers that doesn't exist in zk
+        for (Long bkLid : bkActiveLedgers.keySet()) {
+            if (!zkAllLedgers.contains(bkLid)) {
+                // remove it from current active ledger
+                bkActiveLedgers.remove(bkLid);
+                gc.gc(bkLid);
+            }
+        }
+    }
+
+}