You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/12/12 11:20:41 UTC

svn commit: r1420607 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ bookkeeper-s...

Author: ivank
Date: Wed Dec 12 10:20:38 2012
New Revision: 1420607

URL: http://svn.apache.org/viewvc?rev=1420607&view=rev
Log:
BOOKKEEPER-205: implement a MetaStore based ledger manager for bookkeeper client. (jiannan via ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/metastore/TestMetaStore.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1420607&r1=1420606&r2=1420607&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Dec 12 10:20:38 2012
@@ -238,6 +238,8 @@ Trunk (unreleased changes)
 
 	BOOKKEEPER-469: Remove System.out.println from TestLedgerManager (ivank via fpj)
 
+        BOOKKEEPER-205: implement a MetaStore based ledger manager for bookkeeper client. (jiannan via ivank)
+
       hedwig-server:
 
         BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations in Hedwig (sijie via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java?rev=1420607&r1=1420606&r2=1420607&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java Wed Dec 12 10:20:38 2012
@@ -62,6 +62,8 @@ public abstract class BKException extend
             return new BKBookieHandleNotAvailableException();
         case Code.ZKException:
             return new ZKException();
+        case Code.MetaStoreException:
+            return new MetaStoreException();
         case Code.LedgerRecoveryException:
             return new BKLedgerRecoveryException();
         case Code.LedgerClosedException:
@@ -114,6 +116,7 @@ public abstract class BKException extend
         int InterruptedException = -15;
         int ProtocolVersionException = -16;
         int MetadataVersionException = -17;
+        int MetaStoreException = -18;
 
         int IllegalOpException = -100;
         int LedgerFencedException = -101;
@@ -152,6 +155,8 @@ public abstract class BKException extend
             return "Bookie handle is not available";
         case Code.ZKException:
             return "Error while using ZooKeeper";
+        case Code.MetaStoreException:
+            return "Error while using MetaStore";
         case Code.LedgerRecoveryException:
             return "Error while recovering ledger";
         case Code.LedgerClosedException:
@@ -265,6 +270,12 @@ public abstract class BKException extend
         }
     }
 
