You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/06/19 12:39:38 UTC

svn commit: r1351646 [2/3] - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ book...

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerLayout.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerLayout.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerLayout.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerLayout.java Tue Jun 19 10:39:37 2012
@@ -44,8 +44,10 @@ class LedgerLayout {
 
     // Znode name to store layout information
     public static final String LAYOUT_ZNODE = "LAYOUT";
+    // oldest layout format version this code can still read
+    public static final int LAYOUT_MIN_COMPAT_VERSION = 1;
     // version of ledger layout metadata
-    public static final int LAYOUT_FORMAT_VERSION = 1;
+    public static final int LAYOUT_FORMAT_VERSION = 2;
 
     /**
      * Read ledger layout from zookeeper
@@ -77,8 +79,8 @@ class LedgerLayout {
     static final String splitter = ":";
     static final String lSplitter = "\n";
 
-    // ledger manager class
-    private String managerType;
+    // ledger manager factory class
+    private String managerFactoryCls;
     // ledger manager version
     private int managerVersion;
 
@@ -88,20 +90,43 @@ class LedgerLayout {
     /**
      * Ledger Layout Constructor
      *
-     * @param type
-     *          Ledger Manager Type
+     * @param managerFactoryCls
+     *          Ledger Manager Factory Class
      * @param managerVersion
      *          Ledger Manager Version
      * @param layoutFormatVersion
      *          Ledger Layout Format Version
      */
-    public LedgerLayout(String managerType, int managerVersion) {
-        this.managerType = managerType;
+    public LedgerLayout(String managerFactoryCls, int managerVersion) {
+        this(managerFactoryCls, managerVersion, LAYOUT_FORMAT_VERSION);
+    }
+
+    LedgerLayout(String managerFactoryCls, int managerVersion,
+                 int layoutVersion) {
+        this.managerFactoryCls = managerFactoryCls;
         this.managerVersion = managerVersion;
+        this.layoutFormatVersion = layoutVersion;
     }
 
+    /**
+     * Get Ledger Manager Type
+     *
+     * @return ledger manager type
+     * @deprecated replaced by {@link #getManagerFactoryClass()}
+     */
+    @Deprecated
     public String getManagerType() {
-        return this.managerType;
+        // pre V2 layout store as manager type
+        return this.managerFactoryCls;
+    }
+
+    /**
+     * Get ledger manager factory class
+     *
+     * @return ledger manager factory class
+     */
+    public String getManagerFactoryClass() {
+        return this.managerFactoryCls;
     }
 
     public int getManagerVersion() {
@@ -109,6 +134,15 @@ class LedgerLayout {
     }
 
     /**
+     * Return layout format version
+     *
+     * @return layout format version
+     */
+    public int getLayoutFormatVersion() {
+        return this.layoutFormatVersion;
+    }
+
+    /**
      * Store the ledger layout into zookeeper
      */
     public void store(final ZooKeeper zk, String ledgersRoot) 
@@ -126,7 +160,7 @@ class LedgerLayout {
     private byte[] serialize() throws IOException {
         StringBuilder sb = new StringBuilder();
         sb.append(layoutFormatVersion).append(lSplitter)
-            .append(managerType).append(splitter).append(managerVersion);
+            .append(managerFactoryCls).append(splitter).append(managerVersion);
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Serialized layout info: " + sb.toString());
@@ -157,7 +191,8 @@ class LedgerLayout {
 
         try {
             int layoutFormatVersion = new Integer(lines[0]);
-            if (LAYOUT_FORMAT_VERSION != layoutFormatVersion) {
+            if (LAYOUT_FORMAT_VERSION < layoutFormatVersion ||
+                LAYOUT_MIN_COMPAT_VERSION > layoutFormatVersion) {
                 throw new IOException("Metadata version not compatible. Expected " 
                         + LAYOUT_FORMAT_VERSION + ", but got " + layoutFormatVersion);
             }
@@ -170,11 +205,11 @@ class LedgerLayout {
             if (parts.length != 2) {
                 throw new IOException("Invalid Ledger Manager defined in layout : " + layout);
             }
-            // ledger manager class
-            String managerType = parts[0];
+            // ledger manager factory class
+            String managerFactoryCls = parts[0];
             // ledger manager version
             int managerVersion = new Integer(parts[1]);
-            return new LedgerLayout(managerType, managerVersion);
+            return new LedgerLayout(managerFactoryCls, managerVersion, layoutFormatVersion);
         } catch (NumberFormatException e) {
             throw new IOException(e);
         }
@@ -189,20 +224,20 @@ class LedgerLayout {
             return false;
         }
         LedgerLayout other = (LedgerLayout)obj;
-        return managerType.equals(other.managerType) &&
-            managerVersion == other.managerVersion;
+        return managerFactoryCls.equals(other.managerFactoryCls)
+            && managerVersion == other.managerVersion;
     }
 
     @Override
     public int hashCode() {
-        return (managerType + managerVersion).hashCode();
+        return (managerFactoryCls + managerVersion).hashCode();
     }
 
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append("LV").append(layoutFormatVersion).append(":")
-            .append(",Type:").append(managerType).append(":")
+            .append(",Type:").append(managerFactoryCls).append(":")
             .append(managerVersion);
         return sb.toString();
     }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java Tue Jun 19 10:39:37 2012
@@ -18,7 +18,7 @@ package org.apache.bookkeeper.meta;
  * limitations under the License.
  */
 
-import java.io.IOException;
+import java.io.Closeable;
 
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.bookkeeper.client.LedgerMetadata;
@@ -26,44 +26,55 @@ import org.apache.bookkeeper.proto.Bookk
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 
 /**
- * LedgerManager takes responsibility of ledger management
+ * LedgerManager is responsible for ledger management on the client side.
  *
  * <ul>
  * <li>How to store ledger meta (e.g. in ZooKeeper or other key/value store)
- * <li>How to manager active ledgers (so know how to do garbage collection)
- * <li>How to garbage collect inactive/deleted ledgers
  * </ul>
  */
-public interface LedgerManager {
+public interface LedgerManager extends Closeable {
 
     /**
-     * Get the path that is used to store ledger metadata
+     * Create a new ledger with provided metadata
+     *
+     * @param metadata
+     *        Metadata provided when creating a new ledger
+     * @param cb
+     *        Callback when creating a new ledger.
+     */
+    public abstract void createLedger(LedgerMetadata metadata, GenericCallback<Long> cb);
+
+    /**
+     * Delete a specified ledger by ledgerId.
      *
      * @param ledgerId
-     *          Ledger ID
-     * @return ledger node path
+     *          Ledger Id
+     * @param cb
+     *          Callback when the ledger is deleted.
      */
-    public String getLedgerPath(long ledgerId);
+    public abstract void deleteLedger(long ledgerId, GenericCallback<Void> cb);
 
     /**
-     * Get ledger id from its ledger path
+     * Read ledger metadata of a specified ledger.
      *
-     * @param ledgerPath
-     *          Ledger path to store metadata
-     * @return ledger id
-     * @throws IOException when the ledger path is invalid
+     * @param ledgerId
+     *          Ledger Id
+     * @param readCb
+     *          Callback when the ledger metadata is read.
      */
-    public long getLedgerId(String ledgerPath) throws IOException;
+    public abstract void readLedgerMetadata(long ledgerId, GenericCallback<LedgerMetadata> readCb);
 
     /**
-     * Create a new zk ledger path with provided metadata
+     * Write ledger metadata.
      *
-     * @param cb
-     *        Callback when getting new zk ledger path to create.
+     * @param ledgerId
+     *          Ledger Id
      * @param metadata
-     *        Metadata provided when creating a new ledger
+     *          Ledger Metadata to write
+     * @param cb
+     *          Callback when finished writing ledger metadata.
      */
-    public abstract void newLedgerPath(GenericCallback<String> cb, LedgerMetadata metadata);
+    public abstract void writeLedgerMetadata(long ledgerId, LedgerMetadata metadata, GenericCallback<Void> cb);
 
     /**
      * Loop to process all ledgers.
@@ -88,60 +99,4 @@ public interface LedgerManager {
      */
     public void asyncProcessLedgers(Processor<Long> processor, AsyncCallback.VoidCallback finalCb,
                                     Object context, int successRc, int failureRc);
-
-    /**
-     * Add active ledger
-     *
-     * @param ledgerId
-     *          Ledger ID
-     * @param active
-     *          Status of ledger
-     */
-    public void addActiveLedger(long ledgerId, boolean active);
-
-    /**
-     * Remove active ledger
-     *
-     * @param ledgerId
-     *          Ledger ID
-     */
-    public void removeActiveLedger(long ledgerId);
-
-    /**
-     * Is Ledger ledgerId in active ledgers set
-     *
-     * @param ledgerId
-     *          Ledger ID
-     * @return true if the ledger is in active ledgers set, otherwise return false
-     */
-    public boolean containsActiveLedger(long ledgerId);
-
-    /**
-     * Garbage Collector which handles ledger deletion in server side
-     */
-    public static interface GarbageCollector {
-        /**
-         * garbage collecting a specific ledger
-         *
-         * @param ledgerId
-         *          Ledger ID to be garbage collected
-         */
-        public void gc(long ledgerId);
-    }
-
-    /**
-     * Garbage collecting all inactive/deleted ledgers
-     * <p>
-     * GarbageCollector#gc is triggered each time we found a ledger could be garbage collected.
-     * After method finished, all those inactive ledgers should be garbage collected.
-     * </p>
-     *
-     * @param gc garbage collector
-     */
-    public void garbageCollectLedgers(GarbageCollector gc);
-
-    /**
-     * Close ledger manager
-     */
-    public void close();
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java Tue Jun 19 10:39:37 2012
@@ -20,54 +20,127 @@ package org.apache.bookkeeper.meta;
 
 import java.io.IOException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.bookkeeper.conf.AbstractConfiguration;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.bookkeeper.util.ReflectionUtils;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+
+public abstract class LedgerManagerFactory {
+
+    static final Logger LOG = LoggerFactory.getLogger(LedgerManagerFactory.class);
+    // v1 layout
+    static final int V1 = 1;
+
+    /**
+     * Return current factory version.
+     *
+     * @return current version used by factory.
+     */
+    public abstract int getCurrentVersion();
+
+    /**
+     * Initialize a factory.
+     *
+     * @param conf
+     *          Configuration object used to initialize factory
+     * @param zk
+     *          Available zookeeper handle for ledger manager to use.
+     * @param factoryVersion
+     *          Version used to initialize the factory.
+     * @return ledger manager factory instance
+     * @throws IOException when fail to initialize the factory.
+     */
+    public abstract LedgerManagerFactory initialize(final AbstractConfiguration conf,
+                                                    final ZooKeeper zk,
+                                                    final int factoryVersion)
+    throws IOException;
+
+    /**
+     * Uninitialize the factory.
+     *
+     * @throws IOException when fail to uninitialize the factory.
+     */
+    public abstract void uninitialize() throws IOException;
+
+    /**
+     * Return a ledger manager for the client side to manage ledger metadata.
+     *
+     * @return ledger manager
+     * @see LedgerManager
+     */
+    public abstract LedgerManager newLedgerManager();
+
+    /**
+     * Return an active ledger manager for the server side to manage active ledgers.
+     *
+     * @return active ledger manager
+     * @see ActiveLedgerManager
+     */
+    public abstract ActiveLedgerManager newActiveLedgerManager();
 
-/**
- * <code>LedgerManagerFactory</code> takes responsibility of creating new ledger manager.
- */
-public class LedgerManagerFactory {
     /**
-     * Create new Ledger Manager.
+     * Create new Ledger Manager Factory.
      *
      * @param conf
      *          Configuration Object.
      * @param zk
      *          ZooKeeper Client Handle, talk to zk to know which ledger manager is used.
-     * @return new ledger manager
+     * @return new ledger manager factory
      * @throws IOException
      */
-    public static LedgerManager newLedgerManager(
+    public static LedgerManagerFactory newLedgerManagerFactory(
         final AbstractConfiguration conf, final ZooKeeper zk)
             throws IOException, KeeperException, InterruptedException {
-        String lmType = conf.getLedgerManagerType();
+        Class<? extends LedgerManagerFactory> factoryClass;
+        try {
+            factoryClass = conf.getLedgerManagerFactoryClass();
+        } catch (Exception e) {
+            throw new IOException("Failed to get ledger manager factory class from configuration : ", e);
+        }
         String ledgerRootPath = conf.getZkLedgersRootPath();
-            
+
         if (null == ledgerRootPath || ledgerRootPath.length() == 0) {
             throw new IOException("Empty Ledger Root Path.");
         }
-        
+
         // if zk is null, return the default ledger manager
         if (zk == null) {
-            return new FlatLedgerManager(conf, null,
-                    ledgerRootPath, FlatLedgerManager.CUR_VERSION);
+            return new FlatLedgerManagerFactory()
+                   .initialize(conf, null, FlatLedgerManagerFactory.CUR_VERSION);
         }
 
+        LedgerManagerFactory lmFactory;
+
         // check that the configured ledger manager is
         // compatible with the existing layout
         LedgerLayout layout = LedgerLayout.readLayout(zk, ledgerRootPath);
         if (layout == null) { // no existing layout
-            if (lmType == null 
-                || lmType.equals(FlatLedgerManager.NAME)) {
-                layout = new LedgerLayout(FlatLedgerManager.NAME, 
-                                          FlatLedgerManager.CUR_VERSION);
-            } else if (lmType.equals(HierarchicalLedgerManager.NAME)) {
-                layout = new LedgerLayout(HierarchicalLedgerManager.NAME, 
-                                          HierarchicalLedgerManager.CUR_VERSION);
-            } else {
-                throw new IOException("Unknown ledger manager type " + lmType);
+            // use default ledger manager factory if no one provided
+            if (factoryClass == null) {
+                // for backward compatibility, check manager type
+                String lmType = conf.getLedgerManagerType();
+                if (lmType == null) {
+                    factoryClass = FlatLedgerManagerFactory.class;
+                } else {
+                    if (FlatLedgerManagerFactory.NAME.equals(lmType)) {
+                        factoryClass = FlatLedgerManagerFactory.class;
+                    } else if (HierarchicalLedgerManagerFactory.NAME.equals(lmType)) {
+                        factoryClass = HierarchicalLedgerManagerFactory.class;
+                    } else {
+                        throw new IOException("Unknown ledger manager type: " + lmType);
+                    }
+                }
+            }
+
+            try {
+                lmFactory = ReflectionUtils.newInstance(factoryClass);
+            } catch (Throwable t) {
+                throw new IOException("Fail to instantiate ledger manager factory class : " + factoryClass, t);
             }
+            layout = new LedgerLayout(factoryClass.getName(), lmFactory.getCurrentVersion());
             try {
                 layout.store(zk, ledgerRootPath);
             } catch (KeeperException.NodeExistsException nee) {
@@ -78,21 +151,57 @@ public class LedgerManagerFactory {
                             + "layout " + layout);
                 }
             }
-        } else if (lmType != null && !layout.getManagerType().equals(lmType)) {
-            throw new IOException("Configured layout " + lmType
-                    + " does not match existing layout " + layout.getManagerType());
+            return lmFactory.initialize(conf, zk, lmFactory.getCurrentVersion());
         }
+        LOG.debug("read ledger layout {}", layout);
 
-        // create the ledger manager
-        if (FlatLedgerManager.NAME.equals(layout.getManagerType())) {
-            return new FlatLedgerManager(conf, zk, ledgerRootPath, 
-                                         layout.getManagerVersion());
-        } else if (HierarchicalLedgerManager.NAME.equals(layout.getManagerType())) {
-            return new HierarchicalLedgerManager(conf, zk, ledgerRootPath,
-                                                 layout.getManagerVersion());
-        } else {
-            throw new IOException("Unknown ledger manager type: " + lmType);
+        // there is existing layout, we need to look into the layout.
+        // handle pre V2 layout
+        if (layout.getLayoutFormatVersion() <= V1) {
+            // pre V2 layout we use type of ledger manager
+            String lmType = conf.getLedgerManagerType();
+            if (lmType != null && !layout.getManagerType().equals(lmType)) {
+                throw new IOException("Configured layout " + lmType
+                                    + " does not match existing layout "  + layout.getManagerType());
+            }
+
+            // create the ledger manager
+            if (FlatLedgerManagerFactory.NAME.equals(layout.getManagerType())) {
+                lmFactory = new FlatLedgerManagerFactory();
+            } else if (HierarchicalLedgerManagerFactory.NAME.equals(layout.getManagerType())) {
+                lmFactory = new HierarchicalLedgerManagerFactory();
+            } else {
+                throw new IOException("Unknown ledger manager type: " + lmType);
+            }
+            return lmFactory.initialize(conf, zk, layout.getManagerVersion());
+        }
+
+        // handle V2 layout case
+        if (factoryClass != null &&
+            !layout.getManagerFactoryClass().equals(factoryClass.getName())) {
+
+            throw new IOException("Configured layout " + factoryClass.getName()
+                                + " does not match existing layout "  + layout.getManagerFactoryClass());
+        }
+        if (factoryClass == null) {
+            // no factory specified in configuration
+            try {
+                Class<?> theCls = Class.forName(layout.getManagerFactoryClass());
+                if (!LedgerManagerFactory.class.isAssignableFrom(theCls)) {
+                    throw new IOException("Wrong ledger manager factory " + layout.getManagerFactoryClass());
+                }
+                factoryClass = theCls.asSubclass(LedgerManagerFactory.class);
+            } catch (ClassNotFoundException cnfe) {
+                throw new IOException("Failed to instantiate ledger manager factory " + layout.getManagerFactoryClass());
+            }
+        }
+        // instantiate a factory
+        try {
+            lmFactory = ReflectionUtils.newInstance(factoryClass);
+        } catch (Throwable t) {
+            throw new IOException("Fail to instantiate ledger manager factory class : " + factoryClass, t);
         }
+        return lmFactory.initialize(conf, zk, layout.getManagerVersion());
     }
 
 }

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkVersion.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkVersion.java?rev=1351646&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkVersion.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkVersion.java Tue Jun 19 10:39:37 2012
@@ -0,0 +1,95 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.meta;
+
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Version.Occurred;
+
+/**
+ * A {@link Version} backed by a single integer, stored in <code>znodeVersion</code>.
+ * Two instances are ordered by comparing those integers.
+ */
+public class ZkVersion implements Version {
+    int znodeVersion;
+
+    /**
+     * Create a version holding the given value.
+     *
+     * @param version
+     *          value to store as the znode version
+     */
+    public ZkVersion(int version) {
+        znodeVersion = version;
+    }
+
+    /**
+     * Compare this version with another one.
+     *
+     * @param v
+     *          version to compare against; may be <code>null</code>
+     * @return {@link Occurred#AFTER} if <code>v</code> is <code>null</code> or holds a
+     *         smaller value, {@link Occurred#BEFORE} if it holds a larger value,
+     *         {@link Occurred#CONCURRENTLY} if both hold the same value
+     * @throws IllegalArgumentException if <code>v</code> is not a <code>ZkVersion</code>
+     */
+    @Override
+    public Occurred compare(Version v) {
+        if (v == null) {
+            return Occurred.AFTER;
+        }
+        if (!(v instanceof ZkVersion)) {
+            throw new IllegalArgumentException("Invalid version type");
+        }
+        int other = ((ZkVersion) v).znodeVersion;
+        // compare directly rather than by subtraction, which can overflow
+        // and report the wrong order for values of opposite sign
+        if (znodeVersion == other) {
+            return Occurred.CONCURRENTLY;
+        } else if (znodeVersion < other) {
+            return Occurred.BEFORE;
+        } else {
+            return Occurred.AFTER;
+        }
+    }
+
+    /** @return the stored znode version */
+    public int getZnodeVersion() {
+        return znodeVersion;
+    }
+
+    /**
+     * Replace the stored znode version.
+     *
+     * @param znodeVersion
+     *          new value
+     * @return this instance, so calls can be chained
+     */
+    public ZkVersion setZnodeVersion(int znodeVersion) {
+        this.znodeVersion = znodeVersion;
+        return this;
+    }
+
+    /** @return the znode version as a decimal string */
+    @Override
+    public String toString() {
+        return Integer.toString(znodeVersion, 10);
+    }
+}

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ReflectionUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ReflectionUtils.java?rev=1351646&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ReflectionUtils.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ReflectionUtils.java Tue Jun 19 10:39:37 2012
@@ -0,0 +1,142 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.util;
+
+import java.lang.reflect.Constructor;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+
+/**
+ * General Class Reflection Utils
+ */
+public class ReflectionUtils {
+
+    // no-argument constructors already looked up by newInstance, keyed by class
+    private static final Map<Class<?>, Constructor<?>> constructorCache =
+            new ConcurrentHashMap<Class<?>, Constructor<?>>();
+
+    /**
+     * Get the value of the <code>name</code> property as a <code>Class</code>.
+     * If no such property is specified, then <code>defaultCls</code> is returned.
+     *
+     * @param conf
+     *          Configuration Object.
+     * @param name
+     *          Class Property Name.
+     * @param defaultCls
+     *          Default Class to be returned.
+     * @param classLoader
+     *          Class Loader to load class.
+     * @return property value as a <code>Class</code>, or <code>defaultCls</code>.
+     * @throws ConfigurationException
+     *          if the class named by the property cannot be found.
+     */
+    public static Class<?> getClass(Configuration conf, String name,
+                                    Class<?> defaultCls, ClassLoader classLoader)
+            throws ConfigurationException {
+        String valueStr = conf.getString(name);
+        if (null == valueStr) {
+            return defaultCls;
+        }
+        try {
+            return Class.forName(valueStr, true, classLoader);
+        } catch (ClassNotFoundException cnfe) {
+            throw new ConfigurationException(cnfe);
+        }
+    }
+
+    /**
+     * Get the value of the <code>name</code> property as a <code>Class</code> implementing
+     * the interface specified by <code>xface</code>.
+     *
+     * If no such property is specified, then <code>defaultValue</code> is returned.
+     *
+     * An exception is thrown if the returned class does not implement the named interface.
+     *
+     * @param conf
+     *          Configuration Object.
+     * @param name
+     *          Class Property Name.
+     * @param defaultValue
+     *          Default Class to be returned.
+     * @param xface
+     *          The interface implemented by the named class.
+     * @param classLoader
+     *          Class Loader to load class.
+     * @return property value as a <code>Class</code>, or <code>defaultValue</code>.
+     * @throws ConfigurationException
+     *          if the class cannot be found, does not implement <code>xface</code>,
+     *          or any other exception occurs while resolving it.
+     */
+    public static <U> Class<? extends U> getClass(Configuration conf,
+                                                  String name, Class<? extends U> defaultValue,
+                                                  Class<U> xface, ClassLoader classLoader)
+        throws ConfigurationException {
+        try {
+            Class<?> theCls = getClass(conf, name, defaultValue, classLoader);
+            if (null != theCls && !xface.isAssignableFrom(theCls)) {
+                throw new ConfigurationException(theCls + " not " + xface.getName());
+            } else if (null != theCls) {
+                return theCls.asSubclass(xface);
+            } else {
+                return null;
+            }
+        } catch (ConfigurationException ce) {
+            // already the declared exception type: rethrow as-is rather than
+            // burying its message inside a second ConfigurationException
+            throw ce;
+        } catch (Exception e) {
+            throw new ConfigurationException(e);
+        }
+    }
+
+    /**
+     * Create an object for the given class.
+     *
+     * @param theCls
+     *          class of which an object is created.
+     * @return a new object
+     * @throws RuntimeException
+     *          wrapping any exception raised while looking up or invoking the
+     *          no-argument constructor.
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T newInstance(Class<T> theCls) {
+        T result;
+        try {
+            Constructor<T> meth = (Constructor<T>) constructorCache.get(theCls);
+            if (null == meth) {
+                // first request for this class: find its no-argument constructor,
+                // make it callable even if it is not public, and remember it
+                meth = theCls.getDeclaredConstructor();
+                meth.setAccessible(true);
+                constructorCache.put(theCls, meth);
+            }
+            result = meth.newInstance();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return result;
+    }
+}

Copied: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/versioning/Version.java (from r1351374, zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/HierarchicalLedgerDeleteTest.java)
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/versioning/Version.java?p2=zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/versioning/Version.java&p1=zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/HierarchicalLedgerDeleteTest.java&r1=1351374&r2=1351646&rev=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/HierarchicalLedgerDeleteTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/versioning/Version.java Tue Jun 19 10:39:37 2012
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.meta;
-
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,24 +18,15 @@ package org.apache.bookkeeper.meta;
  * under the License.
  *
  */
-
-import org.apache.bookkeeper.client.BookKeeper.DigestType;
-import org.apache.bookkeeper.meta.HierarchicalLedgerManager;
-import org.apache.bookkeeper.test.LedgerDeleteTest;
-
-import org.junit.Before;
+package org.apache.bookkeeper.versioning;
 
 /**
- * Test ledger delete using HierarchicalLedgerManager
+ * An interface that allows us to determine if a given version happened before or after another version.
  */
-public class HierarchicalLedgerDeleteTest extends LedgerDeleteTest {
-
-    @Before
-    @Override
-    public void setUp() throws Exception {
-        baseConf.setLedgerManagerType(HierarchicalLedgerManager.NAME);
-        baseClientConf.setLedgerManagerType(HierarchicalLedgerManager.NAME);
-        super.setUp();
+public interface Version {
+    public enum Occurred {
+        BEFORE, AFTER, CONCURRENTLY
     }
 
+    public Occurred compare(Version v);
 }

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/versioning/Versioned.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/versioning/Versioned.java?rev=1351646&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/versioning/Versioned.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/versioning/Versioned.java Tue Jun 19 10:39:37 2012
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.versioning;
+
+public class Versioned<T> {
+    T value;
+    Version version;
+
+    public Versioned(T value, Version version) {
+        this.value = value;
+        this.version = version;
+    }
+
+    public void setValue(T value) {
+        this.value = value;
+    }
+
+    public T getValue() {
+        return value;
+    }
+
+    public void setVersion(Version version) {
+        this.version = version;
+    }
+
+    public Version getVersion() {
+        return version;
+    }
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java Tue Jun 19 10:39:37 2012
@@ -26,7 +26,7 @@ import java.io.IOException;
 
 import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.ActiveLedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.commons.io.FileUtils;
 import org.junit.After;
@@ -44,7 +44,8 @@ import junit.framework.TestCase;
 public class LedgerCacheTest extends TestCase {
     static Logger LOG = LoggerFactory.getLogger(LedgerCacheTest.class);
 
-    LedgerManager ledgerManager;
+    ActiveLedgerManager activeLedgerManager;
+    LedgerManagerFactory ledgerManagerFactory;
     LedgerCache ledgerCache;
     ServerConfiguration conf;
     File txnDir, ledgerDir;
@@ -66,19 +67,22 @@ public class LedgerCacheTest extends Tes
         conf.setJournalDirName(txnDir.getPath());
         conf.setLedgerDirNames(new String[] { ledgerDir.getPath() });
 
-        ledgerManager = LedgerManagerFactory.newLedgerManager(conf, null);
+        ledgerManagerFactory =
+            LedgerManagerFactory.newLedgerManagerFactory(conf, null);
+        activeLedgerManager = ledgerManagerFactory.newActiveLedgerManager();
     }
 
     @Override
     @After
     public void tearDown() throws Exception {
-        ledgerManager.close();
+        activeLedgerManager.close();
+        ledgerManagerFactory.uninitialize();
         FileUtils.deleteDirectory(txnDir);
         FileUtils.deleteDirectory(ledgerDir);
     }
 
     private void newLedgerCache() {
-        ledgerCache = new LedgerCacheImpl(conf, ledgerManager);
+        ledgerCache = new LedgerCacheImpl(conf, activeLedgerManager);
     }
 
     @Test

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java Tue Jun 19 10:39:37 2012
@@ -21,7 +21,6 @@ package org.apache.bookkeeper.client;
  *
  */
 
-import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -39,7 +38,7 @@ import org.jboss.netty.buffer.ChannelBuf
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.test.BaseTestCase;
+import org.apache.bookkeeper.test.MultiLedgerManagerMultiDigestTestCase;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
@@ -47,17 +46,13 @@ import org.apache.bookkeeper.client.BKEx
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
-import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
-import org.apache.bookkeeper.bookie.BookieException;
-import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -65,7 +60,7 @@ import org.junit.Test;
 /**
  * This class tests the bookie recovery admin functionality.
  */
-public class BookieRecoveryTest extends BaseTestCase {
+public class BookieRecoveryTest extends MultiLedgerManagerMultiDigestTestCase {
     static Logger LOG = LoggerFactory.getLogger(BookieRecoveryTest.class);
 
     // Object used for synchronizing async method calls
@@ -84,7 +79,7 @@ public class BookieRecoveryTest extends 
         @Override
         public void recoverComplete(int rc, Object ctx) {
             LOG.info("Recovered bookie operation completed with rc: " + rc);
-            success = rc == Code.OK.intValue();
+            success = rc == BKException.Code.OK;
             SyncObject sync = (SyncObject) ctx;
             synchronized (sync) {
                 sync.value = true;
@@ -100,9 +95,13 @@ public class BookieRecoveryTest extends 
     BookKeeperAdmin bkAdmin;
 
     // Constructor
-    public BookieRecoveryTest(DigestType digestType) {
+    public BookieRecoveryTest(String ledgerManagerFactory, DigestType digestType) {
         super(3);
         this.digestType = digestType;
+        LOG.info("Using ledger manager " + ledgerManagerFactory);
+        // set ledger manager
+        baseConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
+        baseClientConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
     }
 
     @Before
@@ -135,8 +134,8 @@ public class BookieRecoveryTest extends 
      *            Number of ledgers to create
      * @return List of LedgerHandles for each of the ledgers created
      */
-    private List<LedgerHandle> createLedgers(int numLedgers) 
-            throws BKException, KeeperException, IOException, InterruptedException 
+    private List<LedgerHandle> createLedgers(int numLedgers)
+            throws BKException, IOException, InterruptedException
     {
         return createLedgers(numLedgers, 3, 2);
     }
@@ -150,12 +149,12 @@ public class BookieRecoveryTest extends 
      * @param quorum Quorum size for ledgers
      * @return List of LedgerHandles for each of the ledgers created
      */
-    private List<LedgerHandle> createLedgers(int numLedgers, int ensemble, int quorum) 
-            throws BKException, KeeperException, IOException,
+    private List<LedgerHandle> createLedgers(int numLedgers, int ensemble, int quorum)
+            throws BKException, IOException,
         InterruptedException {
         List<LedgerHandle> lhs = new ArrayList<LedgerHandle>();
         for (int i = 0; i < numLedgers; i++) {
-            lhs.add(bkc.createLedger(ensemble, quorum, 
+            lhs.add(bkc.createLedger(ensemble, quorum,
                                      digestType, baseClientConf.getBookieRecoveryPasswd()));
         }
         return lhs;
@@ -433,6 +432,7 @@ public class BookieRecoveryTest extends 
             numSuccess = new AtomicLong(0);
         }
 
+        @Override
         public void readEntryComplete(int rc, long ledgerId, long entryId, ChannelBuffer buffer, Object ctx) {
             if (LOG.isDebugEnabled()) {
                 InetSocketAddress addr = (InetSocketAddress)ctx;
@@ -455,11 +455,7 @@ public class BookieRecoveryTest extends 
     }
 
     private boolean verifyFullyReplicated(LedgerHandle lh, long untilEntry) throws Exception {
-        String znodepath = bkc.getLedgerManager().getLedgerPath(lh.getId());
-        Stat stat = bkc.getZkHandle().exists(znodepath, false);
-        assertNotNull(stat);
-        byte[] mdbytes = bkc.getZkHandle().getData(znodepath, false, stat); 
-        LedgerMetadata md = LedgerMetadata.parseConfig(mdbytes, stat.getVersion()); 
+        LedgerMetadata md = getLedgerMetadata(lh);
 
         Map<Long, ArrayList<InetSocketAddress>> ensembles = md.getEnsembles();
 
@@ -471,7 +467,7 @@ public class BookieRecoveryTest extends 
             ranges.put(keyList.get(i), keyList.get(i+1));
         }
         ranges.put(keyList.get(keyList.size()-1), untilEntry);
-        
+
         for (Map.Entry<Long, ArrayList<InetSocketAddress>> e : ensembles.entrySet()) {
             int quorum = md.quorumSize;
             long startEntryId = e.getKey();
@@ -490,7 +486,7 @@ public class BookieRecoveryTest extends 
             if (numSuccess < expectedSuccess) {
                 LOG.warn("Fragment not fully replicated ledgerId = " + lh.getId()
                          + " startEntryId = " + startEntryId
-                         + " endEntryId = " + endEntryId 
+                         + " endEntryId = " + endEntryId
                          + " expectedSuccess = " + expectedSuccess
                          + " gotSuccess = " + numSuccess);
                 return false;
@@ -499,19 +495,51 @@ public class BookieRecoveryTest extends 
         return true;
     }
 
+    // Holder used to wait for an async ledger metadata read and carry its result code and metadata
+    class SyncLedgerMetaObject {
+        boolean value;
+        int rc;
+        LedgerMetadata meta;
+
+        public SyncLedgerMetaObject() {
+            value = false;
+            meta = null;
+        }
+    }
+
+    private LedgerMetadata getLedgerMetadata(LedgerHandle lh) throws Exception {
+        final SyncLedgerMetaObject syncObj = new SyncLedgerMetaObject();
+        bkc.getLedgerManager().readLedgerMetadata(lh.getId(), new GenericCallback<LedgerMetadata>() {
+
+            @Override
+            public void operationComplete(int rc, LedgerMetadata result) {
+                synchronized (syncObj) {
+                    syncObj.rc = rc;
+                    syncObj.meta = result;
+                    syncObj.value = true;
+                    syncObj.notify();
+                }
+            }
+
+        });
+
+        synchronized (syncObj) {
+            while (syncObj.value == false) {
+                syncObj.wait();
+            }
+        }
+        assertEquals(BKException.Code.OK, syncObj.rc);
+        return syncObj.meta;
+    }
+
     private boolean findDupesInEnsembles(List<LedgerHandle> lhs) throws Exception {
         long numDupes = 0;
         for (LedgerHandle lh : lhs) {
-            String znodepath = bkc.getLedgerManager().getLedgerPath(lh.getId());
-            Stat stat = bkc.getZkHandle().exists(znodepath, false);
-            assertNotNull(stat);
-            byte[] mdbytes = bkc.getZkHandle().getData(znodepath, false, stat); 
-            LedgerMetadata md = LedgerMetadata.parseConfig(mdbytes, stat.getVersion()); 
-            
+            LedgerMetadata md = getLedgerMetadata(lh);
             for (Map.Entry<Long, ArrayList<InetSocketAddress>> e : md.getEnsembles().entrySet()) {
                 HashSet<InetSocketAddress> set = new HashSet<InetSocketAddress>();
                 long fragment = e.getKey();
-                
+
                 for (InetSocketAddress addr : e.getValue()) {
                     if (set.contains(addr)) {
                         LOG.error("Dupe " + addr + " found in ensemble for fragment " + fragment
@@ -696,25 +724,25 @@ public class BookieRecoveryTest extends 
             // Create the ledgers
             int numLedgers = 3;
             List<LedgerHandle> lhs = createLedgers(numLedgers, numBookies, 2);
-            
+
             // Write the entries for the ledgers with dummy values.
             int numMsgs = 100;
             writeEntriestoLedgers(numMsgs, 0, lhs);
-            
+
             // Shutdown the first bookie server
             LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
             int removeIndex = r.nextInt(bs.size());
             InetSocketAddress bookieSrc = bs.get(removeIndex).getLocalAddress();
             bs.get(removeIndex).shutdown();
             bs.remove(removeIndex);
-            
+
             // Startup three new bookie servers
             startNewBookie();
-            
+
             // Write some more entries for the ledgers so a new ensemble will be
             // created for them.
             writeEntriestoLedgers(numMsgs, numMsgs, lhs);
-            
+
             // Call the async recover bookie method.
             LOG.info("Now recover the data on the killed bookie (" + bookieSrc
                      + ") and replicate it to a random available one");

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestFencing.java Tue Jun 19 10:39:37 2012
@@ -124,7 +124,7 @@ public class TestFencing extends BaseTes
                         lastConfirmedEntry = lh.getLastAddConfirmed();
                         lh.close();
                         break;
-                    } catch (BKException.ZKException zke) {
+                    } catch (BKException.BKMetadataVersionException zke) {
                         LOG.info("Contention with someone else recovering");
                     } catch (BKException.BKLedgerRecoveryException bkre) {
                         LOG.info("Contention with someone else recovering");
@@ -295,7 +295,7 @@ public class TestFencing extends BaseTes
         try {
             writelh.close();
             fail("Should fail trying to update metadata");
-        } catch (BKException.ZKException e) {
+        } catch (BKException.BKMetadataVersionException e) {
             // correct behaviour
         }
     }
@@ -342,7 +342,7 @@ public class TestFencing extends BaseTes
         try {
             writelh.close();
             fail("Should fail trying to update metadata");
-        } catch (BKException.ZKException e) {
+        } catch (BKException.BKMetadataVersionException e) {
             // correct behaviour
         }
     }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java Tue Jun 19 10:39:37 2012
@@ -33,7 +33,7 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerMetadata;
-import org.apache.bookkeeper.meta.LedgerManager.GarbageCollector;
+import org.apache.bookkeeper.meta.ActiveLedgerManager.GarbageCollector;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,8 +46,8 @@ import org.junit.Test;
 public class GcLedgersTest extends LedgerManagerTestCase {
     static final Logger LOG = LoggerFactory.getLogger(GcLedgersTest.class);
 
-    public GcLedgersTest(String ledgerManagerType) {
-        super(ledgerManagerType);
+    public GcLedgersTest(Class<? extends LedgerManagerFactory> lmFactoryCls) {
+        super(lmFactoryCls);
     }
 
     /**
@@ -56,16 +56,12 @@ public class GcLedgersTest extends Ledge
     private void createLedgers(int numLedgers, final Set<Long> createdLedgers) {
         final AtomicInteger expected = new AtomicInteger(numLedgers);
         for (int i=0; i<numLedgers; i++) {
-            ledgerManager.newLedgerPath(new GenericCallback<String>() {
+            getLedgerManager().createLedger(new LedgerMetadata(1, 1), new GenericCallback<Long>() {
                 @Override
-                public void operationComplete(int rc, String ledgerPath) {
+                public void operationComplete(int rc, Long ledgerId) {
                     if (rc == BKException.Code.OK) {
-                        try {
-                            long ledgerId = ledgerManager.getLedgerId(ledgerPath);
-                            ledgerManager.addActiveLedger(ledgerId, true);
-                            createdLedgers.add(ledgerId);
-                        } catch (IOException ie) {
-                        }
+                        getActiveLedgerManager().addActiveLedger(ledgerId, true);
+                        createdLedgers.add(ledgerId);
                     }
                     synchronized (expected) {
                         int num = expected.decrementAndGet();
@@ -74,7 +70,7 @@ public class GcLedgersTest extends Ledge
                         }
                     }
                 }
-            }, new LedgerMetadata(1, 1));
+            });
         }
         synchronized (expected) {
             try {
@@ -104,7 +100,17 @@ public class GcLedgersTest extends Ledge
         // random remove several ledgers
         for (int i=0; i<numRemovedLedgers; i++) {
             long ledgerId = tmpList.get(i);
-            zkc.delete(ledgerManager.getLedgerPath(ledgerId), -1);
+            getLedgerManager().deleteLedger(ledgerId, new GenericCallback<Void>() {
+                @Override
+                public void operationComplete(int rc, Void result) {
+                    synchronized (removedLedgers) {
+                        removedLedgers.notify();
+                    }
+                }
+            });
+            synchronized (removedLedgers) {
+                removedLedgers.wait();
+            }
             removedLedgers.add(ledgerId);
             createdLedgers.remove(ledgerId);
         }
@@ -115,7 +121,7 @@ public class GcLedgersTest extends Ledge
         Thread gcThread = new Thread() {
             @Override
             public void run() {
-                ledgerManager.garbageCollectLedgers(new GarbageCollector() {
+                getActiveLedgerManager().garbageCollectLedgers(new GarbageCollector() {
                     boolean paused = false;
                     @Override
                     public void gc(long ledgerId) {
@@ -158,10 +164,10 @@ public class GcLedgersTest extends Ledge
 
         // test ledgers
         for (Long ledger : removedLedgers) {
-            assertFalse(ledgerManager.containsActiveLedger(ledger));
+            assertFalse(getActiveLedgerManager().containsActiveLedger(ledger));
         }
         for (Long ledger : createdLedgers) {
-            assertTrue(ledgerManager.containsActiveLedger(ledger));
+            assertTrue(getActiveLedgerManager().containsActiveLedger(ledger));
         }
     }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerLayoutTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerLayoutTest.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerLayoutTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerLayoutTest.java Tue Jun 19 10:39:37 2012
@@ -45,7 +45,7 @@ public class LedgerLayoutTest extends Bo
     @Test
     public void testLedgerLayout() throws Exception {
         ClientConfiguration conf = new ClientConfiguration();
-        conf.setLedgerManagerType(HierarchicalLedgerManager.NAME);
+        conf.setLedgerManagerFactoryClass(HierarchicalLedgerManagerFactory.class);
         String ledgerRootPath = "/testLedgerLayout";
 
         zkc.create(ledgerRootPath, new byte[0], 
@@ -83,8 +83,10 @@ public class LedgerLayoutTest extends Bo
     public void testBadVersionLedgerLayout() throws Exception {
         ClientConfiguration conf = new ClientConfiguration();
         // write bad version ledger layout
-        writeLedgerLayout(conf.getZkLedgersRootPath(), FlatLedgerManager.NAME,
-                FlatLedgerManager.CUR_VERSION, LedgerLayout.LAYOUT_FORMAT_VERSION + 1);
+        writeLedgerLayout(conf.getZkLedgersRootPath(),
+                          FlatLedgerManagerFactory.class.getName(),
+                          FlatLedgerManagerFactory.CUR_VERSION,
+                          LedgerLayout.LAYOUT_FORMAT_VERSION + 1);
         
         try {
             LedgerLayout.readLayout(zkc, conf.getZkLedgersRootPath());
@@ -120,7 +122,7 @@ public class LedgerLayoutTest extends Bo
         // write bad format ledger layout
         StringBuilder sb = new StringBuilder();
         sb.append(LedgerLayout.LAYOUT_FORMAT_VERSION).append("\n")
-          .append(FlatLedgerManager.NAME);
+          .append(FlatLedgerManagerFactory.class.getName());
         zkc.create(ledgersLayout, sb.toString().getBytes(),
                                  Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 
@@ -131,4 +133,19 @@ public class LedgerLayoutTest extends Bo
             assertTrue("Invalid exception", ie.getMessage().contains("Invalid Ledger Manager"));
         }
     }
+
+    @Test
+    public void testReadV1LedgerManagerLayout() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration();
+        // write v1 ledger layout
+        writeLedgerLayout(conf.getZkLedgersRootPath(),
+                          FlatLedgerManagerFactory.NAME,
+                          FlatLedgerManagerFactory.CUR_VERSION, 1);
+
+        LedgerLayout layout = LedgerLayout.readLayout(zkc, conf.getZkLedgersRootPath());
+        assertNotNull("Should not be null", layout);
+        assertEquals(FlatLedgerManagerFactory.NAME, layout.getManagerType());
+        assertEquals(FlatLedgerManagerFactory.CUR_VERSION, layout.getManagerVersion());
+        assertEquals(1, layout.getLayoutFormatVersion());
+    }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java Tue Jun 19 10:39:37 2012
@@ -45,18 +45,34 @@ import junit.framework.TestCase;
 public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
     static final Logger LOG = LoggerFactory.getLogger(LedgerManagerTestCase.class);
 
-    LedgerManager ledgerManager;
+    LedgerManagerFactory ledgerManagerFactory;
+    LedgerManager ledgerManager = null;
+    ActiveLedgerManager activeLedgerManager = null;
 
-    public LedgerManagerTestCase(String ledgerManagerType) {
+    public LedgerManagerTestCase(Class<? extends LedgerManagerFactory> lmFactoryCls) {
         super(0);
-        baseConf.setLedgerManagerType(ledgerManagerType);
+        baseConf.setLedgerManagerFactoryClass(lmFactoryCls);
+    }
+
+    public LedgerManager getLedgerManager() {
+        if (null == ledgerManager) {
+            ledgerManager = ledgerManagerFactory.newLedgerManager();
+        }
+        return ledgerManager;
+    }
+
+    public ActiveLedgerManager getActiveLedgerManager() {
+        if (null == activeLedgerManager) {
+            activeLedgerManager = ledgerManagerFactory.newActiveLedgerManager();
+        }
+        return activeLedgerManager;
     }
 
     @Parameters
     public static Collection<Object[]> configs() {
         return Arrays.asList(new Object[][] {
-            { FlatLedgerManager.NAME },
-            { HierarchicalLedgerManager.NAME }
+            { FlatLedgerManagerFactory.class },
+            { HierarchicalLedgerManagerFactory.class }
         });
     }
 
@@ -64,13 +80,19 @@ public abstract class LedgerManagerTestC
     @Override
     public void setUp() throws Exception {
         super.setUp();
-        ledgerManager = LedgerManagerFactory.newLedgerManager(baseConf, zkc);
+        ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(baseConf, zkc);
     }
 
     @After
     @Override
     public void tearDown() throws Exception {
-        ledgerManager.close();
+        if (null != ledgerManager) {
+            ledgerManager.close();
+        }
+        if (null != activeLedgerManager) {
+            activeLedgerManager.close();
+        }
+        ledgerManagerFactory.uninitialize();
         super.tearDown();
     }
 

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerManager.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerManager.java Tue Jun 19 10:39:37 2012
@@ -29,9 +29,9 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs.Ids;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.CountDownLatch;
-
 import java.util.List;
 import java.util.ArrayList;
+import java.lang.reflect.Field;
 
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.junit.After;
@@ -49,6 +49,19 @@ public class TestLedgerManager extends B
         super(0);
     }
 
+    private void writeLedgerLayout(String ledgersRootPath,
+                                   String managerType,
+                                   int managerVersion, int layoutVersion)
+        throws Exception {
+        LedgerLayout layout = new LedgerLayout(managerType, managerVersion);
+
+        Field f = LedgerLayout.class.getDeclaredField("layoutFormatVersion");
+        f.setAccessible(true);
+        f.set(layout, layoutVersion);
+
+        layout.store(zkc, ledgersRootPath);
+    }
+
     /** 
      * Test bad client configuration
      */
@@ -62,14 +75,15 @@ public class TestLedgerManager extends B
                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         conf.setZkLedgersRootPath(root0);
 
-        LedgerManager m = LedgerManagerFactory.newLedgerManager(conf, zkc);
+        LedgerManagerFactory m = LedgerManagerFactory.newLedgerManagerFactory(conf, zkc);
         assertTrue("Ledger manager is unexpected type", 
-                   (m instanceof FlatLedgerManager));
+                   (m instanceof FlatLedgerManagerFactory));
+        m.uninitialize();
 
         // mismatching conf
-        conf.setLedgerManagerType(HierarchicalLedgerManager.NAME);
+        conf.setLedgerManagerFactoryClass(HierarchicalLedgerManagerFactory.class);
         try {
-            LedgerManagerFactory.newLedgerManager(conf, zkc);
+            LedgerManagerFactory.newLedgerManagerFactory(conf, zkc);
             fail("Shouldn't reach here");
         } catch (Exception e) {
             assertTrue("Invalid exception", 
@@ -82,13 +96,53 @@ public class TestLedgerManager extends B
                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         conf.setZkLedgersRootPath(root1);
 
-        conf.setLedgerManagerType("DoesNotExist");
+        conf.setLedgerManagerFactoryClassName("DoesNotExist");
+        try {
+            LedgerManagerFactory f = LedgerManagerFactory.newLedgerManagerFactory(conf, zkc);
+            fail("Shouldn't reach here");
+        } catch (Exception e) {
+            assertTrue("Invalid exception",
+                    e.getMessage().contains("Failed to get ledger manager factory class from configuration"));
+        }
+    }
+
+    /**
+     * Test bad client configuration against an existing v1 ledger layout
+     */
+    @Test
+    public void testBadConfV1() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration();
+
+        String root0 = "/goodconf0";
+        zkc.create(root0, new byte[0],
+                   Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        conf.setZkLedgersRootPath(root0);
+        // write v1 layout
+        writeLedgerLayout(root0, FlatLedgerManagerFactory.NAME,
+                          FlatLedgerManagerFactory.CUR_VERSION, 1);
+
+        conf.setLedgerManagerFactoryClass(FlatLedgerManagerFactory.class);
+
+        LedgerManagerFactory m = LedgerManagerFactory.newLedgerManagerFactory(conf, zkc);
+        assertTrue("Ledger manager is unexpected type",
+                   (m instanceof FlatLedgerManagerFactory));
+        m.uninitialize();
+
+        // v2 setting doesn't affect v1
+        conf.setLedgerManagerFactoryClass(HierarchicalLedgerManagerFactory.class);
+        m = LedgerManagerFactory.newLedgerManagerFactory(conf, zkc);
+        assertTrue("Ledger manager is unexpected type",
+                   (m instanceof FlatLedgerManagerFactory));
+        m.uninitialize();
+
+        // mismatching conf
+        conf.setLedgerManagerType(HierarchicalLedgerManagerFactory.NAME);
         try {
-            LedgerManagerFactory.newLedgerManager(conf, zkc);
+            LedgerManagerFactory.newLedgerManagerFactory(conf, zkc);
             fail("Shouldn't reach here");
         } catch (Exception e) {
             assertTrue("Invalid exception", 
-                    e.getMessage().contains("Unknown ledger manager type "));
+                       e.getMessage().contains("does not match existing layout"));
         }
     }
 
@@ -109,11 +163,11 @@ public class TestLedgerManager extends B
                          0xdeadbeef).store(zkc, root0);
         
         try {
-            LedgerManagerFactory.newLedgerManager(conf, zkc);
+            LedgerManagerFactory.newLedgerManagerFactory(conf, zkc);
             fail("Shouldn't reach here");
         } catch (Exception e) {
             assertTrue("Invalid exception", 
-                    e.getMessage().contains("Unknown ledger manager type"));
+                    e.getMessage().contains("Failed to instantiate ledger manager factory"));
         }
 
         // bad version in zookeeper
@@ -122,13 +176,14 @@ public class TestLedgerManager extends B
                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         conf.setZkLedgersRootPath(root1);
         
-        new LedgerLayout(FlatLedgerManager.NAME,
+        new LedgerLayout(FlatLedgerManagerFactory.class.getName(),
                          0xdeadbeef).store(zkc, root1);
         
         try {
-            LedgerManagerFactory.newLedgerManager(conf, zkc);
+            LedgerManagerFactory.newLedgerManagerFactory(conf, zkc);
             fail("Shouldn't reach here");
         } catch (Exception e) {
+            System.out.println(e.getMessage());
             assertTrue("Invalid exception", 
                     e.getMessage().contains("Incompatible layout version found"));
         }
@@ -136,13 +191,13 @@ public class TestLedgerManager extends B
 
     private static class CreateLMThread extends Thread {
         private boolean success = false;
-        private final String type;
+        private final String factoryCls;
         private final String root;
         private final CyclicBarrier barrier;
         private ZooKeeper zkc;
         
-        CreateLMThread(String root, String type, CyclicBarrier barrier) throws Exception {
-            this.type = type;
+        CreateLMThread(String root, String factoryCls, CyclicBarrier barrier) throws Exception {
+            this.factoryCls = factoryCls;
             this.barrier = barrier;
             this.root = root;
             final CountDownLatch latch = new CountDownLatch(1);
@@ -156,11 +211,13 @@ public class TestLedgerManager extends B
 
         public void run() {
             ClientConfiguration conf = new ClientConfiguration();
-            conf.setLedgerManagerType(type);
+            conf.setLedgerManagerFactoryClassName(factoryCls);
 
             try {
                 barrier.await();
-                LedgerManagerFactory.newLedgerManager(conf, zkc);
+                LedgerManagerFactory factory =
+                    LedgerManagerFactory.newLedgerManagerFactory(conf, zkc);
+                factory.uninitialize();
                 
                 success = true;
             } catch (Exception e) {
@@ -191,7 +248,7 @@ public class TestLedgerManager extends B
         CyclicBarrier barrier = new CyclicBarrier(numThreads+1);
         List<CreateLMThread> threads = new ArrayList<CreateLMThread>(numThreads);
         for (int i = 0; i < numThreads; i++) {
-            CreateLMThread t = new CreateLMThread(root0, FlatLedgerManager.NAME, barrier);
+            CreateLMThread t = new CreateLMThread(root0, FlatLedgerManagerFactory.class.getName(), barrier);
             t.start();
             threads.add(t);
         }
@@ -220,14 +277,14 @@ public class TestLedgerManager extends B
         CyclicBarrier barrier = new CyclicBarrier(numThreadsEach*2+1);
         List<CreateLMThread> threadsA = new ArrayList<CreateLMThread>(numThreadsEach);
         for (int i = 0; i < numThreadsEach; i++) {
-            CreateLMThread t = new CreateLMThread(root0, FlatLedgerManager.NAME, barrier);
+            CreateLMThread t = new CreateLMThread(root0, FlatLedgerManagerFactory.class.getName(), barrier);
             t.start();
             threadsA.add(t);
         }
         List<CreateLMThread> threadsB = new ArrayList<CreateLMThread>(numThreadsEach);
         for (int i = 0; i < numThreadsEach; i++) {
             CreateLMThread t = new CreateLMThread(root0, 
-                    HierarchicalLedgerManager.NAME, barrier);
+                    HierarchicalLedgerManagerFactory.class.getName(), barrier);
             t.start();
             threadsB.add(t);
         }

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestZkVersion.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestZkVersion.java?rev=1351646&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestZkVersion.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestZkVersion.java Tue Jun 19 10:39:37 2012
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.meta;
+
+import org.junit.Test;
+import org.junit.Assert;
+
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Version.Occurred;
+
+public class TestZkVersion {
+
+    @Test
+    public void testNullZkVersion() {
+        ZkVersion zkVersion = new ZkVersion(99);
+        Assert.assertEquals(Version.Occurred.AFTER, zkVersion.compare(null));
+    }
+
+    @Test
+    public void testInvalidVersion() {
+        ZkVersion zkVersion = new ZkVersion(99);
+        try {
+            zkVersion.compare(new Version() {
+                @Override
+                public Occurred compare(Version v) {
+                    return Occurred.AFTER;
+                }
+            });
+            Assert.fail("Should not reach here!");
+        } catch (IllegalArgumentException iae) {
+        }
+    }
+
+    @Test
+    public void testCompare() {
+        ZkVersion zv = new ZkVersion(99);
+        Assert.assertEquals(Occurred.AFTER, zv.compare(new ZkVersion(98)));
+        Assert.assertEquals(Occurred.BEFORE, zv.compare(new ZkVersion(100)));
+        Assert.assertEquals(Occurred.CONCURRENTLY, zv.compare(new ZkVersion(99)));
+    }
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/AsyncLedgerOpsTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/AsyncLedgerOpsTest.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/AsyncLedgerOpsTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/AsyncLedgerOpsTest.java Tue Jun 19 10:39:37 2012
@@ -24,8 +24,6 @@ package org.apache.bookkeeper.test;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Enumeration;
 import java.util.Random;
 import java.util.Set;
@@ -43,7 +41,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.runners.Parameterized.Parameters;
 
 /**
  * This test tests read and write, synchronous and asynchronous, strings and
@@ -51,23 +48,21 @@ import org.junit.runners.Parameterized.P
  * and three BookKeepers.
  *
  */
-public class AsyncLedgerOpsTest extends BaseTestCase implements AddCallback, ReadCallback, CreateCallback,
+public class AsyncLedgerOpsTest extends MultiLedgerManagerMultiDigestTestCase
+    implements AddCallback, ReadCallback, CreateCallback,
     CloseCallback, OpenCallback {
-    static Logger LOG = LoggerFactory.getLogger(BookieClientTest.class);
+    static Logger LOG = LoggerFactory.getLogger(AsyncLedgerOpsTest.class);
 
     DigestType digestType;
 
-    public AsyncLedgerOpsTest(DigestType digestType) {
+    public AsyncLedgerOpsTest(String ledgerManagerFactory, DigestType digestType) {
         super(3);
         this.digestType = digestType;
+        // set ledger manager factory class
+        baseConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
+        baseClientConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
     }
 
-    @Parameters
-    public static Collection<Object[]> configs() {
-        return Arrays.asList(new Object[][] { {DigestType.MAC }, {DigestType.CRC32}});
-    }
-
-
     byte[] ledgerPassword = "aaa".getBytes();
     LedgerHandle lh, lh2;
     long ledgerId;
@@ -199,6 +194,7 @@ public class AsyncLedgerOpsTest extends 
 
     }
 
+    @Override
     public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
         SyncObj x = (SyncObj) ctx;
         synchronized (x) {
@@ -207,6 +203,7 @@ public class AsyncLedgerOpsTest extends 
         }
     }
 
+    @Override
     public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
         ls = seq;
         synchronized (sync) {
@@ -216,6 +213,7 @@ public class AsyncLedgerOpsTest extends 
 
     }
 
+    @Override
     public void createComplete(int rc, LedgerHandle lh, Object ctx) {
         synchronized (ctx) {
             ControlObj cobj = (ControlObj) ctx;
@@ -224,6 +222,7 @@ public class AsyncLedgerOpsTest extends 
         }
     }
 
+    @Override
     public void openComplete(int rc, LedgerHandle lh, Object ctx) {
         synchronized (ctx) {
             ControlObj cobj = (ControlObj) ctx;
@@ -232,6 +231,7 @@ public class AsyncLedgerOpsTest extends 
         }
     }
 
+    @Override
     public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
         synchronized (ctx) {
             ControlObj cobj = (ControlObj) ctx;

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java Tue Jun 19 10:39:37 2012
@@ -230,6 +230,7 @@ public abstract class BookKeeperClusterT
         for (final Thread t : allthreads) {
             if (t.getName().equals(name)) {
                 Thread sleeper = new Thread() {
+                    @Override
                     public void run() {
                         try {
                             t.suspend();

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java Tue Jun 19 10:39:37 2012
@@ -21,7 +21,6 @@ package org.apache.bookkeeper.test;
  *
  */
 
-import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -29,7 +28,6 @@ import java.util.Enumeration;
 import java.util.Random;
 import java.util.Set;
 
-import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeperTestClient;
@@ -41,8 +39,6 @@ import org.apache.bookkeeper.proto.Booki
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -53,7 +49,8 @@ import org.junit.Test;
  *
  */
 
-public class BookieFailureTest extends BaseTestCase implements AddCallback, ReadCallback {
+public class BookieFailureTest extends MultiLedgerManagerMultiDigestTestCase
+    implements AddCallback, ReadCallback {
 
     // Depending on the taste, select the amount of logging
     // by decommenting one of the two lines below
@@ -87,9 +84,12 @@ public class BookieFailureTest extends B
         }
     }
 
-    public BookieFailureTest(DigestType digestType) {
+    public BookieFailureTest(String ledgerManagerFactory, DigestType digestType) {
         super(4);
         this.digestType = digestType;
+        // set ledger manager
+        baseConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
+        baseClientConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
     }
 
     /**
@@ -244,6 +244,7 @@ public class BookieFailureTest extends B
 
     }
 
+    @Override
     public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
         if (rc != 0)
             fail("Failed to write entry: " + entryId);
@@ -254,6 +255,7 @@ public class BookieFailureTest extends B
         }
     }
 
+    @Override
     public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
         if (rc != 0)
             fail("Failed to write entry");

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java?rev=1351646&r1=1351645&r2=1351646&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieReadWriteTest.java Tue Jun 19 10:39:37 2012
@@ -48,7 +48,6 @@ import org.apache.bookkeeper.streaming.L
 import org.apache.bookkeeper.streaming.LedgerOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.junit.Assert;
@@ -62,7 +61,7 @@ import org.junit.Test;
  *
  */
 
-public class BookieReadWriteTest extends BaseTestCase
+public class BookieReadWriteTest extends MultiLedgerManagerMultiDigestTestCase
     implements AddCallback, ReadCallback, ReadLastConfirmedCallback {
 
     // Depending on the taste, select the amount of logging
@@ -84,9 +83,12 @@ public class BookieReadWriteTest extends
 
     DigestType digestType;
 
-    public BookieReadWriteTest(DigestType digestType) {
+    public BookieReadWriteTest(String ledgerManagerFactory, DigestType digestType) {
         super(3);
         this.digestType = digestType;
+        // set ledger manager
+        baseConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
+        baseClientConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
     }
     // Synchronization
     SyncObj sync;
@@ -105,7 +107,7 @@ public class BookieReadWriteTest extends
     }
 
     @Test
-    public void testOpenException() throws KeeperException, IOException, InterruptedException {
+    public void testOpenException() throws IOException, InterruptedException {
         try {
             lh = bkc.openLedger(0, digestType, ledgerPassword);
             fail("Haven't thrown exception");
@@ -117,10 +119,10 @@ public class BookieReadWriteTest extends
     /**
      * test the streaming api for reading and writing
      *
-     * @throws {@link IOException}, {@link KeeperException}
+     * @throws IOException
      */
     @Test
-    public void testStreamingClients() throws IOException, KeeperException, BKException, InterruptedException {
+    public void testStreamingClients() throws IOException, BKException, InterruptedException {
         lh = bkc.createLedger(digestType, ledgerPassword);
         // write a string so that we cna
         // create a buffer of a single bytes
@@ -205,7 +207,7 @@ public class BookieReadWriteTest extends
             assertTrue("Verifying number of entries written", lh.getLastAddConfirmed() == (numEntriesToWrite - 1));
 
             // read entries
-            lh.asyncReadEntries(0, numEntriesToWrite - 1, this, (Object) sync);
+            lh.asyncReadEntries(0, numEntriesToWrite - 1, this, sync);
 
             synchronized (sync) {
                 while (sync.value == false) {
@@ -319,7 +321,7 @@ public class BookieReadWriteTest extends
                        lh.getLastAddConfirmed() == (numEntries - 1));
 
             // read entries
-            lh.asyncReadEntries(0, numEntries - 1, this, (Object) sync);
+            lh.asyncReadEntries(0, numEntries - 1, this, sync);
 
             synchronized (sync) {
                 while (sync.value == false) {
@@ -378,6 +380,7 @@ public class BookieReadWriteTest extends
             this.throttle = threshold;
         }
 
+        @Override
         public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
             if(rc != BKException.Code.OK) {
                 fail("Return code is not OK: " + rc);
@@ -478,7 +481,7 @@ public class BookieReadWriteTest extends
             // read entries
             sync.counter = 0;
             for (int i = 0; i < numEntriesToWrite; i+=throttle) {
-                lh.asyncReadEntries(i, i + throttle - 1, tcb, (Object) sync);
+                lh.asyncReadEntries(i, i + throttle - 1, tcb, sync);
                 int testValue = getAvailablePermits(lh);
                 assertTrue("Difference is incorrect : " + i + ", " + sync.counter + ", " + testValue, testValue <= throttle);
             }
@@ -814,7 +817,7 @@ public class BookieReadWriteTest extends
             fail("Test failed due to interruption");
         }
     }
-    
+
     @Test
     public void testReadFromOpenLedger() throws IOException {
         try {
@@ -904,7 +907,7 @@ public class BookieReadWriteTest extends
             fail("Test failed due to interruption");
         }
     }
-    
+
     @Test
     public void testReadFromOpenLedgerOpenOnce() throws Exception {
         try {
@@ -918,7 +921,7 @@ public class BookieReadWriteTest extends
                 ByteBuffer entry = ByteBuffer.allocate(4);
                 entry.putInt(rng.nextInt(maxInt));
                 entry.position(0);
-                
+
                 entries.add(entry.array());
                 entriesSize.add(entry.array().length);
                 lh.addEntry(entry.array());
@@ -927,7 +930,7 @@ public class BookieReadWriteTest extends
                     // less than written
                     // and it just can read until (i-1)
                     int toRead = i - 1;
-                    
+
                     long readLastConfirmed = lhOpen.readLastConfirmed();
                     assertTrue(readLastConfirmed != 0);
                     Enumeration<LedgerEntry> readEntry = lhOpen.readEntries(toRead, toRead);
@@ -945,12 +948,12 @@ public class BookieReadWriteTest extends
                         LOG.error("Unexpected exception", ex);
                         fail("Unexpected exception");
                     }
-                    
+
                 }
             }
             long last = lh.readLastConfirmed();
             assertTrue("Last confirmed add: " + last, last == (numEntriesToWrite - 2));
-    
+
             LOG.debug("*** WRITE COMPLETE ***");
             // close ledger
             lh.close();
@@ -962,9 +965,9 @@ public class BookieReadWriteTest extends
         } catch (InterruptedException e) {
             LOG.error("Test failed", e);
             fail("Test failed due to interruption");
-        } 
+        }
     }
-    
+
     @Test
     public void testReadFromOpenLedgerZeroAndOne() throws Exception {
         try {
@@ -974,7 +977,7 @@ public class BookieReadWriteTest extends
             ledgerId = lh.getId();
             LOG.info("Ledger ID: " + lh.getId());
             LedgerHandle lhOpen = bkc.openLedgerNoRecovery(ledgerId, digestType, ledgerPassword);
-            
+
             /*
              * We haven't written anything, so it should be empty.
              */
@@ -982,7 +985,7 @@ public class BookieReadWriteTest extends
             long readLastConfirmed = lhOpen.readLastConfirmed();
             assertTrue("Last confirmed has the wrong value",
                        readLastConfirmed == LedgerHandle.INVALID_ENTRY_ID);
-            
+
             /*
              * Writing one entry.
              */
@@ -990,7 +993,7 @@ public class BookieReadWriteTest extends
             ByteBuffer entry = ByteBuffer.allocate(4);
             entry.putInt(rng.nextInt(maxInt));
             entry.position(0);
-            
+
             entries.add(entry.array());
             entriesSize.add(entry.array().length);
             lh.addEntry(entry.array());
@@ -1004,13 +1007,13 @@ public class BookieReadWriteTest extends
             assertTrue(readLastConfirmed == LedgerHandle.INVALID_ENTRY_ID);
 
             /*
-             * Adding one more, and this time we should expect to 
+             * Adding one more, and this time we should expect to
              * see one entry.
              */
             entry = ByteBuffer.allocate(4);
             entry.putInt(rng.nextInt(maxInt));
             entry.position(0);
-            
+
             entries.add(entry.array());
             entriesSize.add(entry.array().length);
             lh.addEntry(entry.array());
@@ -1018,7 +1021,7 @@ public class BookieReadWriteTest extends
             LOG.info("Checking that it has an entry");
             readLastConfirmed = lhOpen.readLastConfirmed();
             assertTrue(readLastConfirmed == 0L);
-                        
+
             // close ledger
             lh.close();
             // close read only ledger should not change metadata
@@ -1029,9 +1032,9 @@ public class BookieReadWriteTest extends
         } catch (InterruptedException e) {
             LOG.error("Test failed", e);
             fail("Test failed due to interruption");
-        } 
+        }
     }
-    
+
 
     @Test
     public void testLastConfirmedAdd() throws IOException {
@@ -1101,6 +1104,7 @@ public class BookieReadWriteTest extends
     }
 
 
+    @Override
     public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
         if(rc != BKException.Code.OK) fail("Return code is not OK: " + rc);
 
@@ -1112,6 +1116,7 @@ public class BookieReadWriteTest extends
         }
     }
 
+    @Override
     public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
         if(rc != BKException.Code.OK) fail("Return code is not OK: " + rc);
 
@@ -1123,6 +1128,7 @@ public class BookieReadWriteTest extends
         }
     }
 
+    @Override
     public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
         SyncObj sync = (SyncObj) ctx;
 
@@ -1132,6 +1138,7 @@ public class BookieReadWriteTest extends
         }
     }
 
+    @Override
     @Before
     public void setUp() throws Exception {
         super.setUp();
@@ -1160,6 +1167,7 @@ public class BookieReadWriteTest extends
 
     /* User for testing purposes, void */
     class emptyWatcher implements Watcher {
+        @Override
         public void process(WatchedEvent event) {
         }
     }