You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by si...@apache.org on 2012/12/24 05:50:03 UTC

svn commit: r1425585 [1/2] - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ book...

Author: sijie
Date: Mon Dec 24 04:50:02 2012
New Revision: 1425585

URL: http://svn.apache.org/viewvc?rev=1425585&view=rev
Log:
BOOKKEEPER-463: Refactor garbage collection code for ease to plugin different GC algorithm. (Fangmin, ivank, fpj via sijie)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollector.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/SnapshotMap.java
Removed:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ActiveLedgerManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/SnapshotMap.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Dec 24 04:50:02 2012
@@ -270,6 +270,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-490: add documentation for MetaStore interface (sijie, ivank via sijie)
 
+        BOOKKEEPER-463: Refactor garbage collection code for ease to plugin different GC algorithm. (Fangmin, ivank, fpj via sijie)
+
       hedwig-server:
 
         BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations in Hedwig (sijie via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Mon Dec 24 04:50:02 2012
@@ -39,7 +39,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.bookkeeper.meta.ActiveLedgerManager;
+import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.Journal.JournalScanner;
@@ -82,8 +82,8 @@ public class Bookie extends Thread {
     final ServerConfiguration conf;
 
     final SyncThread syncThread;
-    final LedgerManagerFactory activeLedgerManagerFactory;
-    final ActiveLedgerManager activeLedgerManager;
+    final LedgerManagerFactory ledgerManagerFactory;
+    final LedgerManager ledgerManager;
     final LedgerStorage ledgerStorage;
     final Journal journal;
 
@@ -478,17 +478,14 @@ public class Bookie extends Thread {
         this.conf = conf;
         this.journalDirectory = getCurrentDirectory(conf.getJournalDir());
         this.ledgerDirsManager = new LedgerDirsManager(conf);
-
         // instantiate zookeeper client to initialize ledger manager
         this.zk = instantiateZookeeperClient(conf);
         checkEnvironment(this.zk);
-
-        activeLedgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, this.zk);
-        activeLedgerManager = activeLedgerManagerFactory.newActiveLedgerManager();
-
+        ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, this.zk);
+        LOG.info("instantiate ledger manager {}", ledgerManagerFactory.getClass().getName());
+        ledgerManager = ledgerManagerFactory.newLedgerManager();
         syncThread = new SyncThread(conf);
-        ledgerStorage = new InterleavedLedgerStorage(conf, activeLedgerManager,
-                ledgerDirsManager);
+        ledgerStorage = new InterleavedLedgerStorage(conf, ledgerManager, ledgerDirsManager);
         handles = new HandleFactoryImpl(ledgerStorage);
         // instantiate the journal
         journal = new Journal(conf, ledgerDirsManager);
@@ -910,8 +907,8 @@ public class Bookie extends Thread {
 
                 // close Ledger Manager
                 try {
-                    activeLedgerManager.close();
-                    activeLedgerManagerFactory.uninitialize();
+                    ledgerManager.close();
+                    ledgerManagerFactory.uninitialize();
                 } catch (IOException ie) {
                     LOG.error("Failed to close active ledger manager : ", ie);
                 }

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollector.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollector.java?rev=1425585&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollector.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollector.java Mon Dec 24 04:50:02 2012
@@ -0,0 +1,51 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+/**
+ * This is the garbage collector interface, garbage collector implementers
+ * need to extends this class to remove the deleted ledgers.
+ */
+public interface GarbageCollector {
+    /**
+     * Do the garbage collector work
+     *
+     * @param garbageCleaner
+     *          cleaner used to clean selected garbages
+     */
+    public abstract void gc(GarbageCleaner garbageCleaner);
+
+    /**
+     * A interface used to define customised garbage cleaner
+     */
+    public interface GarbageCleaner {
+
+        /**
+         * Clean a specific ledger
+         *
+         * @param ledgerId
+         *          Ledger ID to be cleaned
+         */
+        public void clean(final long ledgerId) ;
+    }
+
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java Mon Dec 24 04:50:02 2012
@@ -32,9 +32,11 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
+import org.apache.bookkeeper.bookie.GarbageCollector.GarbageCleaner;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.meta.ActiveLedgerManager;
+import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.util.SnapshotMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,8 +73,7 @@ public class GarbageCollectorThread exte
 
     // Ledger Cache Handle
     final LedgerCache ledgerCache;
-
-    final ActiveLedgerManager activeLedgerManager;
+    final SnapshotMap<Long, Boolean> activeLedgers;
 
     // flag to ensure gc thread will not be interrupted during compaction
     // to reduce the risk getting entry log corrupted
@@ -83,6 +84,9 @@ public class GarbageCollectorThread exte
     // track the last scanned successfully log id
     long scannedLogId = 0;
 
+    final GarbageCollector garbageCollector;
+    final GarbageCleaner garbageCleaner;
+
     /**
      * A scanner wrapper to check whether a ledger is alive in an entry log file
      */
@@ -114,19 +118,37 @@ public class GarbageCollectorThread exte
      * @throws IOException
      */
     public GarbageCollectorThread(ServerConfiguration conf,
-                                  LedgerCache ledgerCache,
+                                  final LedgerCache ledgerCache,
                                   EntryLogger entryLogger,
-                                  ActiveLedgerManager activeLedgerManager,
-                                  EntryLogScanner scanner)
+                                  SnapshotMap<Long, Boolean> activeLedgers,
+                                  EntryLogScanner scanner,
+                                  LedgerManager ledgerManager)
         throws IOException {
         super("GarbageCollectorThread");
 
         this.ledgerCache = ledgerCache;
         this.entryLogger = entryLogger;
-        this.activeLedgerManager = activeLedgerManager;
+        this.activeLedgers = activeLedgers;
         this.scanner = scanner;
 
         this.gcWaitTime = conf.getGcWaitTime();
+
+        this.garbageCleaner = new GarbageCollector.GarbageCleaner() {
+            @Override
+            public void clean(long ledgerId) {
+                try {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("delete ledger : " + ledgerId);
+                    }
+                    ledgerCache.deleteLedger(ledgerId);
+                } catch (IOException e) {
+                    LOG.error("Exception when deleting the ledger index file on the Bookie: ", e);
+                }
+            }
+        };
+
+        this.garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, activeLedgers);
+
         // compaction parameters
         minorCompactionThreshold = conf.getMinorCompactionThreshold();
         minorCompactionInterval = conf.getMinorCompactionInterval() * SECOND;
@@ -223,17 +245,7 @@ public class GarbageCollectorThread exte
      * Do garbage collection ledger index files
      */
     private void doGcLedgers() {
-        activeLedgerManager.garbageCollectLedgers(
-        new ActiveLedgerManager.GarbageCollector() {
-            @Override
-            public void gc(long ledgerId) {
-                try {
-                    ledgerCache.deleteLedger(ledgerId);
-                } catch (IOException e) {
-                    LOG.error("Exception when deleting the ledger index file on the Bookie: ", e);
-                }
-            }
-        });
+        garbageCollector.gc(garbageCleaner);
     }
 
     /**
@@ -245,7 +257,7 @@ public class GarbageCollectorThread exte
             EntryLogMetadata meta = entryLogMetaMap.get(entryLogId);
             for (Long entryLogLedger : meta.ledgersMap.keySet()) {
                 // Remove the entry log ledger from the set if it isn't active.
-                if (!activeLedgerManager.containsActiveLedger(entryLogLedger)) {
+                if (!activeLedgers.containsKey(entryLogLedger)) {
                     meta.removeLedger(entryLogLedger);
                 }
             }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java Mon Dec 24 04:50:02 2012
@@ -26,8 +26,10 @@ import java.io.IOException;
 
 import org.apache.bookkeeper.jmx.BKMBeanInfo;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.meta.ActiveLedgerManager;
+import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.util.SnapshotMap;
+import org.apache.zookeeper.ZooKeeper;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,6 +44,10 @@ class InterleavedLedgerStorage implement
 
     EntryLogger entryLogger;
     LedgerCache ledgerCache;
+
+    // A sorted map to stored all active ledger ids
+    protected final SnapshotMap<Long, Boolean> activeLedgers;
+
     // This is the thread that garbage collects the entry logs that do not
     // contain any active ledgers in them; and compacts the entry logs that
     // has lower remaining percentage to reclaim disk space.
@@ -51,15 +57,16 @@ class InterleavedLedgerStorage implement
     private volatile boolean somethingWritten = false;
 
     InterleavedLedgerStorage(ServerConfiguration conf,
-            ActiveLedgerManager activeLedgerManager,
-            LedgerDirsManager ledgerDirsManager) throws IOException {
+            LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager)
+			throws IOException {
+        activeLedgers = new SnapshotMap<Long, Boolean>();
         entryLogger = new EntryLogger(conf, ledgerDirsManager);
-        ledgerCache = new LedgerCacheImpl(conf, activeLedgerManager, ledgerDirsManager);
+        ledgerCache = new LedgerCacheImpl(conf, activeLedgers, ledgerDirsManager);
         gcThread = new GarbageCollectorThread(conf, ledgerCache, entryLogger,
-                activeLedgerManager, new EntryLogCompactionScanner());
+                activeLedgers, new EntryLogCompactionScanner(), ledgerManager);
     }
 
-    @Override    
+    @Override
     public void start() {
         gcThread.start();
     }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java Mon Dec 24 04:50:02 2012
@@ -35,7 +35,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.bookkeeper.meta.ActiveLedgerManager;
+import org.apache.bookkeeper.util.SnapshotMap;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -54,7 +54,8 @@ public class LedgerCacheImpl implements 
     private LedgerDirsManager ledgerDirsManager;
     final private AtomicBoolean shouldRelocateIndexFile = new AtomicBoolean(false);
 
-    public LedgerCacheImpl(ServerConfiguration conf, ActiveLedgerManager alm, LedgerDirsManager ledgerDirsManager)
+    public LedgerCacheImpl(ServerConfiguration conf, SnapshotMap<Long, Boolean> activeLedgers,
+            LedgerDirsManager ledgerDirsManager)
             throws IOException {
         this.ledgerDirsManager = ledgerDirsManager;
         this.openFileLimit = conf.getOpenFileLimit();
@@ -69,7 +70,7 @@ public class LedgerCacheImpl implements 
         }
         LOG.info("maxMemory = " + Runtime.getRuntime().maxMemory());
         LOG.info("openFileLimit is " + openFileLimit + ", pageSize is " + pageSize + ", pageLimit is " + pageLimit);
-        activeLedgerManager = alm;
+        this.activeLedgers = activeLedgers;
         // Retrieve all of the active ledgers.
         getActiveLedgers();
         ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
@@ -90,7 +91,7 @@ public class LedgerCacheImpl implements 
 
     // Manage all active ledgers in LedgerManager
     // so LedgerManager has knowledge to garbage collect inactive/deleted ledgers
-    final ActiveLedgerManager activeLedgerManager;
+    final SnapshotMap<Long, Boolean> activeLedgers;
 
     final int openFileLimit;
     final int pageSize;
@@ -257,7 +258,7 @@ public class LedgerCacheImpl implements 
                     // A new ledger index file has been created for this Bookie.
                     // Add this new ledger to the set of active ledgers.
                     LOG.debug("New ledger index file created for ledgerId: {}", ledger);
-                    activeLedgerManager.addActiveLedger(ledger, true);
+                    activeLedgers.put(ledger, true);
                 }
                 evictFileInfoIfNecessary();
                 fi = new FileInfo(lf, masterKey);
@@ -697,7 +698,7 @@ public class LedgerCacheImpl implements 
                                         }
                                     }
                                 }
-                                activeLedgerManager.addActiveLedger(Long.parseLong(ledgerIdInHex, 16), true);
+                                activeLedgers.put(Long.parseLong(ledgerIdInHex, 16), true);
                             }
                         }
                     }
@@ -739,7 +740,7 @@ public class LedgerCacheImpl implements 
         }
 
         // Remove it from the active ledger manager
-        activeLedgerManager.removeActiveLedger(ledgerId);
+        activeLedgers.remove(ledgerId);
 
         // Now remove it from all the other lists and maps.
         // These data structures need to be synchronized first before removing entries.

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java?rev=1425585&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java Mon Dec 24 04:50:02 2012
@@ -0,0 +1,108 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
+import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
+import org.apache.bookkeeper.util.SnapshotMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Garbage collector implementation using scan and compare.
+ *
+ * <p>
+ * Garbage collection is processed as below:
+ * <ul>
+ * <li> fetch all existing ledgers from zookeeper or metastore according to
+ * the LedgerManager, called <b>globalActiveLedgers</b>
+ * <li> fetch all active ledgers from bookie server, said <b>bkActiveLedgers</b>
+ * <li> loop over <b>bkActiveLedgers</b> to find those ledgers that are not in
+ * <b>globalActiveLedgers</b>, do garbage collection on them.
+ * </ul>
+ * </p>
+ */
+public class ScanAndCompareGarbageCollector implements GarbageCollector{
+
+    static final Logger LOG = LoggerFactory.getLogger(ScanAndCompareGarbageCollector.class);
+    private SnapshotMap<Long, Boolean> activeLedgers;
+    private LedgerManager ledgerManager;
+
+    public ScanAndCompareGarbageCollector(LedgerManager ledgerManager, SnapshotMap<Long, Boolean> activeLedgers) {
+        this.ledgerManager = ledgerManager;
+        this.activeLedgers = activeLedgers;
+    }
+
+    @Override
+    public void gc(GarbageCleaner garbageCleaner) {
+        // create a snapshot first
+        NavigableMap<Long, Boolean> bkActiveLedgersSnapshot =
+                this.activeLedgers.snapshot();
+        LedgerRangeIterator ledgerRangeIterator = ledgerManager.getLedgerRanges();
+        try {
+            // Empty global active ledgers, need to remove all local active ledgers.
+            if (!ledgerRangeIterator.hasNext()) {
+                for (Long bkLid : bkActiveLedgersSnapshot.keySet()) {
+                    // remove it from current active ledger
+                    bkActiveLedgersSnapshot.remove(bkLid);
+                    garbageCleaner.clean(bkLid);
+                }
+            }
+            while(ledgerRangeIterator.hasNext()) {
+                LedgerRange lRange = ledgerRangeIterator.next();
+                Map<Long, Boolean> subBkActiveLedgers = null;
+                Long start = lRange.start();
+                Long end = lRange.end();
+                if (end != LedgerRange.NOLIMIT) {
+                    subBkActiveLedgers = bkActiveLedgersSnapshot.subMap(start,
+                            true, end, true);
+                } else {
+                    if (start != LedgerRange.NOLIMIT) {
+                        subBkActiveLedgers = bkActiveLedgersSnapshot.tailMap(start);
+                    } else {
+                        subBkActiveLedgers = bkActiveLedgersSnapshot;
+                    }
+                }
+                Set<Long> globalActiveLedgers = lRange.getLedgers();
+                LOG.debug("All active ledgers for hash node {}, Current active ledgers from Bookie for hash node {}",
+                        globalActiveLedgers, subBkActiveLedgers.keySet());
+                for (Long bkLid : subBkActiveLedgers.keySet()) {
+                    if (!globalActiveLedgers.contains(bkLid)) {
+                        // remove it from current active ledger
+                        subBkActiveLedgers.remove(bkLid);
+                        garbageCleaner.clean(bkLid);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            // ignore exception, collecting garbage next time
+            LOG.warn("Exception when iterating over the metadata {}", e);
+        }
+    }
+}
+
+

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java Mon Dec 24 04:50:02 2012
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.client;
 
 import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
 import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
+import org.apache.bookkeeper.versioning.Version;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,7 +66,7 @@ class LedgerDeleteOp extends OrderedSafe
     public void initiate() {
         // Asynchronously delete the ledger from meta manager
         // When this completes, it will invoke the callback method below.
-        bk.getLedgerManager().deleteLedger(ledgerId, this);
+        bk.getLedgerManager().removeLedgerMetadata(ledgerId, Version.ANY, this);
     }
 
     /**

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java Mon Dec 24 04:50:02 2012
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.Map;
 
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.client.LedgerMetadata;
@@ -32,6 +31,7 @@ import org.apache.bookkeeper.proto.Bookk
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.util.ZkUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,7 +47,7 @@ import org.apache.zookeeper.data.Stat;
 /**
  * Abstract ledger manager based on zookeeper, which provides common methods such as query zk nodes.
  */
-abstract class AbstractZkLedgerManager implements LedgerManager, ActiveLedgerManager {
+abstract class AbstractZkLedgerManager implements LedgerManager {
 
     static Logger LOG = LoggerFactory.getLogger(AbstractZkLedgerManager.class);
 
@@ -55,9 +55,6 @@ abstract class AbstractZkLedgerManager i
     protected final ZooKeeper zk;
     protected final String ledgerRootPath;
 
-    // A sorted map to stored all active ledger ids
-    protected final SnapshotMap<Long, Boolean> activeLedgers;
-
     /**
      * ZooKeeper-based Ledger Manager Constructor
      *
@@ -70,8 +67,6 @@ abstract class AbstractZkLedgerManager i
         this.conf = conf;
         this.zk = zk;
         this.ledgerRootPath = conf.getZkLedgersRootPath();
-
-        activeLedgers = new SnapshotMap<Long, Boolean>();
     }
 
     /**
@@ -93,9 +88,32 @@ abstract class AbstractZkLedgerManager i
      */
     protected abstract long getLedgerId(String ledgerPath) throws IOException;
 
+    /**
+     * Removes ledger metadata from ZooKeeper if version matches.
+     *
+     * @param   ledgerId    ledger identifier
+     * @param   version     local version of metadata znode
+     * @param   cb          callback object
+     */
     @Override
-    public void deleteLedger(final long ledgerId, final GenericCallback<Void> cb) {
-        zk.delete(getLedgerPath(ledgerId), -1, new VoidCallback() {
+    public void removeLedgerMetadata(final long ledgerId, final Version version,
+            final GenericCallback<Void> cb) {
+        int znodeVersion = -1;
+        if (Version.NEW == version) {
+            LOG.error("Request to delete ledger {} metadata with version set to the initial one", ledgerId);
+            cb.operationComplete(BKException.Code.MetadataVersionException, (Void)null);
+            return;
+        } else if (Version.ANY != version) {
+            if (!(version instanceof ZkVersion)) {
+                LOG.info("Not an instance of ZKVersion: {}", ledgerId);
+                cb.operationComplete(BKException.Code.MetadataVersionException, (Void)null);
+                return;
+            } else {
+                znodeVersion = ((ZkVersion)version).getZnodeVersion();
+            }
+        }
+
+        zk.delete(getLedgerPath(ledgerId), znodeVersion, new VoidCallback() {
             @Override
             public void processResult(int rc, String path, Object ctx) {
                 int bkRc;
@@ -174,112 +192,6 @@ abstract class AbstractZkLedgerManager i
     }
 
     /**
-     * Get all the ledgers in a single zk node
-     *
-     * @param nodePath
-     *          Zookeeper node path
-     * @param getLedgersCallback
-     *          callback function to process ledgers in a single node
-     */
-    protected void asyncGetLedgersInSingleNode(final String nodePath, final GenericCallback<HashSet<Long>> getLedgersCallback) {
-        // First sync ZK to make sure we're reading the latest active/available ledger nodes.
-        zk.sync(nodePath, new AsyncCallback.VoidCallback() {
-            @Override
-            public void processResult(int rc, String path, Object ctx) {
-                LOG.debug("Sync node path {} return : {}", path, rc);
-                if (rc != Code.OK.intValue()) {
-                    LOG.error("ZK error syncing the ledgers node when getting children: ", KeeperException
-                            .create(KeeperException.Code.get(rc), path));
-                    getLedgersCallback.operationComplete(rc, null);
-                    return;
-                }
-                // Sync has completed successfully so now we can poll ZK
-                // and read in the latest set of active ledger nodes.
-                doAsyncGetLedgersInSingleNode(nodePath, getLedgersCallback);
-            }
-        }, null);
-    }
-
-    private void doAsyncGetLedgersInSingleNode(final String nodePath,
-                                               final GenericCallback<HashSet<Long>> getLedgersCallback) {
-        zk.getChildren(nodePath, false, new AsyncCallback.ChildrenCallback() {
-            @Override
-            public void processResult(int rc, String path, Object ctx, List<String> ledgerNodes) {
-                if (rc != Code.OK.intValue()) {
-                    LOG.error("Error polling ZK for the available ledger nodes: ", KeeperException
-                            .create(KeeperException.Code.get(rc), path));
-                    getLedgersCallback.operationComplete(rc, null);
-                    return;
-                }
-                LOG.debug("Retrieved current set of ledger nodes: {}", ledgerNodes);
-                // Convert the ZK retrieved ledger nodes to a HashSet for easier comparisons.
-                HashSet<Long> allActiveLedgers = new HashSet<Long>(ledgerNodes.size(), 1.0f);
-                for (String ledgerNode : ledgerNodes) {
-                    if (isSpecialZnode(ledgerNode)) {
-                        continue;
-                    }
-                    try {
-                        // convert the node path to ledger id according to different ledger manager implementation
-                        allActiveLedgers.add(getLedgerId(path + "/" + ledgerNode));
-                    } catch (IOException ie) {
-                        LOG.warn("Error extracting ledgerId from ZK ledger node: " + ledgerNode);
-                        // This is a pretty bad error as it indicates a ledger node in ZK
-                        // has an incorrect format. For now just continue and consider
-                        // this as a non-existent ledger.
-                        continue;
-                    }
-                }
-
-                getLedgersCallback.operationComplete(rc, allActiveLedgers);
-
-            }
-        }, null);
-    }
-
-    private static class GetLedgersCtx {
-        int rc;
-        boolean done = false;
-        HashSet<Long> ledgers = null;
-    }
-
-    /**
-     * Get all the ledgers in a single zk node
-     *
-     * @param nodePath
-     *          Zookeeper node path
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    protected HashSet<Long> getLedgersInSingleNode(final String nodePath)
-        throws IOException, InterruptedException {
-        final GetLedgersCtx ctx = new GetLedgersCtx();
-        LOG.debug("Try to get ledgers of node : {}", nodePath);
-        asyncGetLedgersInSingleNode(nodePath, new GenericCallback<HashSet<Long>>() {
-                @Override
-                public void operationComplete(int rc, HashSet<Long> zkActiveLedgers) {
-                    synchronized (ctx) {
-                        if (Code.OK.intValue() == rc) {
-                            ctx.ledgers = zkActiveLedgers;
-                        }
-                        ctx.rc = rc;
-                        ctx.done = true;
-                        ctx.notifyAll();
-                    }
-                }
-            });
-
-        synchronized (ctx) {
-            while (ctx.done == false) {
-                ctx.wait();
-            }
-        }
-        if (Code.OK.intValue() != ctx.rc) {
-            throw new IOException("Error on getting ledgers from node " + nodePath);
-        }
-        return ctx.ledgers;
-    }
-
-    /**
      * Process ledgers in a single zk node.
      *
      * <p>
@@ -309,14 +221,15 @@ abstract class AbstractZkLedgerManager i
             final String path, final Processor<Long> processor,
             final AsyncCallback.VoidCallback finalCb, final Object ctx,
             final int successRc, final int failureRc) {
-        asyncGetLedgersInSingleNode(path, new GenericCallback<HashSet<Long>>() {
+        ZkUtils.getChildrenInSingleNode(zk, path, new GenericCallback<List<String>>() {
             @Override
-            public void operationComplete(int rc, HashSet<Long> zkActiveLedgers) {
+            public void operationComplete(int rc, List<String> ledgerNodes) {
                 if (Code.OK.intValue() != rc) {
                     finalCb.processResult(failureRc, null, ctx);
                     return;
                 }
 
+                Set<Long> zkActiveLedgers = ledgerListToSet(ledgerNodes, path);
                 LOG.debug("Processing ledgers: {}", zkActiveLedgers);
 
                 // no ledgers found, return directly
@@ -353,43 +266,36 @@ abstract class AbstractZkLedgerManager i
         return false;
     }
 
-    @Override
-    public void close() {
-    }
-
-    @Override
-    public void addActiveLedger(long ledgerId, boolean active) {
-        activeLedgers.put(ledgerId, active);
-    }
-
-    @Override
-    public void removeActiveLedger(long ledgerId) {
-        activeLedgers.remove(ledgerId);
-    }
-
-    @Override
-    public boolean containsActiveLedger(long ledgerId) {
-        return activeLedgers.containsKey(ledgerId);
-    }
-
     /**
-     * Do garbage collecting comparing hosted ledgers and zk ledgers
+     * Convert the ZK retrieved ledger nodes to a HashSet for easier comparisons.
      *
-     * @param gc
-     *          Garbage collector to do garbage collection when found inactive/deleted ledgers
-     * @param bkActiveLedgers
-     *          Active ledgers hosted in bookie server
-     * @param zkAllLedgers
-     *          All ledgers stored in zookeeper
+     * @param ledgerNodes
+     *          zk ledger nodes
+     * @param path
+     *          the prefix path of the ledger nodes
+     * @return ledger id hash set
      */
-    void doGc(GarbageCollector gc, Map<Long, Boolean> bkActiveLedgers, Set<Long> zkAllLedgers) {
-        // remove any active ledgers that doesn't exist in zk
-        for (Long bkLid : bkActiveLedgers.keySet()) {
-            if (!zkAllLedgers.contains(bkLid)) {
-                // remove it from current active ledger
-                bkActiveLedgers.remove(bkLid);
-                gc.gc(bkLid);
+    protected Set<Long> ledgerListToSet(List<String> ledgerNodes, String path) {
+        Set<Long> zkActiveLedgers = new HashSet<Long>(ledgerNodes.size(), 1.0f);
+        for (String ledgerNode : ledgerNodes) {
+            if (isSpecialZnode(ledgerNode)) {
+                continue;
+            }
+            try {
+                // convert the node path to ledger id according to different ledger manager implementation
+                zkActiveLedgers.add(getLedgerId(path + "/" + ledgerNode));
+            } catch (IOException e) {
+                LOG.warn("Error extracting ledgerId from ZK ledger node: " + ledgerNode);
+                // This is a pretty bad error as it indicates a ledger node in ZK
+                // has an incorrect format. For now just continue and consider
+                // this as a non-existent ledger.
+                continue;
             }
         }
+        return zkActiveLedgers;
+    }
+
+    @Override
+    public void close() {
     }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java Mon Dec 24 04:50:02 2012
@@ -19,7 +19,7 @@ package org.apache.bookkeeper.meta;
  */
 
 import java.io.IOException;
-import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 
 import org.apache.bookkeeper.client.BKException;
@@ -27,7 +27,6 @@ import org.apache.bookkeeper.client.Ledg
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
-import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.StringUtils;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.zookeeper.AsyncCallback;
@@ -47,18 +46,6 @@ import org.slf4j.LoggerFactory;
  * All ledgers' metadata are put in a single zk node, created using zk sequential node.
  * Each ledger node is prefixed with 'L'.
  * </p>
- * <p>
- * All actived ledgers found in bookie server side is managed in a hash map.
- * </p>
- * <p>
- * Garbage collection in FlatLedgerManager is procssed as below:
- * <ul>
- * <li> fetch all existed ledgers from zookeeper, said <b>zkActiveLedgers</b>
- * <li> fetch all active ledgers from bookie server, said <b>bkActiveLedgers</b>
- * <li> loop over <b>bkActiveLedgers</b> to find those ledgers aren't existed in
- * <b>zkActiveLedgers</b>, do garbage collection on them.
- * </ul>
- * </p>
  */
 class FlatLedgerManager extends AbstractZkLedgerManager {
 
@@ -80,8 +67,7 @@ class FlatLedgerManager extends Abstract
     public FlatLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
         super(conf, zk);
 
-        ledgerPrefix = ledgerRootPath + "/"
-                + BookKeeperConstants.LEDGER_NODE_PREFIX;
+        ledgerPrefix = ledgerRootPath + "/" + StringUtils.LEDGER_NODE_PREFIX;
     }
 
     @Override
@@ -139,24 +125,29 @@ class FlatLedgerManager extends Abstract
     }
 
     @Override
-    public void garbageCollectLedgers(GarbageCollector gc) {
-        if (null == zk) {
-            LOG.warn("Skip garbage collecting ledgers because there is no ZooKeeper handle.");
-            return;
-        }
-        try {
-            // create a snapshot first
-            Map<Long, Boolean> bkActiveLedgers = activeLedgers.snapshot();
-            Set<Long> zkActiveLedgers = getLedgersInSingleNode(ledgerRootPath);
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("All active ledgers from ZK: {}. Current active ledgers from Bookie: {}.",
-                    zkActiveLedgers, bkActiveLedgers.keySet());
+    public LedgerRangeIterator getLedgerRanges() {
+        return new LedgerRangeIterator() {
+            // single iterator, can visit only one time
+            boolean hasMoreElement = true;
+            @Override
+            public boolean hasNext() {
+                return hasMoreElement;
             }
-            doGc(gc, bkActiveLedgers, zkActiveLedgers);
-        } catch (IOException ie) {
-            LOG.warn("Error during garbage collecting ledgers from " + ledgerRootPath, ie);
-        } catch (InterruptedException inte) {
-            LOG.warn("Interrupted during garbage collecting ledgers from " + ledgerRootPath, inte);
-        }
+            @Override
+            public LedgerRange next() throws IOException {
+                if (!hasMoreElement) {
+                    throw new NoSuchElementException();
+                }
+                hasMoreElement = false;
+                Set<Long> zkActiveLedgers;
+                try {
+                    zkActiveLedgers = ledgerListToSet(
+                            ZkUtils.getChildrenInSingleNode(zk, ledgerRootPath), ledgerRootPath);
+                } catch (InterruptedException e) {
+                    throw new IOException("Error when get child nodes from zk", e);
+                }
+                return new LedgerRange(zkActiveLedgers);
+            }
+        };
     }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java Mon Dec 24 04:50:02 2012
@@ -69,11 +69,6 @@ class FlatLedgerManagerFactory extends L
     }
 
     @Override
-    public ActiveLedgerManager newActiveLedgerManager() {
-        return new FlatLedgerManager(conf, zk);
-    }
-
-    @Override
     public LedgerUnderreplicationManager newLedgerUnderreplicationManager()
             throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
         return new ZkLedgerUnderreplicationManager(conf, zk);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java Mon Dec 24 04:50:02 2012
@@ -22,16 +22,15 @@ import java.io.IOException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.Set;
-import java.util.Map;
-import java.util.NavigableMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
 
 import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
-import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.StringUtils;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.zookeeper.AsyncCallback;
@@ -58,25 +57,7 @@ import org.slf4j.LoggerFactory;
  * <pre>(ledgersRootPath)/level1/level2/L(level3)</pre>
  * E.g Ledger 0000000001 is split into 3 parts <i>00</i>, <i>0000</i>, <i>0001</i>, which is stored in
  * <i>(ledgersRootPath)/00/0000/L0001</i>. So each znode could have at most 10000 ledgers, which avoids
- * failed to get children list of a too big znode during garbage collection.
- * <p>
- * All actived ledgers found in bookie server is managed in a sorted map, which ease us to pick
- * up all actived ledgers belongs to (level1, level2).
- * </p>
- * <p>
- * Garbage collection in HierarchicalLedgerManager is processed node by node as below:
- * <ul>
- * fetching all level1 nodes, by calling zk#getChildren(ledgerRootPath).
- * <ul>
- * for each level1 node, fetching their level2 nodes, by calling zk#getChildren(ledgerRootPath + "/" + level1)
- * <li> fetch all existed ledgers from zookeeper in level1/level2 node, said <b>zkActiveLedgers</b>
- * <li> fetch all active ledgers from bookie server in level1/level2, said <b>bkActiveLedgers</b>
- * <li> loop over <b>bkActiveLedgers</b> to find those ledgers aren't existed in <b>zkActiveLedgers</b>, do garbage collection on them.
- * </ul>
- * </ul>
- * Since garbage collection is running in background, HierarchicalLedgerManager did gc on single hash
- * node at a time to avoid consuming too much resources.
- * </p>
+ * errors during garbage collection due to lists of children that are too long.
  */
 class HierarchicalLedgerManager extends AbstractZkLedgerManager {
 
@@ -196,15 +177,7 @@ class HierarchicalLedgerManager extends 
 
     @Override
     public String getLedgerPath(long ledgerId) {
-        String ledgerIdStr = StringUtils.getZKStringId(ledgerId);
-        // do 2-4-4 split
-        StringBuilder sb = new StringBuilder();
-        sb.append(ledgerRootPath).append("/").append(
-                ledgerIdStr.substring(0, 2)).append("/").append(
-                ledgerIdStr.substring(2, 6)).append("/").append(
-                BookKeeperConstants.LEDGER_NODE_PREFIX).append(
-                ledgerIdStr.substring(6, 10));
-        return sb.toString();
+        return ledgerRootPath + StringUtils.getHierarchicalLedgerPath(ledgerId);
     }
 
     @Override
@@ -213,26 +186,12 @@ class HierarchicalLedgerManager extends 
             throw new IOException("it is not a valid hashed path name : " + pathName);
         }
         String hierarchicalPath = pathName.substring(ledgerRootPath.length() + 1);
-        String[] hierarchicalParts = hierarchicalPath.split("/");
-        if (hierarchicalParts.length != 3) {
-            throw new IOException("it is not a valid hierarchical path name : " + pathName);
-        }
-        hierarchicalParts[2] = hierarchicalParts[2]
-                .substring(BookKeeperConstants.LEDGER_NODE_PREFIX.length());
-        return getLedgerId(hierarchicalParts);
+        return StringUtils.stringToHierarchicalLedgerId(hierarchicalPath);
     }
 
     // get ledger from all level nodes
     private long getLedgerId(String...levelNodes) throws IOException {
-        try {
-            StringBuilder sb = new StringBuilder();
-            for (String node : levelNodes) {
-                sb.append(node);
-            }
-            return Long.parseLong(sb.toString());
-        } catch (NumberFormatException e) {
-            throw new IOException(e);
-        }
+        return StringUtils.stringToHierarchicalLedgerId(levelNodes);
     }
 
     //
@@ -333,71 +292,6 @@ class HierarchicalLedgerManager extends 
         }, null);
     }
 
-    @Override
-    public void garbageCollectLedgers(GarbageCollector gc) {
-        if (null == zk) {
-            LOG.warn("Skip garbage collecting ledgers because there is no ZooKeeper handle.");
-            return;
-        }
-        // create a snapshot before garbage collection
-        NavigableMap<Long, Boolean> snapshot = activeLedgers.snapshot();
-        try {
-            List<String> l1Nodes = zk.getChildren(ledgerRootPath, null);
-            for (String l1Node : l1Nodes) {
-                if (isSpecialZnode(l1Node)) {
-                    continue;
-                }
-                try {
-                    List<String> l2Nodes = zk.getChildren(ledgerRootPath + "/" + l1Node, null);
-                    for (String l2Node : l2Nodes) {
-                        doGcByLevel(gc, l1Node, l2Node, snapshot);
-                    }
-                } catch (Exception e) {
-                    LOG.warn("Exception during garbage collecting ledgers for " + l1Node
-                             + " of " + ledgerRootPath, e);
-                }
-            }
-        } catch (Exception e) {
-            LOG.warn("Exception during garbage collecting inactive/deleted ledgers", e);
-        }
-    }
-
-    /**
-     * Garbage collection a single node level1/level2
-     *
-     * @param gc
-     *          Garbage collector
-     * @param level1
-     *          1st level node name
-     * @param level2
-     *          2nd level node name
-     * @param snapshot
-     *          Snapshot of the active ledgers map.
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    void doGcByLevel(GarbageCollector gc, final String level1, final String level2,
-                     NavigableMap snapshot)
-        throws IOException, InterruptedException {
-
-        StringBuilder nodeBuilder = new StringBuilder();
-        nodeBuilder.append(ledgerRootPath).append("/")
-                   .append(level1).append("/").append(level2);
-        String nodePath = nodeBuilder.toString();
-
-        Set<Long> zkActiveLedgers = getLedgersInSingleNode(nodePath);
-        // get hosted ledgers in /level1/level2
-        long startLedgerId = getStartLedgerIdByLevel(level1, level2);
-        long endLedgerId = getEndLedgerIdByLevel(level1, level2);
-        Map<Long, Boolean> bkActiveLedgers = snapshot.subMap(startLedgerId, true, endLedgerId, true);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("For hash node: " + level1 + "/" + level2 + ": All active ledgers from ZK: "
-                      + zkActiveLedgers + ". Current active ledgers from Bookie: "+ bkActiveLedgers);
-        }
-
-        doGc(gc, bkActiveLedgers, zkActiveLedgers);
-    }
-
     /**
      * Process list one by one in asynchronize way. Process will be stopped immediately
      * when error occurred.
@@ -474,4 +368,99 @@ class HierarchicalLedgerManager extends 
     protected boolean isSpecialZnode(String znode) {
         return IDGEN_ZNODE.equals(znode) || super.isSpecialZnode(znode);
     }
+
+    @Override
+    public LedgerRangeIterator getLedgerRanges() {
+        return new HierarchicalLedgerRangeIterator();
+    }
+
+    /**
+     * Iterator through each metadata bucket with hierarchical mode
+     */
+    private class HierarchicalLedgerRangeIterator implements LedgerRangeIterator {
+        private Iterator<String> l1NodesIter = null;
+        private Iterator<String> l2NodesIter = null;
+        private String curL1Nodes = "";
+        private boolean hasMoreElement = true;
+
+        /**
+         * iterate next level1 znode
+         *
+         * @return false if have visited all level1 nodes
+         * @throws InterruptedException/KeeperException if error occurs reading zookeeper children
+         */
+        private boolean nextL1Node() throws KeeperException, InterruptedException {
+            l2NodesIter = null;
+            while (l2NodesIter == null) {
+                if (l1NodesIter.hasNext()) {
+                    curL1Nodes = l1NodesIter.next();
+                } else {
+                    return false;
+                }
+                if (isSpecialZnode(curL1Nodes)) {
+                    continue;
+                }
+                List<String> l2Nodes = zk.getChildren(ledgerRootPath + "/" + curL1Nodes, null);
+                l2NodesIter = l2Nodes.iterator();
+                if (!l2NodesIter.hasNext()) {
+                    l2NodesIter = null;
+                    continue;
+                }
+            }
+            return true;
+        }
+
+        @Override
+        public boolean hasNext() throws IOException {
+            try {
+                if (l1NodesIter == null) {
+                    l1NodesIter = zk.getChildren(ledgerRootPath, null).iterator();
+                    hasMoreElement = nextL1Node();
+                } else if (!l2NodesIter.hasNext()) {
+                    hasMoreElement = nextL1Node();
+                }
+            } catch (Exception e) {
+                throw new IOException("Error when check more elements", e);
+            }
+            return hasMoreElement;
+        }
+
+        @Override
+        public LedgerRange next() throws IOException {
+            if (!hasMoreElement) {
+                throw new NoSuchElementException();
+            }
+            return getLedgerRangeByLevel(curL1Nodes, l2NodesIter.next());
+        }
+
+        /**
+         * Get a single node level1/level2
+         *
+         * @param level1
+         *          1st level node name
+         * @param level2
+         *          2nd level node name
+         * @throws IOException
+         */
+        LedgerRange getLedgerRangeByLevel(final String level1, final String level2)
+                throws IOException {
+            StringBuilder nodeBuilder = new StringBuilder();
+            nodeBuilder.append(ledgerRootPath).append("/")
+                       .append(level1).append("/").append(level2);
+            String nodePath = nodeBuilder.toString();
+            List<String> ledgerNodes = null;
+            try {
+                ledgerNodes = ZkUtils.getChildrenInSingleNode(zk, nodePath);
+            } catch (InterruptedException e) {
+                throw new IOException("Error when get child nodes from zk", e);
+            }
+            Set<Long> zkActiveLedgers = ledgerListToSet(ledgerNodes, nodePath);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("All active ledgers from ZK for hash node "
+                          + level1 + "/" + level2 + " : " + zkActiveLedgers);
+            }
+            return new LedgerRange(zkActiveLedgers,
+                    getStartLedgerIdByLevel(level1, level2), getEndLedgerIdByLevel(level1, level2));
+        }
+    }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java Mon Dec 24 04:50:02 2012
@@ -69,11 +69,6 @@ class HierarchicalLedgerManagerFactory e
     }
 
     @Override
-    public ActiveLedgerManager newActiveLedgerManager() {
-        return new HierarchicalLedgerManager(conf, zk);
-    }
-
-    @Override
     public LedgerUnderreplicationManager newLedgerUnderreplicationManager()
             throws KeeperException, InterruptedException, ReplicationException.CompatibilityException{
         return new ZkLedgerUnderreplicationManager(conf, zk);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java Mon Dec 24 04:50:02 2012
@@ -19,11 +19,15 @@ package org.apache.bookkeeper.meta;
  */
 
 import java.io.Closeable;
+import java.io.IOException;
+import java.util.Set;
 
 import org.apache.zookeeper.AsyncCallback;
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.bookkeeper.versioning.Version;
 
 /**
  * LedgerManager takes responsibility of ledger management in client side.
@@ -41,18 +45,25 @@ public interface LedgerManager extends C
      *        Metadata provided when creating a new ledger
      * @param cb
      *        Callback when creating a new ledger.
+     *        {@link BKException.Code.ZKException} return code when can't generate
+     *        or extract new ledger id
      */
-    public abstract void createLedger(LedgerMetadata metadata, GenericCallback<Long> cb);
+    public void createLedger(LedgerMetadata metadata, GenericCallback<Long> cb);
 
     /**
-     * Delete a specified ledger by ledgerId.
+     * Remove a specified ledger metadata by ledgerId and version.
      *
      * @param ledgerId
      *          Ledger Id
+     * @param version
+     *          Ledger metadata version
      * @param cb
-     *          Callback when deleted ledger.
+     *          Callback when removed ledger metadata.
+     *          {@link BKException.Code.MetadataVersionException} return code when version doesn't match,
+     *          {@link BKException.Code.NoSuchLedgerExistsException} return code when ledger doesn't exist,
+     *          {@link BKException.Code.ZKException} return code when other issues happen.
      */
-    public abstract void deleteLedger(long ledgerId, GenericCallback<Void> cb);
+    public void removeLedgerMetadata(long ledgerId, Version version, GenericCallback<Void> vb);
 
     /**
      * Read ledger metadata of a specified ledger.
@@ -61,8 +72,10 @@ public interface LedgerManager extends C
      *          Ledger Id
      * @param readCb
      *          Callback when read ledger metadata.
+     *          {@link BKException.Code.NoSuchLedgerExistsException} return code when ledger doesn't exist,
+     *          {@link BKException.Code.ZKException} return code when other issues happen.
      */
-    public abstract void readLedgerMetadata(long ledgerId, GenericCallback<LedgerMetadata> readCb);
+    public void readLedgerMetadata(long ledgerId, GenericCallback<LedgerMetadata> readCb);
 
     /**
      * Write ledger metadata.
@@ -73,8 +86,10 @@ public interface LedgerManager extends C
      *          Ledger Metadata to write
      * @param cb
      *          Callback when finished writing ledger metadata.
+     *          {@link BKException.Code.MetadataVersionException} return code when version doesn't match,
+     *          {@link BKException.Code.ZKException} return code when other issues happen.
      */
-    public abstract void writeLedgerMetadata(long ledgerId, LedgerMetadata metadata, GenericCallback<Void> cb);
+    public void writeLedgerMetadata(long ledgerId, LedgerMetadata metadata, GenericCallback<Void> cb);
 
     /**
      * Loop to process all ledgers.
@@ -99,4 +114,79 @@ public interface LedgerManager extends C
      */
     public void asyncProcessLedgers(Processor<Long> processor, AsyncCallback.VoidCallback finalCb,
                                     Object context, int successRc, int failureRc);
+
+    /**
+     * Loop to scan a range of metadata from metadata storage
+     *
+     * @return will return a iterator of the Ranges
+     */
+    public LedgerRangeIterator getLedgerRanges();
+
+    /*
+     * Used to represent the Ledgers range returned from the
+     * current scan.
+     */
+    public static class LedgerRange {
+        // ledger start and end ranges
+        private final long start;
+        private final long end;
+        public final static long NOLIMIT = -1;
+
+        // returned ledgers
+        private Set<Long> ledgers;
+
+        public LedgerRange(Set<Long> ledgers) {
+            this(ledgers, NOLIMIT, NOLIMIT);
+        }
+
+        public LedgerRange(Set<Long> ledgers, long start) {
+            this(ledgers, start, NOLIMIT);
+        }
+
+        public LedgerRange(Set<Long> ledgers, long start, long end) {
+            this.ledgers = ledgers;
+            this.start = start;
+            this.end = end;
+        }
+
+        public Long start() {
+            return this.start;
+        }
+
+        public Long end() {
+            return this.end;
+        }
+
+        public Set<Long> getLedgers() {
+            return this.ledgers;
+        }
+    }
+
+    /**
+     * Interface of the ledger meta range iterator from
+     * storage (e.g. in ZooKeeper or other key/value store)
+     */
+    interface LedgerRangeIterator {
+
+        /**
+         * @return true if there are records in the ledger metadata store. false
+         * only when there are indeed no records in ledger metadata store.
+         * @throws IOException thrown when there is any problem accessing the ledger
+         * metadata store. It is critical that it doesn't return false in the case
+         * in the case it fails to access the ledger metadata store. Otherwise it
+         * will end up deleting all ledgers by accident.
+         */
+        public boolean hasNext() throws IOException;
+
+        /**
+         * Get the next element.
+         *
+         * @return the next element.
+         * @throws IOException thrown when there is a problem accessing the ledger
+         * metadata store. It is critical that it doesn't return false in the case
+         * in the case it fails to access the ledger metadata store. Otherwise it
+         * will end up deleting all ledgers by accident.
+         */
+        public LedgerRange next() throws IOException;
+    }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java Mon Dec 24 04:50:02 2012
@@ -76,15 +76,7 @@ public abstract class LedgerManagerFacto
     public abstract LedgerManager newLedgerManager();
 
     /**
-     * return active ledger manager for server side to manage active ledgers.
-     *
-     * @return active ledger manager
-     * @see ActiveLedgerManager
-     */
-    public abstract ActiveLedgerManager newActiveLedgerManager();
-
-    /**
-     * Return a ledger underreplication manager, which is used to 
+     * Return a ledger underreplication manager, which is used to
      * mark ledgers as unreplicated, and to retrieve a ledger which
      * is underreplicated so that it can be rereplicated.
      *

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java Mon Dec 24 04:50:02 2012
@@ -17,20 +17,17 @@
  */
 package org.apache.bookkeeper.meta;
 
-import static org.apache.bookkeeper.metastore.MetastoreScannableTable.EMPTY_END_KEY;
 import static org.apache.bookkeeper.metastore.MetastoreTable.ALL_FIELDS;
 import static org.apache.bookkeeper.metastore.MetastoreTable.NON_FIELDS;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.Map;
-import java.util.NavigableMap;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerMetadata;
@@ -43,7 +40,6 @@ import org.apache.bookkeeper.metastore.M
 import org.apache.bookkeeper.metastore.MetastoreException;
 import org.apache.bookkeeper.metastore.MetastoreFactory;
 import org.apache.bookkeeper.metastore.MetastoreScannableTable;
-import org.apache.bookkeeper.metastore.MetastoreScannableTable.Order;
 import org.apache.bookkeeper.metastore.MetastoreTableItem;
 import org.apache.bookkeeper.metastore.Value;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -171,8 +167,7 @@ public class MSLedgerManagerFactory exte
         }
     }
 
-    static abstract class AbstractMsLedgerManager implements Closeable {
-
+    static class MsLedgerManager implements LedgerManager {
         final ZooKeeper zk;
         final AbstractConfiguration conf;
 
@@ -180,7 +175,17 @@ public class MSLedgerManagerFactory exte
         final MetastoreScannableTable ledgerTable;
         final int maxEntriesPerScan;
 
-        AbstractMsLedgerManager(final AbstractConfiguration conf, final ZooKeeper zk, final MetaStore metastore) {
+        static final String IDGEN_ZNODE = "ms-idgen";
+        static final String IDGENERATION_PREFIX = "/" + IDGEN_ZNODE + "/ID-";
+
+        // Path to generate global id
+        private final String idGenPath;
+
+        // we use this to prevent long stack chains from building up in
+        // callbacks
+        ScheduledExecutorService scheduler;
+
+        MsLedgerManager(final AbstractConfiguration conf, final ZooKeeper zk, final MetaStore metastore) {
             this.conf = conf;
             this.zk = zk;
             this.metastore = metastore;
@@ -194,29 +199,7 @@ public class MSLedgerManagerFactory exte
             }
             // configuration settings
             maxEntriesPerScan = conf.getMetastoreMaxEntriesPerScan();
-        }
-
-        @Override
-        public void close() {
-            ledgerTable.close();
-        }
-
-    }
 
-    static class MsLedgerManager extends AbstractMsLedgerManager implements LedgerManager {
-
-        static final String IDGEN_ZNODE = "ms-idgen";
-        static final String IDGENERATION_PREFIX = "/" + IDGEN_ZNODE + "/ID-";
-
-        // Path to generate global id
-        private final String idGenPath;
-
-        // we use this to prevent long stack chains from building up in
-        // callbacks
-        ScheduledExecutorService scheduler;
-
-        MsLedgerManager(final AbstractConfiguration conf, final ZooKeeper zk, final MetaStore metastore) {
-            super(conf, zk, metastore);
             this.idGenPath = conf.getZkLedgersRootPath() + IDGENERATION_PREFIX;
             this.scheduler = Executors.newSingleThreadScheduledExecutor();
         }
@@ -228,7 +211,7 @@ public class MSLedgerManagerFactory exte
             } catch (Exception e) {
                 LOG.warn("Error when closing MsLedgerManager : ", e);
             }
-            super.close();
+            ledgerTable.close();
         }
 
         @Override
@@ -305,7 +288,8 @@ public class MSLedgerManagerFactory exte
         }
 
         @Override
-        public void deleteLedger(final long ledgerId, final GenericCallback<Void> cb) {
+        public void removeLedgerMetadata(final long ledgerId, final Version version,
+                                         final GenericCallback<Void> cb) {
             MetastoreCallback<Void> msCallback = new MetastoreCallback<Void>() {
                 @Override
                 public void complete(int rc, Void value, Object ctx) {
@@ -321,7 +305,7 @@ public class MSLedgerManagerFactory exte
                     cb.operationComplete(bkRc, (Void) null);
                 }
             };
-            ledgerTable.remove(ledgerId2Key(ledgerId), Version.ANY, msCallback, null);
+            ledgerTable.remove(ledgerId2Key(ledgerId), version, msCallback, null);
         }
 
         @Override
@@ -475,129 +459,60 @@ public class MSLedgerManagerFactory exte
             };
             cursor.asyncReadEntries(maxEntriesPerScan, msCallback, null);
         }
-    }
-
-    static class MsActiveLedgerManager extends AbstractMsLedgerManager implements ActiveLedgerManager {
 
-        // A sorted map to stored all active ledger ids
-        protected final SnapshotMap<Long, Boolean> activeLedgers;
-
-        MsActiveLedgerManager(final AbstractConfiguration conf, final ZooKeeper zk, final MetaStore metastore) {
-            super(conf, zk, metastore);
-            activeLedgers = new SnapshotMap<Long, Boolean>();
-        }
-
-        @Override
-        public void addActiveLedger(long ledgerId, boolean active) {
-            activeLedgers.put(ledgerId, active);
-        }
-
-        @Override
-        public void removeActiveLedger(long ledgerId) {
-            activeLedgers.remove(ledgerId);
-        }
-
-        @Override
-        public boolean containsActiveLedger(long ledgerId) {
-            return activeLedgers.containsKey(ledgerId);
-        }
-
-        @Override
-        public void garbageCollectLedgers(GarbageCollector gc) {
-            LOG.debug("Start garbage collect ledgers.");
-            NavigableMap<Long, Boolean> snapshot = activeLedgers.snapshot();
-            Long nextLedger = 0L;
-            int numRetries = 3;
-            do {
-                nextLedger = doGcLedgers(nextLedger, snapshot, gc);
-            } while (null != nextLedger && --numRetries > 0);
-            LOG.debug("End garbage collect ledgers.");
-        }
-
-        /**
-         * Do garbage collection starting from <code>startLedger</code>.
-         *
-         * @param startLedgerId
-         *            Start Ledger id
-         * @param snapshot
-         *            Current snapshot of active ledgers
-         * @param gc
-         *            Garbage collector
-         * @return null if finished scanning all ledgers, the next ledger id to
-         *         scan
-         */
-        private Long doGcLedgers(Long startLedgerId, NavigableMap<Long, Boolean> snapshot, GarbageCollector gc) {
-            final SyncResult<MetastoreCursor> result = new SyncResult<MetastoreCursor>();
-            MetastoreCallback<MetastoreCursor> openCursorCb = new MetastoreCallback<MetastoreCursor>() {
-                @Override
-                public void complete(int rc, MetastoreCursor cursor, Object ctx) {
-                    result.complete(rc, cursor);
-                }
-            };
-            ledgerTable.openCursor(ledgerId2Key(startLedgerId), true, EMPTY_END_KEY, true, Order.ASC, NON_FIELDS,
-                    openCursorCb, null);
-            result.block();
-            if (MSException.Code.OK.getCode() != result.getRetCode()) {
-                LOG.warn("Failed to open metastore cursor to run garbage collection : ",
-                        MSException.create(MSException.Code.get(result.getRetCode())));
-                // failed to open a cursor, not continue until next gc
-                return null;
+        class MSLedgerRangeIterator implements LedgerRangeIterator {
+            final CountDownLatch openCursorLatch = new CountDownLatch(1);
+            MetastoreCursor cursor = null;
+
+            MSLedgerRangeIterator() {
+                MetastoreCallback<MetastoreCursor> openCursorCb = new MetastoreCallback<MetastoreCursor>() {
+                    @Override
+                    public void complete(int rc, MetastoreCursor newCursor, Object ctx) {
+                        if (MSException.Code.OK.getCode() != rc) {
+                            LOG.error("Error opening cursor for ledger range iterator {}", rc);
+                        } else {
+                            cursor = newCursor;
+                        }
+                        openCursorLatch.countDown();
+                    }
+                };
+                ledgerTable.openCursor(NON_FIELDS, openCursorCb, null);
             }
 
-            MetastoreCursor cursor = result.getResult();
-
-            while (cursor.hasMoreEntries()) {
-                Iterator<MetastoreTableItem> entries;
+            @Override
+            public boolean hasNext() {
                 try {
-                    entries = cursor.readEntries(maxEntriesPerScan);
-                } catch (MSException mse) {
-                    LOG.warn("Exception when garbage collecting deleted ledgers : ", mse);
-                    return startLedgerId;
+                    openCursorLatch.await();
+                } catch (InterruptedException ie) {
+                    LOG.error("Interrupted waiting for cursor to open", ie);
+                    Thread.currentThread().interrupt();
+                    return false;
                 }
-
-                if (!entries.hasNext()) {
-                    continue;
+                if (cursor == null) {
+                    return false;
                 }
+                return cursor.hasMoreEntries();
+            }
 
-                SortedSet<Long> msActiveLedgers = entries2Ledgers(entries);
-
-                Long endLedgerId = msActiveLedgers.last();
-                Map<Long, Boolean> bkActiveLedgers = snapshot.subMap(startLedgerId, true, endLedgerId, true);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("All active ledgers from Metastore between {} and {} : {}", new Object[] { startLedgerId,
-                            endLedgerId, msActiveLedgers });
-                    LOG.debug("Current active ledgers from Bookie between {} and {} : {}", new Object[] {
-                            startLedgerId, endLedgerId, bkActiveLedgers });
+            @Override
+            public LedgerRange next() throws IOException {
+                try {
+                    Set<Long> ledgerIds = new TreeSet<Long>();
+                    Iterator<MetastoreTableItem> iter = cursor.readEntries(maxEntriesPerScan);
+                    while (iter.hasNext()) {
+                        ledgerIds.add(key2LedgerId(iter.next().getKey()));
+                    }
+                    return new LedgerRange(ledgerIds);
+                } catch (MSException mse) {
+                    LOG.error("Exception occurred reading from metastore", mse);
+                    throw new IOException("Couldn't read from metastore", mse);
                 }
-                doGc(gc, bkActiveLedgers, msActiveLedgers);
-                // move the pointer
-                startLedgerId = endLedgerId + 1;
             }
-            doGc(gc, snapshot.tailMap(startLedgerId), new TreeSet<Long>());
-            return null;
         }
 
-        /**
-         * Do garbage collecting comparing hosted ledgers and metastore ledgers
-         *
-         * @param gc
-         *            Garbage collector to do garbage collection when found
-         *            inactive/deleted ledgers
-         * @param bkActiveLedgers
-         *            Active ledgers hosted in bookie server
-         * @param msAllLedgers
-         *            All ledgers stored in metastore
-         */
-        void doGc(GarbageCollector gc, Map<Long, Boolean> bkActiveLedgers, Set<Long> msAllLedgers) {
-            // remove any active ledgers that doesn't exist in zk
-            for (Long bkLid : bkActiveLedgers.keySet()) {
-                if (!msAllLedgers.contains(bkLid)) {
-                    // remove it from current active ledger
-                    LOG.debug("gc ledger: {}", bkLid);
-                    bkActiveLedgers.remove(bkLid);
-                    gc.gc(bkLid);
-                }
-            }
+        @Override
+        public LedgerRangeIterator getLedgerRanges() {
+            return new MSLedgerRangeIterator();
         }
     }
 
@@ -607,11 +522,6 @@ public class MSLedgerManagerFactory exte
     }
 
     @Override
-    public ActiveLedgerManager newActiveLedgerManager() {
-        return new MsActiveLedgerManager(conf, zk, metastore);
-    }
-
-    @Override
     public LedgerUnderreplicationManager newLedgerUnderreplicationManager() throws KeeperException,
             InterruptedException, ReplicationException.CompatibilityException {
         // TODO: currently just use zk ledger underreplication manager

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/SnapshotMap.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/SnapshotMap.java?rev=1425585&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/SnapshotMap.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/SnapshotMap.java Mon Dec 24 04:50:02 2012
@@ -0,0 +1,131 @@
+package org.apache.bookkeeper.util;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A snapshotable map.
+ */
+public class SnapshotMap<K, V> {
+    // stores recent updates
+    volatile Map<K, V> updates;
+    volatile Map<K, V> updatesToMerge;
+    // map stores all snapshot data
+    volatile NavigableMap<K, V> snapshot;
+
+    final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public SnapshotMap() {
+        updates = new ConcurrentHashMap<K, V>();
+        updatesToMerge = new ConcurrentHashMap<K, V>();
+        snapshot = new ConcurrentSkipListMap<K, V>();
+    }
+
+    /**
+     * Create a snapshot of current map.
+     *
+     * @return a snapshot of current map.
+     */
+    public NavigableMap<K, V> snapshot() {
+        this.lock.writeLock().lock();
+        try {
+            if (updates.isEmpty()) {
+                return snapshot;
+            }
+            // put updates for merge to snapshot
+            updatesToMerge = updates;
+            updates = new ConcurrentHashMap<K, V>();
+        } finally {
+            this.lock.writeLock().unlock();
+        }
+        // merging the updates to snapshot
+        for (Map.Entry<K, V> entry : updatesToMerge.entrySet()) {
+            snapshot.put(entry.getKey(), entry.getValue());
+        }
+        // clear updatesToMerge
+        this.lock.writeLock().lock();
+        try {
+            updatesToMerge = new ConcurrentHashMap<K, V>();
+        } finally {
+            this.lock.writeLock().unlock();
+        }
+        return snapshot;
+    }
+
+    /**
+     * Associates the specified value with the specified key in this map.
+     *
+     * @param key
+     *          Key with which the specified value is to be associated.
+     * @param value
+     *          Value to be associated with the specified key.
+     */
+    public void put(K key, V value) {
+        this.lock.readLock().lock();
+        try {
+            updates.put(key, value);
+        } finally {
+            this.lock.readLock().unlock();
+        }
+
+    }
+
+    /**
+     * Removes the mapping for the key from this map if it is present.
+     *
+     * @param key
+     *          Key whose mapping is to be removed from this map.
+     */
+    public void remove(K key) {
+        this.lock.readLock().lock();
+        try {
+            // first remove updates
+            updates.remove(key);
+            updatesToMerge.remove(key);
+            // then remove snapshot
+            snapshot.remove(key);
+        } finally {
+            this.lock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Returns true if this map contains a mapping for the specified key.
+     *
+     * @param key
+     *          Key whose presence is in the map to be tested.
+     * @return true if the map contains a mapping for the specified key.
+     */
+    public boolean containsKey(K key) {
+        this.lock.readLock().lock();
+        try {
+            return updates.containsKey(key)
+                 | updatesToMerge.containsKey(key)
+                 | snapshot.containsKey(key);
+        } finally {
+            this.lock.readLock().unlock();
+        }
+    }
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java?rev=1425585&r1=1425584&r2=1425585&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java Mon Dec 24 04:50:02 2012
@@ -20,8 +20,6 @@ package org.apache.bookkeeper.util;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import org.apache.bookkeeper.conf.ServerConfiguration;
 
 /**
  * Provided utilites for parsing network addresses, ledger-id from node paths
@@ -30,6 +28,9 @@ import org.apache.bookkeeper.conf.Server
  */
 public class StringUtils {
 
+    // Ledger Node Prefix
+    static public final String LEDGER_NODE_PREFIX = "L";
+
     /**
      * Parses address into IP and port.
      *
@@ -68,4 +69,61 @@ public class StringUtils {
         return String.format("%010d", id);
     }
 
+    /**
+     * Get the hierarchical ledger path according to the ledger id
+     *
+     * @param ledgerId
+     *          ledger id
+     * @return the hierarchical path
+     */
+    public static String getHierarchicalLedgerPath(long ledgerId) {
+        String ledgerIdStr = getZKStringId(ledgerId);
+        // do 2-4-4 split
+        StringBuilder sb = new StringBuilder();
+        sb.append("/")
+          .append(ledgerIdStr.substring(0, 2)).append("/")
+          .append(ledgerIdStr.substring(2, 6)).append("/")
+          .append(LEDGER_NODE_PREFIX)
+          .append(ledgerIdStr.substring(6, 10));
+        return sb.toString();
+    }
+
+    /**
+     * Parse the hierarchical ledger path to its ledger id
+     *
+     * @param hierarchicalLedgerPath
+     * @return the ledger id
+     * @throws IOException
+     */
+    public static long stringToHierarchicalLedgerId(String hierarchicalLedgerPath)
+            throws IOException {
+        String[] hierarchicalParts = hierarchicalLedgerPath.split("/");
+        if (hierarchicalParts.length != 3) {
+            throw new IOException("it is not a valid hierarchical path name : " + hierarchicalLedgerPath);
+        }
+        hierarchicalParts[2] =
+            hierarchicalParts[2].substring(LEDGER_NODE_PREFIX.length());
+        return stringToHierarchicalLedgerId(hierarchicalParts);
+    }
+
+    /**
+     * Get ledger id
+     *
+     * @param levelNodes
+     *          level of the ledger path
+     * @return ledger id
+     * @throws IOException
+     */
+    public static long stringToHierarchicalLedgerId(String...levelNodes) throws IOException {
+        try {
+            StringBuilder sb = new StringBuilder();
+            for (String node : levelNodes) {
+                sb.append(node);
+            }
+            return Long.parseLong(sb.toString());
+        } catch (NumberFormatException e) {
+            throw new IOException(e);
+        }
+    }
+
 }