+    public static class MetaStoreException extends BKException {
+        public MetaStoreException() {
+            super(Code.MetaStoreException);
+        }
+    }
+
     public static class BKLedgerRecoveryException extends BKException {
         public BKLedgerRecoveryException() {
             super(Code.LedgerRecoveryException);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java?rev=1420607&r1=1420606&r2=1420607&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java Wed Dec 12 10:20:38 2012
@@ -53,6 +53,10 @@ public abstract class AbstractConfigurat
     protected final static String AVAILABLE_NODE = "available";
     protected final static String REREPLICATION_ENTRY_BATCH_SIZE = "rereplicationEntryBatchSize";
 
+    // Metastore settings, only being used when LEDGER_MANAGER_FACTORY_CLASS is MSLedgerManagerFactory
+    protected final static String METASTORE_IMPL_CLASS = "metastoreImplClass";
+    protected final static String METASTORE_MAX_ENTRIES_PER_SCAN = "metastoreMaxEntriesPerScan";
+
     protected AbstractConfiguration() {
         super();
         // add configuration for system properties
@@ -191,4 +195,42 @@ public abstract class AbstractConfigurat
     public long getRereplicationEntryBatchSize() {
         return getLong(REREPLICATION_ENTRY_BATCH_SIZE, 10);
     }
+
+    /**
+     * Get metastore implementation class.
+     *
+     * @return metastore implementation class name.
+     */
+    public String getMetastoreImplClass() {
+        return getString(METASTORE_IMPL_CLASS);
+    }
+
+    /**
+     * Set metastore implementation class.
+     *
+     * @param metastoreImplClass
+     *          Metastore implementation Class name.
+     */
+    public void setMetastoreImplClass(String metastoreImplClass) {
+        setProperty(METASTORE_IMPL_CLASS, metastoreImplClass);
+    }
+
+    /**
+     * Get max entries per scan in metastore.
+     *
+     * @return max entries per scan in metastore.
+     */
+    public int getMetastoreMaxEntriesPerScan() {
+        return getInt(METASTORE_MAX_ENTRIES_PER_SCAN, 50);
+    }
+
+    /**
+     * Set max entries per scan in metastore.
+     *
+     * @param maxEntries
+     *          Max entries per scan in metastore.
+     */
+    public void setMetastoreMaxEntriesPerScan(int maxEntries) {
+        setProperty(METASTORE_MAX_ENTRIES_PER_SCAN, maxEntries);
+    }
 }

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java?rev=1420607&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java Wed Dec 12 10:20:38 2012
@@ -0,0 +1,691 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import static org.apache.bookkeeper.metastore.MetastoreScannableTable.EMPTY_END_KEY;
+import static org.apache.bookkeeper.metastore.MetastoreTable.ALL_FIELDS;
+import static org.apache.bookkeeper.metastore.MetastoreTable.NON_FIELDS;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.metastore.MSException;
+import org.apache.bookkeeper.metastore.MetaStore;
+import org.apache.bookkeeper.metastore.MetastoreCallback;
+import org.apache.bookkeeper.metastore.MetastoreCursor;
+import org.apache.bookkeeper.metastore.MetastoreCursor.ReadEntriesCallback;
+import org.apache.bookkeeper.metastore.MetastoreException;
+import org.apache.bookkeeper.metastore.MetastoreFactory;
+import org.apache.bookkeeper.metastore.MetastoreScannableTable;
+import org.apache.bookkeeper.metastore.MetastoreScannableTable.Order;
+import org.apache.bookkeeper.metastore.MetastoreTableItem;
+import org.apache.bookkeeper.metastore.Value;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
+import org.apache.bookkeeper.replication.ReplicationException;
+import org.apache.bookkeeper.util.StringUtils;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MetaStore Based Ledger Manager Factory
+ */
+public class MSLedgerManagerFactory extends LedgerManagerFactory {
+
+    static Logger LOG = LoggerFactory.getLogger(MSLedgerManagerFactory.class);
+
+    public static final int CUR_VERSION = 1;
+
+    public static final String TABLE_NAME = "LEDGER";
+    public static final String META_FIELD = ".META";
+
+    AbstractConfiguration conf;
+    ZooKeeper zk;
+    MetaStore metastore;
+
+    /**
+     * Report the layout version implemented by this factory.
+     *
+     * @return {@link #CUR_VERSION}
+     */
+    @Override
+    public int getCurrentVersion() {
+        return CUR_VERSION;
+    }
+
+    /**
+     * Bind this factory to a configuration and ZooKeeper handle and bring up
+     * the metastore named by the configuration.
+     *
+     * @param conf
+     *            configuration; supplies the metastore implementation class
+     * @param zk
+     *            ZooKeeper handle kept for the managers created later
+     * @param factoryVersion
+     *            layout version the caller expects; must equal CUR_VERSION
+     * @return this factory
+     * @throws IOException
+     *             if the layout version does not match, or if creating or
+     *             initializing the metastore fails for any reason
+     */
+    @Override
+    public LedgerManagerFactory initialize(final AbstractConfiguration conf, final ZooKeeper zk,
+            final int factoryVersion) throws IOException {
+        if (CUR_VERSION != factoryVersion) {
+            throw new IOException("Incompatible layout version found : " + factoryVersion);
+        }
+        this.conf = conf;
+        this.zk = zk;
+
+        // load metadata store
+        String msName = conf.getMetastoreImplClass();
+        try {
+            metastore = MetastoreFactory.createMetaStore(msName);
+
+            // TODO: should record version in somewhere. e.g. ZooKeeper
+            // until then the metastore is initialized with the version it reports itself
+            int msVersion = metastore.getVersion();
+            metastore.init(conf, msVersion);
+        } catch (Throwable t) {
+            // every failure, checked or not, is surfaced as an IOException with the cause attached
+            throw new IOException("Failed to initialize metastore " + msName + " : ", t);
+        }
+
+        return this;
+    }
+
+    /**
+     * Close the metastore created by <code>initialize</code>.
+     *
+     * Does nothing when no metastore was ever created, e.g. because
+     * <code>initialize</code> was not called or failed before creating one.
+     *
+     * @throws IOException
+     *             declared by the overridden method
+     */
+    @Override
+    public void uninitialize() throws IOException {
+        if (null != metastore) {
+            metastore.close();
+        }
+    }
+
+    /**
+     * Turn a metastore key back into a ledger id.
+     *
+     * @param key
+     *            decimal string form of a ledger id, may be null
+     * @return the parsed ledger id, or null when <code>key</code> is null
+     * @throws NumberFormatException
+     *             if <code>key</code> is not a decimal long
+     */
+    static Long key2LedgerId(String key) {
+        if (key == null) {
+            return null;
+        }
+        return Long.parseLong(key, 10);
+    }
+
+    /**
+     * Turn a ledger id into the key used in the ledger table.
+     *
+     * @param lid
+     *            ledger id, may be null
+     * @return the id formatted by StringUtils.getZKStringId, or null when
+     *         <code>lid</code> is null
+     */
+    static String ledgerId2Key(Long lid) {
+        if (lid == null) {
+            return null;
+        }
+        return StringUtils.getZKStringId(lid);
+    }
+
+    /**
+     * Render a ledger id range for log messages, using square brackets for
+     * inclusive bounds and parentheses for exclusive ones.
+     *
+     * @param firstLedger
+     *            lower bound of the range
+     * @param firstInclusive
+     *            whether the lower bound belongs to the range
+     * @param lastLedger
+     *            upper bound of the range
+     * @param lastInclusive
+     *            whether the upper bound belongs to the range
+     * @return text such as <code>[ 1 ~ 9 )</code>
+     */
+    static String rangeToString(Long firstLedger, boolean firstInclusive, Long lastLedger, boolean lastInclusive) {
+        final String opening = firstInclusive ? "[ " : "( ";
+        final String closing = lastInclusive ? " ]" : " )";
+        return opening + firstLedger + " ~ " + lastLedger + closing;
+    }
+
+    /**
+     * Collect the ledger ids named by a batch of metastore items.
+     *
+     * Keys that cannot be parsed as a ledger id are logged and left out.
+     *
+     * @param entries
+     *            items read from the ledger table
+     * @return the ledger ids in ascending order
+     */
+    static SortedSet<Long> entries2Ledgers(Iterator<MetastoreTableItem> entries) {
+        final SortedSet<Long> ledgerIds = new TreeSet<Long>();
+        while (entries.hasNext()) {
+            final String key = entries.next().getKey();
+            try {
+                ledgerIds.add(key2LedgerId(key));
+            } catch (NumberFormatException nfe) {
+                LOG.warn("Found invalid ledger key {}", key);
+            }
+        }
+        return ledgerIds;
+    }
+
+    /**
+     * One-shot holder used to wait synchronously for the outcome of an
+     * asynchronous call: the callback thread calls {@link #complete} and the
+     * waiting thread calls {@link #block()} before reading the result.
+     */
+    static class SyncResult<T> {
+        T value;
+        int rc;
+        boolean finished = false;
+
+        /**
+         * Record the outcome and release the threads waiting in
+         * {@link #block()}.
+         *
+         * @param rc
+         *            return code of the operation
+         * @param value
+         *            result of the operation
+         */
+        public synchronized void complete(int rc, T value) {
+            this.rc = rc;
+            this.value = value;
+            finished = true;
+
+            // release every waiter, not an arbitrarily chosen one
+            notifyAll();
+        }
+
+        /**
+         * Wait until {@link #complete} has been called.
+         *
+         * If the waiting thread is interrupted this method returns early,
+         * possibly before a result is available, and leaves the thread's
+         * interrupt status set so that callers can observe it.
+         */
+        public synchronized void block() {
+            try {
+                while (!finished) {
+                    wait();
+                }
+            } catch (InterruptedException ie) {
+                // do not lose the interruption: restore the flag for the caller
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        /**
+         * @return the return code passed to {@link #complete}
+         */
+        public synchronized int getRetCode() {
+            return rc;
+        }
+
+        /**
+         * @return the value passed to {@link #complete}
+         */
+        public synchronized T getResult() {
+            return value;
+        }
+    }
+
+    /**
+     * State shared by the metastore based ledger managers: the handles they
+     * were created with, the ledger table and the scan batch size.
+     */
+    static abstract class AbstractMsLedgerManager implements Closeable {
+
+        final ZooKeeper zk;
+        final AbstractConfiguration conf;
+
+        final MetaStore metastore;
+        final MetastoreScannableTable ledgerTable;
+        final int maxEntriesPerScan;
+
+        /**
+         * Open the ledger table in the given metastore.
+         *
+         * @param conf
+         *            configuration; supplies the max entries per scan
+         * @param zk
+         *            ZooKeeper handle
+         * @param metastore
+         *            metastore holding the ledger table
+         * @throws RuntimeException
+         *             if the table cannot be created; the metastore failure
+         *             is attached as the cause
+         */
+        AbstractMsLedgerManager(final AbstractConfiguration conf, final ZooKeeper zk, final MetaStore metastore) {
+            this.conf = conf;
+            this.zk = zk;
+            this.metastore = metastore;
+
+            try {
+                ledgerTable = metastore.createScannableTable(TABLE_NAME);
+            } catch (MetastoreException mse) {
+                String msg = "Failed to instantiate table " + TABLE_NAME + " in metastore " + metastore.getName();
+                // keep the underlying failure in both the log and the rethrown exception
+                LOG.error(msg, mse);
+                throw new RuntimeException(msg, mse);
+            }
+            // configuration settings
+            maxEntriesPerScan = conf.getMetastoreMaxEntriesPerScan();
+        }
+
+        /**
+         * Close the ledger table.
+         */
+        @Override
+        public void close() {
+            ledgerTable.close();
+        }
+
+    }
+
+    static class MsLedgerManager extends AbstractMsLedgerManager implements LedgerManager {
+
+        static final String IDGEN_ZNODE = "ms-idgen";
+        static final String IDGENERATION_PREFIX = "/" + IDGEN_ZNODE + "/ID-";
+
+        // Path to generate global id
+        private final String idGenPath;
+
+        // we use this to prevent long stack chains from building up in
+        // callbacks
+        ScheduledExecutorService scheduler;
+
+        /**
+         * Create a ledger manager on top of the given metastore.
+         *
+         * The id generation path is placed under the configured ZooKeeper
+         * ledgers root path, and a single-thread scheduler is created for
+         * this instance.
+         */
+        MsLedgerManager(final AbstractConfiguration conf, final ZooKeeper zk, final MetaStore metastore) {
+            super(conf, zk, metastore);
+            this.idGenPath = conf.getZkLedgersRootPath() + IDGENERATION_PREFIX;
+            this.scheduler = Executors.newSingleThreadScheduledExecutor();
+        }
+
+        /**
+         * Shut down the scheduler, then close the ledger table via the
+         * superclass. A failure to shut down the scheduler is logged and does
+         * not prevent the table from being closed.
+         */
+        @Override
+        public void close() {
+            try {
+                scheduler.shutdown();
+            } catch (Exception e) {
+                LOG.warn("Error when closing MsLedgerManager : ", e);
+            }
+            super.close();
+        }
+
+        /**
+         * Create a new ledger.
+         *
+         * A ledger id is obtained by creating an ephemeral sequential znode
+         * under the id generation path. The metadata is then stored in the
+         * ledger table under that id with version <code>Version.NEW</code>,
+         * and the id generation znode is deleted.
+         *
+         * @param metadata
+         *            metadata to store; its version is updated on success
+         * @param ledgerCb
+         *            completed with the new ledger id and
+         *            <code>BKException.Code.OK</code> on success, with
+         *            <code>ZKException</code> if no id could be generated,
+         *            <code>MetadataVersionException</code> on a bad version,
+         *            or <code>MetaStoreException</code> on any other
+         *            metastore failure
+         */
+        @Override
+        public void createLedger(final LedgerMetadata metadata, final GenericCallback<Long> ledgerCb) {
+            ZkUtils.createFullPathOptimistic(zk, idGenPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.EPHEMERAL_SEQUENTIAL, new StringCallback() {
+                        @Override
+                        public void processResult(int rc, String path, Object ctx, final String idPathName) {
+                            if (rc != KeeperException.Code.OK.intValue()) {
+                                LOG.error("Could not generate new ledger id",
+                                        KeeperException.create(KeeperException.Code.get(rc), path));
+                                ledgerCb.operationComplete(BKException.Code.ZKException, null);
+                                return;
+                            }
+                            /*
+                             * Extract ledger id from gen path
+                             */
+                            long ledgerId;
+                            try {
+                                ledgerId = getLedgerIdFromGenPath(idPathName);
+                            } catch (IOException e) {
+                                // report the name that was actually parsed, not the requested path
+                                LOG.error("Could not extract ledger-id from id gen path:" + idPathName, e);
+                                ledgerCb.operationComplete(BKException.Code.ZKException, null);
+                                return;
+                            }
+
+                            final long lid = ledgerId;
+                            MetastoreCallback<Version> msCallback = new MetastoreCallback<Version>() {
+                                @Override
+                                public void complete(int rc, Version version, Object ctx) {
+                                    if (MSException.Code.BadVersion.getCode() == rc) {
+                                        ledgerCb.operationComplete(BKException.Code.MetadataVersionException, null);
+                                        return;
+                                    }
+                                    if (MSException.Code.OK.getCode() != rc) {
+                                        ledgerCb.operationComplete(BKException.Code.MetaStoreException, null);
+                                        return;
+                                    }
+                                    LOG.debug("Create ledger {} with version {} successfuly.", new Object[] { lid,
+                                            version });
+                                    // update version
+                                    metadata.setVersion(version);
+                                    ledgerCb.operationComplete(BKException.Code.OK, lid);
+                                }
+                            };
+
+                            ledgerTable.put(ledgerId2Key(lid), new Value().setField(META_FIELD, metadata.serialize()),
+                                    Version.NEW, msCallback, null);
+                            // the znode only served to hand out the id; a failed delete is just logged
+                            zk.delete(idPathName, -1, new AsyncCallback.VoidCallback() {
+                                @Override
+                                public void processResult(int rc, String path, Object ctx) {
+                                    if (rc != KeeperException.Code.OK.intValue()) {
+                                        LOG.warn("Exception during deleting znode for id generation : ",
+                                                KeeperException.create(KeeperException.Code.get(rc), path));
+                                    } else {
+                                        LOG.debug("Deleting znode for id generation : {}", idPathName);
+                                    }
+                                }
+                            }, null);
+                        }
+                    }, null);
+        }
+
+        /**
+         * Extract the ledger id from the name of an id generation znode.
+         *
+         * @param nodeName
+         *            znode path containing the id generation prefix followed
+         *            by a number
+         * @return the number found after the last occurrence of the prefix
+         * @throws IOException
+         *             if that part of the path is not a valid long
+         */
+        private long getLedgerIdFromGenPath(String nodeName) throws IOException {
+            final String[] pieces = nodeName.split(IDGENERATION_PREFIX);
+            final String idText = pieces[pieces.length - 1];
+            try {
+                return Long.parseLong(idText);
+            } catch (NumberFormatException e) {
+                throw new IOException(e);
+            }
+        }
+
+        /**
+         * Remove a ledger's entry from the ledger table, whatever its version.
+         *
+         * @param ledgerId
+         *            ledger to delete
+         * @param cb
+         *            completed with <code>OK</code> on success,
+         *            <code>NoSuchLedgerExistsException</code> if the table has
+         *            no entry for the ledger, or
+         *            <code>MetaStoreException</code> on any other failure
+         */
+        @Override
+        public void deleteLedger(final long ledgerId, final GenericCallback<Void> cb) {
+            final String key = ledgerId2Key(ledgerId);
+            ledgerTable.remove(key, Version.ANY, new MetastoreCallback<Void>() {
+                @Override
+                public void complete(int rc, Void value, Object ctx) {
+                    final int bkRc;
+                    if (MSException.Code.OK.getCode() == rc) {
+                        bkRc = BKException.Code.OK;
+                    } else if (MSException.Code.NoKey.getCode() == rc) {
+                        LOG.warn("Ledger entry does not exist in meta table: ledgerId={}", ledgerId);
+                        bkRc = BKException.Code.NoSuchLedgerExistsException;
+                    } else {
+                        bkRc = BKException.Code.MetaStoreException;
+                    }
+                    cb.operationComplete(bkRc, (Void) null);
+                }
+            }, null);
+        }
+
+        /**
+         * Read a ledger's metadata from the ledger table.
+         *
+         * @param ledgerId
+         *            ledger to read
+         * @param readCb
+         *            completed with the parsed metadata and <code>OK</code> on
+         *            success, with <code>NoSuchLedgerExistsException</code> if
+         *            the table has no entry for the ledger, or with
+         *            <code>MetaStoreException</code> if the read fails or the
+         *            stored metadata cannot be parsed
+         */
+        @Override
+        public void readLedgerMetadata(final long ledgerId, final GenericCallback<LedgerMetadata> readCb) {
+            final String key = ledgerId2Key(ledgerId);
+            MetastoreCallback<Versioned<Value>> msCallback = new MetastoreCallback<Versioned<Value>>() {
+                @Override
+                public void complete(int rc, Versioned<Value> value, Object ctx) {
+                    if (MSException.Code.NoKey.getCode() == rc) {
+                        LOG.error("No ledger metadata found for ledger " + ledgerId + " : ",
+                                MSException.create(MSException.Code.get(rc), "No key " + key + " found."));
+                        readCb.operationComplete(BKException.Code.NoSuchLedgerExistsException, null);
+                        return;
+                    }
+                    if (MSException.Code.OK.getCode() != rc) {
+                        LOG.error("Could not read metadata for ledger " + ledgerId + " : ",
+                                MSException.create(MSException.Code.get(rc), "Failed to get key " + key));
+                        readCb.operationComplete(BKException.Code.MetaStoreException, null);
+                        return;
+                    }
+                    LedgerMetadata metadata;
+                    try {
+                        // the metadata carries the version under which the metastore returned it
+                        metadata = LedgerMetadata
+                                .parseConfig(value.getValue().getField(META_FIELD), value.getVersion());
+                    } catch (IOException e) {
+                        LOG.error("Could not parse ledger metadata for ledger " + ledgerId + " : ", e);
+                        readCb.operationComplete(BKException.Code.MetaStoreException, null);
+                        return;
+                    }
+                    readCb.operationComplete(BKException.Code.OK, metadata);
+                }
+            };
+            ledgerTable.get(key, msCallback, ALL_FIELDS);
+        }
+
+        /**
+         * Store a ledger's metadata in the ledger table, conditional on the
+         * version currently held by <code>metadata</code>.
+         *
+         * @param ledgerId
+         *            ledger to update
+         * @param metadata
+         *            metadata to serialize and store; its version is replaced
+         *            by the one returned from the metastore on success
+         * @param cb
+         *            completed with <code>OK</code> on success,
+         *            <code>MetadataVersionException</code> on a bad version,
+         *            <code>NoSuchLedgerExistsException</code> if the table has
+         *            no entry for the ledger, or
+         *            <code>MetaStoreException</code> on any other failure
+         */
+        @Override
+        public void writeLedgerMetadata(final long ledgerId, final LedgerMetadata metadata,
+                final GenericCallback<Void> cb) {
+            Value data = new Value().setField(META_FIELD, metadata.serialize());
+
+            LOG.debug("Writing ledger {} metadata, version {}", new Object[] { ledgerId, metadata.getVersion() });
+
+            final String key = ledgerId2Key(ledgerId);
+            MetastoreCallback<Version> msCallback = new MetastoreCallback<Version>() {
+                @Override
+                public void complete(int rc, Version version, Object ctx) {
+                    int bkRc;
+                    if (MSException.Code.BadVersion.getCode() == rc) {
+                        LOG.info("Bad version provided to updat metadata for ledger {}", ledgerId);
+                        bkRc = BKException.Code.MetadataVersionException;
+                    } else if (MSException.Code.NoKey.getCode() == rc) {
+                        LOG.warn("Ledger {} doesn't exist when writing its ledger metadata.", ledgerId);
+                        bkRc = BKException.Code.NoSuchLedgerExistsException;
+                    } else if (MSException.Code.OK.getCode() == rc) {
+                        // remember the new version so the next write is conditional on it
+                        metadata.setVersion(version);
+                        bkRc = BKException.Code.OK;
+                    } else {
+                        LOG.warn("Conditional update ledger metadata failed: ",
+                                MSException.create(MSException.Code.get(rc), "Failed to put key " + key));
+                        bkRc = BKException.Code.MetaStoreException;
+                    }
+
+                    cb.operationComplete(bkRc, null);
+                }
+            };
+            ledgerTable.put(key, data, metadata.getVersion(), msCallback, null);
+        }
+
+        /**
+         * Run <code>processor</code> over the ledgers found in the ledger
+         * table.
+         *
+         * A cursor is opened on the table without fetching any fields. If
+         * that fails, <code>finalCb</code> is completed with
+         * <code>failureRc</code>; if the cursor has no entries it is completed
+         * with <code>successRc</code>; otherwise processing continues batch by
+         * batch through the cursor overload of this method.
+         *
+         * @param processor
+         *            invoked for the ledger ids
+         * @param finalCb
+         *            completed once with the overall result
+         * @param context
+         *            passed through to <code>finalCb</code>
+         * @param successRc
+         *            return code reported on success
+         * @param failureRc
+         *            return code reported on failure
+         */
+        @Override
+        public void asyncProcessLedgers(final Processor<Long> processor, final AsyncCallback.VoidCallback finalCb,
+                final Object context, final int successRc, final int failureRc) {
+            MetastoreCallback<MetastoreCursor> openCursorCb = new MetastoreCallback<MetastoreCursor>() {
+                @Override
+                public void complete(int rc, MetastoreCursor cursor, Object ctx) {
+                    if (MSException.Code.OK.getCode() != rc) {
+                        finalCb.processResult(failureRc, null, context);
+                        return;
+                    }
+                    if (!cursor.hasMoreEntries()) {
+                        finalCb.processResult(successRc, null, context);
+                        return;
+                    }
+                    asyncProcessLedgers(cursor, processor, finalCb, context, successRc, failureRc);
+                }
+            };
+            ledgerTable.openCursor(NON_FIELDS, openCursorCb, null);
+        }
+
+        /**
+         * Schedule processing of the next batch from <code>cursor</code> on
+         * the scheduler instead of running it in the calling thread.
+         */
+        void asyncProcessLedgers(final MetastoreCursor cursor, final Processor<Long> processor,
+                                 final AsyncCallback.VoidCallback finalCb, final Object context,
+                                 final int successRc, final int failureRc) {
+            scheduler.submit(new Runnable() {
+                @Override
+                public void run() {
+                    doAsyncProcessLedgers(cursor, processor, finalCb, context, successRc, failureRc);
+                }
+            });
+        }
+
+        /**
+         * Read one batch of at most <code>maxEntriesPerScan</code> entries
+         * from <code>cursor</code> and run <code>processor</code> over the
+         * ledger ids in it, then schedule the next batch.
+         *
+         * <code>finalCb</code> is completed with <code>successRc</code> once
+         * the cursor has no more entries, and with <code>failureRc</code> as
+         * soon as a read or the processing of a batch fails.
+         *
+         * @param cursor
+         *            cursor over the ledger table
+         * @param processor
+         *            invoked for the ledger ids of the batch
+         * @param finalCb
+         *            completed once with the overall result
+         * @param context
+         *            passed through to <code>finalCb</code>
+         * @param successRc
+         *            return code reported on success
+         * @param failureRc
+         *            return code reported on failure
+         */
+        void doAsyncProcessLedgers(final MetastoreCursor cursor, final Processor<Long> processor,
+                                   final AsyncCallback.VoidCallback finalCb, final Object context,
+                                   final int successRc, final int failureRc) {
+            // no entries now
+            if (!cursor.hasMoreEntries()) {
+                finalCb.processResult(successRc, null, context);
+                return;
+            }
+            ReadEntriesCallback msCallback = new ReadEntriesCallback() {
+                @Override
+                public void complete(int rc, Iterator<MetastoreTableItem> entries, Object ctx) {
+                    if (MSException.Code.OK.getCode() != rc) {
+                        finalCb.processResult(failureRc, null, context);
+                        return;
+                    }
+
+                    // shared helper: keys that are not ledger ids are logged and skipped
+                    SortedSet<Long> ledgers = entries2Ledgers(entries);
+
+                    if (ledgers.isEmpty()) {
+                        // process next batch of ledgers
+                        asyncProcessLedgers(cursor, processor, finalCb, context, successRc, failureRc);
+                        return;
+                    }
+
+                    // captured for the error message only
+                    final long startLedger = ledgers.first();
+                    final long endLedger = ledgers.last();
+
+                    AsyncSetProcessor<Long> setProcessor = new AsyncSetProcessor<Long>(scheduler);
+                    // process set
+                    setProcessor.process(ledgers, processor, new AsyncCallback.VoidCallback() {
+                        @Override
+                        public void processResult(int rc, String path, Object ctx) {
+                            if (successRc != rc) {
+                                LOG.error("Failed when processing range "
+                                        + rangeToString(startLedger, true, endLedger, true));
+                                finalCb.processResult(failureRc, null, context);
+                                return;
+                            }
+                            // process next batch of ledgers
+                            asyncProcessLedgers(cursor, processor, finalCb, context, successRc, failureRc);
+                        }
+                    }, context, successRc, failureRc);
+                }
+            };
+            cursor.asyncReadEntries(maxEntriesPerScan, msCallback, null);
+        }
+    }
+
+    static class MsActiveLedgerManager extends AbstractMsLedgerManager implements ActiveLedgerManager {
+
+        // A sorted map to stored all active ledger ids
+        protected final SnapshotMap<Long, Boolean> activeLedgers;
+
+        /**
+         * Create an active ledger manager on top of the given metastore,
+         * starting with an empty map of active ledgers.
+         */
+        MsActiveLedgerManager(final AbstractConfiguration conf, final ZooKeeper zk, final MetaStore metastore) {
+            super(conf, zk, metastore);
+            activeLedgers = new SnapshotMap<Long, Boolean>();
+        }
+
+        /**
+         * Record <code>ledgerId</code> in the active ledger map together with
+         * the given flag.
+         */
+        @Override
+        public void addActiveLedger(long ledgerId, boolean active) {
+            activeLedgers.put(ledgerId, active);
+        }
+
+        /**
+         * Drop <code>ledgerId</code> from the active ledger map.
+         */
+        @Override
+        public void removeActiveLedger(long ledgerId) {
+            activeLedgers.remove(ledgerId);
+        }
+
+        /**
+         * @return whether the active ledger map has an entry for
+         *         <code>ledgerId</code>
+         */
+        @Override
+        public boolean containsActiveLedger(long ledgerId) {
+            return activeLedgers.containsKey(ledgerId);
+        }
+
+        /**
+         * Run garbage collection against a snapshot of the active ledgers.
+         *
+         * Scanning starts at ledger id 0. Whenever a pass returns a ledger id
+         * instead of null, the scan is resumed from that id; at most three
+         * passes are made per call.
+         *
+         * @param gc
+         *            garbage collector handed to each pass
+         */
+        @Override
+        public void garbageCollectLedgers(GarbageCollector gc) {
+            LOG.debug("Start garbage collect ledgers.");
+            NavigableMap<Long, Boolean> snapshot = activeLedgers.snapshot();
+            Long nextLedger = 0L;
+            int numRetries = 3;
+            do {
+                nextLedger = doGcLedgers(nextLedger, snapshot, gc);
+            } while (null != nextLedger && --numRetries > 0);
+            LOG.debug("End garbage collect ledgers.");
+        }
+
+        /**
+         * Do garbage collection starting from <code>startLedger</code>.
+         *
+         * @param startLedgerId
+         *            Start Ledger id
+         * @param snapshot
+         *            Current snapshot of active ledgers
+         * @param gc
+         *            Garbage collector
+         * @return null if finished scanning all ledgers, the next ledger id to
+         *         scan
+         */
+        private Long doGcLedgers(Long startLedgerId, NavigableMap<Long, Boolean> snapshot, GarbageCollector gc) {
+            final SyncResult<MetastoreCursor> result = new SyncResult<MetastoreCursor>();
+            MetastoreCallback<MetastoreCursor> openCursorCb = new MetastoreCallback<MetastoreCursor>() {
+                @Override
+                public void complete(int rc, MetastoreCursor cursor, Object ctx) {
+                    result.complete(rc, cursor);
+                }
+            };
+            ledgerTable.openCursor(ledgerId2Key(startLedgerId), true, EMPTY_END_KEY, true, Order.ASC, NON_FIELDS,
+                    openCursorCb, null);
+            result.block();
+            if (MSException.Code.OK.getCode() != result.getRetCode()) {
+                LOG.warn("Failed to open metastore cursor to run garbage collection : ",
+                        MSException.create(MSException.Code.get(result.getRetCode())));
+                // failed to open a cursor, not continue until next gc
+                return null;
+            }
+
+            MetastoreCursor cursor = result.getResult();
+
+            while (cursor.hasMoreEntries()) {
+                Iterator<MetastoreTableItem> entries;
+                try {
+                    entries = cursor.readEntries(maxEntriesPerScan);
+                } catch (MSException mse) {
+                    LOG.warn("Exception when garbage collecting deleted ledgers : ", mse);
+                    return startLedgerId;
+                }
+
+                if (!entries.hasNext()) {
+                    continue;
+                }
+
+                SortedSet<Long> msActiveLedgers = entries2Ledgers(entries);
+
+                Long endLedgerId = msActiveLedgers.last();
+                Map<Long, Boolean> bkActiveLedgers = snapshot.subMap(startLedgerId, true, endLedgerId, true);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("All active ledgers from Metastore between {} and {} : {}", new Object[] { startLedgerId,
+                            endLedgerId, msActiveLedgers });
+                    LOG.debug("Current active ledgers from Bookie between {} and {} : {}", new Object[] {
+                            startLedgerId, endLedgerId, bkActiveLedgers });
+                }
+                doGc(gc, bkActiveLedgers, msActiveLedgers);
+                // move the pointer
+                startLedgerId = endLedgerId + 1;
+            }
+            doGc(gc, snapshot.tailMap(startLedgerId), new TreeSet<Long>());
+            return null;
+        }
+
+        /**
+         * Do garbage collecting comparing hosted ledgers and metastore ledgers
+         *
+         * @param gc
+         *            Garbage collector to do garbage collection when found
+         *            inactive/deleted ledgers
+         * @param bkActiveLedgers
+         *            Active ledgers hosted in bookie server
+         * @param msAllLedgers
+         *            All ledgers stored in metastore
+         */
+        void doGc(GarbageCollector gc, Map<Long, Boolean> bkActiveLedgers, Set<Long> msAllLedgers) {
+            // remove any active ledgers that doesn't exist in zk
+            for (Long bkLid : bkActiveLedgers.keySet()) {
+                if (!msAllLedgers.contains(bkLid)) {
+                    // remove it from current active ledger
+                    LOG.debug("gc ledger: {}", bkLid);
+                    bkActiveLedgers.remove(bkLid);
+                    gc.gc(bkLid);
+                }
+            }
+        }
+    }
+
+    /**
+     * Create a ledger manager from the conf, zk and metastore of this factory.
+     *
+     * @return a new {@link MsLedgerManager}
+     */
+    @Override
+    public LedgerManager newLedgerManager() {
+        final LedgerManager ledgerManager = new MsLedgerManager(conf, zk, metastore);
+        return ledgerManager;
+    }
+
+    /**
+     * Create an active ledger manager from the conf, zk and metastore of this
+     * factory.
+     *
+     * @return a new {@link MsActiveLedgerManager}
+     */
+    @Override
+    public ActiveLedgerManager newActiveLedgerManager() {
+        final ActiveLedgerManager activeLedgerManager = new MsActiveLedgerManager(conf, zk, metastore);
+        return activeLedgerManager;
+    }
+
+    /**
+     * Create the manager of under-replicated ledgers. The ZooKeeper based
+     * implementation is returned, built from the conf and zk of this factory.
+     *
+     * @return a new {@link ZkLedgerUnderreplicationManager}
+     */
+    @Override
+    public LedgerUnderreplicationManager newLedgerUnderreplicationManager() throws KeeperException,
+            InterruptedException, ReplicationException.CompatibilityException {
+        // TODO: currently just use zk ledger underreplication manager
+        final LedgerUnderreplicationManager underreplicationManager = new ZkLedgerUnderreplicationManager(conf, zk);
+        return underreplicationManager;
+    }
+
+    /**
+     * Processes the elements of a set one by one, asynchronously. Processing
+     * stops immediately when an error occurs, and the final callback is then
+     * invoked with the failure code.
+     */
+    private static class AsyncSetProcessor<T> {
+        // use this to prevent long stack chains from building up in callbacks
+        private final ScheduledExecutorService scheduler;
+
+        /**
+         * Constructor
+         *
+         * @param scheduler
+         *            Executor used to prevent long stack chains
+         */
+        public AsyncSetProcessor(ScheduledExecutorService scheduler) {
+            this.scheduler = scheduler;
+        }
+
+        /**
+         * Process set of items
+         *
+         * @param data
+         *            Set of data to process; a null or empty set completes
+         *            immediately with <code>successRc</code>
+         * @param processor
+         *            Callback to process each element of the set
+         * @param finalCb
+         *            Final callback to be called after all elements in the set
+         *            are processed, or as soon as one of them fails
+         * @param context
+         *            Context of final callback
+         * @param successRc
+         *            RC passed to final callback on success
+         * @param failureRc
+         *            RC passed to final callback on failure
+         */
+        public void process(final Set<T> data, final Processor<T> processor, final AsyncCallback.VoidCallback finalCb,
+                final Object context, final int successRc, final int failureRc) {
+            if (data == null || data.isEmpty()) {
+                finalCb.processResult(successRc, null, context);
+                return;
+            }
+            final Iterator<T> iter = data.iterator();
+            AsyncCallback.VoidCallback stubCallback = new AsyncCallback.VoidCallback() {
+                @Override
+                public void processResult(int rc, String path, Object ctx) {
+                    if (rc != successRc) {
+                        // terminate immediately
+                        finalCb.processResult(failureRc, null, context);
+                        return;
+                    }
+                    if (!iter.hasNext()) { // reach the end of the set
+                        finalCb.processResult(successRc, null, context);
+                        return;
+                    }
+                    // process next element
+                    final T dataToProcess = iter.next();
+                    final AsyncCallback.VoidCallback stub = this;
+                    try {
+                        scheduler.submit(new Runnable() {
+                            @Override
+                            public final void run() {
+                                processor.process(dataToProcess, stub);
+                            }
+                        });
+                    } catch (java.util.concurrent.RejectedExecutionException ree) {
+                        // The scheduler refused the task (e.g. it has been
+                        // shut down). Nothing else would ever invoke the final
+                        // callback, so report the failure here instead of
+                        // leaving the caller waiting forever.
+                        finalCb.processResult(failureRc, null, context);
+                    }
+                }
+            };
+            // the first element is processed on the calling thread; the
+            // following ones are handed to the scheduler by the stub callback
+            T firstElement = iter.next();
+            processor.process(firstElement, stubCallback);
+        }
+    }
+
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java?rev=1420607&r1=1420606&r2=1420607&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java Wed Dec 12 10:20:38 2012
@@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.test.MultiLedgerManagerMultiDigestTestCase;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.MSLedgerManagerFactory;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback;
@@ -794,6 +795,7 @@ public class BookieRecoveryTest extends 
         adminConf.setZkServers(zkUtil.getZooKeeperConnectString());
         adminConf.setBookieRecoveryDigestType(digestCorrect);
         adminConf.setBookieRecoveryPasswd(passwdBad);
+        setMetastoreImplClass(adminConf);
 
         BookKeeperAdmin bka = new BookKeeperAdmin(adminConf);
         bka.recoverBookieData(bookieSrc, null);
@@ -816,6 +818,7 @@ public class BookieRecoveryTest extends 
         // Try to recover with no password in conf
         adminConf = new ClientConfiguration();
         adminConf.setZkServers(zkUtil.getZooKeeperConnectString());
+        setMetastoreImplClass(adminConf);
 
         bka = new BookKeeperAdmin(adminConf);
         bka.recoverBookieData(bookieSrc, null);
@@ -832,6 +835,13 @@ public class BookieRecoveryTest extends 
      */
     @Test
     public void ensurePasswordUsedForOldLedgers() throws Exception {
+        // This test bases on creating old ledgers in version 4.1.0, which only
+        // supports ZooKeeper based flat and hierarchical LedgerManagerFactory.
+        // So we ignore it for MSLedgerManagerFactory.
+        if (MSLedgerManagerFactory.class.getName().equals(ledgerManagerFactory)) {
+            return;
+        }
+
         // stop all bookies
         // and wipe the ledger layout so we can use an old client
         zkUtil.getZooKeeperClient().delete("/ledgers/LAYOUT", -1);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/metastore/TestMetaStore.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/metastore/TestMetaStore.java?rev=1420607&r1=1420606&r2=1420607&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/metastore/TestMetaStore.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/metastore/TestMetaStore.java Wed Dec 12 10:20:38 2012
@@ -221,7 +221,7 @@ public class TestMetaStore extends TestC
     }
 
     protected Integer getRandom() {
-        return (int)Math.random()*65536;
+        return (int)(Math.random()*65536);
     }
 
     protected Versioned<Value> getRecord(String recordId) throws Exception {

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java?rev=1420607&r1=1420606&r2=1420607&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java Wed Dec 12 10:20:38 2012
@@ -32,6 +32,7 @@ import org.apache.bookkeeper.client.Ledg
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.MSLedgerManagerFactory;
 import org.apache.bookkeeper.replication.ReplicationException.BKAuditException;
 import org.apache.bookkeeper.test.MultiLedgerManagerTestCase;
 import org.apache.commons.io.FileUtils;
@@ -132,6 +133,13 @@ public class BookieLedgerIndexTest exten
      */
     @Test
     public void testWithoutZookeeper() throws Exception {
+        // This test case is for ledger metadata that stored in ZooKeeper. As
+        // far as MSLedgerManagerFactory, ledger metadata are stored in other
+        // storage. So this test is not suitable for MSLedgerManagerFactory.
+        if (newLedgerManagerFactory instanceof MSLedgerManagerFactory) {
+            return;
+        }
+
         for (int i = 0; i < numberOfLedgers; i++) {
             createAndAddEntriesToLedger().close();
         }
@@ -141,7 +149,7 @@ public class BookieLedgerIndexTest exten
         stopZKCluster();
         try {
             bookieLedgerIndex.getBookieToLedgerIndex();
-            fail("Must throw exception as bookies are not running!");
+            fail("Must throw exception as zookeeper are not running!");
         } catch (BKAuditException bkAuditException) {
             // expected behaviour
         }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java?rev=1420607&r1=1420606&r2=1420607&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java Wed Dec 12 10:20:38 2012
@@ -30,8 +30,10 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.bookkeeper.client.BookKeeperTestClient;
+import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.metastore.InMemoryMetaStore;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.commons.io.FileUtils;
@@ -74,6 +76,10 @@ public abstract class BookKeeperClusterT
     @Override
     public void setUp() throws Exception {
         LOG.info("Setting up test {}", getName());
+        InMemoryMetaStore.reset();
+        setMetastoreImplClass(baseConf);
+        setMetastoreImplClass(baseClientConf);
+
         try {
             // start zookeeper service
             startZKCluster();
@@ -382,4 +388,9 @@ public abstract class BookKeeperClusterT
 
         return server;
     }
+
+    /**
+     * Set {@link InMemoryMetaStore} as the metastore implementation class of
+     * the given configuration.
+     *
+     * @param conf
+     *            Configuration to update
+     */
+    public void setMetastoreImplClass(AbstractConfiguration conf) {
+        final String metastoreImplClass = InMemoryMetaStore.class.getName();
+        conf.setMetastoreImplClass(metastoreImplClass);
+    }
+
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java?rev=1420607&r1=1420606&r2=1420607&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerMultiDigestTestCase.java Wed Dec 12 10:20:38 2012
@@ -49,6 +49,7 @@ public abstract class MultiLedgerManager
         String[] ledgerManagers = {
             "org.apache.bookkeeper.meta.FlatLedgerManagerFactory",
             "org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory",
+            "org.apache.bookkeeper.meta.MSLedgerManagerFactory",
         };
         ArrayList<Object[]> cfgs = new ArrayList<Object[]>(ledgerManagers.length);
         DigestType[] digestTypes = new DigestType[] { DigestType.MAC, DigestType.CRC32 };

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java?rev=1420607&r1=1420606&r2=1420607&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/MultiLedgerManagerTestCase.java Wed Dec 12 10:20:38 2012
@@ -45,6 +45,7 @@ public abstract class MultiLedgerManager
         String[] ledgerManagers = new String[] {
             "org.apache.bookkeeper.meta.FlatLedgerManagerFactory",
             "org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory",
+            "org.apache.bookkeeper.meta.MSLedgerManagerFactory",
         };
         ArrayList<Object[]> cfgs = new ArrayList<Object[]>(ledgerManagers.length);
         for (String lm : ledgerManagers) {