You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by br...@apache.org on 2010/07/09 23:18:00 UTC

svn commit: r962697 - in /hadoop/zookeeper/trunk: ./ src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/ src/contrib/bookkeeper/src/java/org/apache/bookkeeper/tools/ src/contrib/bookkeeper/test/org/apache/bookkeeper/test/

Author: breed
Date: Fri Jul  9 21:17:57 2010
New Revision: 962697

URL: http://svn.apache.org/viewvc?rev=962697&view=rev
Log:
ZOOKEEPER-712. Bookie recovery.

Added:
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/tools/
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/tools/BookKeeperTools.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieRecoveryTest.java
Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DigestManager.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DistributionSchedule.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
    hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerMetadata.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=962697&r1=962696&r2=962697&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Fri Jul  9 21:17:57 2010
@@ -99,6 +99,8 @@ NEW FEATURES:
 
   ZOOKEEPER-744. Add monitoring four-letter word (Savu Andrei via phunt)
 
+  ZOOKEEPER-712. Bookie recovery. (erwin tam via breed)
+
 Release 3.3.0 - 2010-03-24
 
 Non-backward compatible changes:

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java?rev=962697&r1=962696&r2=962697&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java Fri Jul  9 21:17:57 2010
@@ -111,4 +111,16 @@ public interface AsyncCallback {
       void deleteComplete(int rc, Object ctx);
     }
 
+  public interface RecoverCallback {
+      /**
+       * Callback definition for bookie recover operations
+       * 
+       * @param rc
+       *          return code
+       * @param ctx
+       *          control object
+       */
+      void recoverComplete(int rc, Object ctx);
+    }
+
 }

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java?rev=962697&r1=962696&r2=962697&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java Fri Jul  9 21:17:57 2010
@@ -159,6 +159,15 @@ public class BookKeeper implements OpenC
   }
 
   /**
+   * Get the BookieClient, currently used for doing bookie recovery.
+   * 
+   * @return BookieClient for the BookKeeper instance.
+   */
+  public BookieClient getBookieClient() {
+      return bookieClient;
+  }
+  
+  /**
    * Creates a new ledger asynchronously. To create a ledger, we need to specify
    * the ensemble size, the quorum size, the digest type, a password, a callback
    * implementation, and an optional control object. The ensemble size is how

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DigestManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DigestManager.java?rev=962697&r1=962696&r2=962697&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DigestManager.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DigestManager.java Fri Jul  9 21:17:57 2010
@@ -35,7 +35,7 @@ import org.jboss.netty.buffer.ChannelBuf
  * for the packet. Currently 2 types of digests are supported: MAC (based on SHA-1) and CRC32
  */
 
-abstract class DigestManager {
+public abstract class DigestManager {
     static final Logger logger = Logger.getLogger(DigestManager.class);
 
     long ledgerId;
@@ -67,7 +67,7 @@ abstract class DigestManager {
         }
     }
 
-    ChannelBuffer computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, byte[] data) {
+    public ChannelBuffer computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, byte[] data) {
 
         byte[] bufferArray = new byte[24+macCodeLength];
         ByteBuffer buffer = ByteBuffer.wrap(bufferArray);

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DistributionSchedule.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DistributionSchedule.java?rev=962697&r1=962696&r2=962697&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DistributionSchedule.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DistributionSchedule.java Fri Jul  9 21:17:57 2010
@@ -28,7 +28,7 @@ package org.apache.bookkeeper.client;
  * to.
  */
 
-interface DistributionSchedule {
+public interface DistributionSchedule {
 
     /**
      * 

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=962697&r1=962696&r2=962697&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java Fri Jul  9 21:17:57 2010
@@ -121,7 +121,43 @@ public class LedgerHandle implements Rea
     return lastAddPushed;
   }
 
-  void writeLedgerConfig(StatCallback callback, Object ctx) {
+  /**
+   * Get the Ledger's key/password.
+   * 
+   * @return byte array for the ledger's key/password.
+   */
+  public byte[] getLedgerKey() {
+      return ledgerKey;
+  }
+  
+  /**
+   * Get the LedgerMetadata
+   * 
+   * @return LedgerMetadata for the LedgerHandle
+   */
+  public LedgerMetadata getLedgerMetadata() {
+      return metadata;
+  }
+  
+  /**
+   * Get the DigestManager
+   * 
+   * @return DigestManager for the LedgerHandle
+   */
+  public DigestManager getDigestManager() {
+      return macManager;
+  }
+  
+  /**
+   * Get the Distribution Schedule
+   * 
+   * @return DistributionSchedule for the LedgerHandle
+   */
+  public DistributionSchedule getDistributionSchedule() {
+      return distributionSchedule;
+  }
+  
+  public void writeLedgerConfig(StatCallback callback, Object ctx) {
     bk.getZkHandle().setData(StringUtils.getLedgerNodePath(ledgerId),
         metadata.serialize(), -1, callback, ctx);
   }

Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerMetadata.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerMetadata.java?rev=962697&r1=962696&r2=962697&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerMetadata.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerMetadata.java Fri Jul  9 21:17:57 2010
@@ -33,7 +33,7 @@ import org.apache.log4j.Logger;
  * in zookeeper. It provides parsing and serialization methods of such metadata.
  * 
  */
-class LedgerMetadata {
+public class LedgerMetadata {
     static final Logger LOG = Logger.getLogger(LedgerMetadata.class);
 
     private static final String closed = "CLOSED";
@@ -59,6 +59,17 @@ class LedgerMetadata {
         this(0, 0);
     }
 
+    /**
+     * Get the Map of bookie ensembles for the various ledger fragments 
+     * that make up the ledger.
+     * 
+     * @return SortedMap of Ledger Fragments and the corresponding 
+     * bookie ensembles that store the entries.
+     */
+    public SortedMap<Long, ArrayList<InetSocketAddress>> getEnsembles() {
+        return ensembles;
+    }
+    
     boolean isClosed() {
         return close != NOTCLOSED;
     }

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/tools/BookKeeperTools.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/tools/BookKeeperTools.java?rev=962697&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/tools/BookKeeperTools.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/tools/BookKeeperTools.java Fri Jul  9 21:17:57 2010
@@ -0,0 +1,762 @@
+package org.apache.bookkeeper.tools;
+
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.data.Stat;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * Provides Admin Tools to manage the BookKeeper cluster.
+ * 
+ */
+public class BookKeeperTools {
+
+    private static Logger LOG = Logger.getLogger(BookKeeperTools.class);
+
+    // ZK client instance
+    private ZooKeeper zk;
+    // ZK ledgers related String constants
+    static final String LEDGERS_PATH = "/ledgers";
+    static final String LEDGER_NODE_PREFIX = "L";
+    static final String AVAILABLE_NODE = "available";
+    static final String BOOKIES_PATH = LEDGERS_PATH + "/" + AVAILABLE_NODE;
+    static final String COLON = ":";
+
+    // BookKeeper client instance
+    private BookKeeper bkc;
+
+    /*
+     * Random number generator used to choose an available bookie server to
+     * replicate data from a dead bookie.
+     */
+    private Random rand = new Random();
+
+    /*
+     * For now, assume that all ledgers were created with the same DigestType
+     * and password. In the future, this admin tool will need to know for each
+     * ledger, what was the DigestType and password used to create it before it
+     * can open it. These values will come from System properties, though hard
+     * coded defaults are defined here.
+     */
+    private DigestType DIGEST_TYPE = DigestType.valueOf(System.getProperty("digestType", DigestType.CRC32.toString()));
+    private byte[] PASSWD = System.getProperty("passwd", "").getBytes();
+
+    /**
+     * Constructor that takes in a ZooKeeper servers connect string so we know
+     * how to connect to ZooKeeper to retrieve information about the BookKeeper
+     * cluster. We need this before we can do any type of admin operations on
+     * the BookKeeper cluster.
+     * 
+     * @param zkServers
+     *            Comma separated list of hostname:port pairs for the ZooKeeper
+     *            servers cluster.
+     * @throws IOException
+     *             Throws this exception if there is an error instantiating the
+     *             ZooKeeper client.
+     * @throws InterruptedException
+     *             Throws this exception if there is an error instantiating the
+     *             BookKeeper client.
+     * @throws KeeperException
+     *             Throws this exception if there is an error instantiating the
+     *             BookKeeper client.
+     */
+    public BookKeeperTools(String zkServers) throws IOException, InterruptedException, KeeperException {
+        // Create the ZooKeeper client instance
+        zk = new ZooKeeper(zkServers, 10000, new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Process: " + event.getType() + " " + event.getPath());
+                }
+            }
+        });
+        // Create the BookKeeper client instance
+        bkc = new BookKeeper(zk);
+    }
+
+    /**
+     * Shutdown method to gracefully release resources that this class uses.
+     * 
+     * @throws InterruptedException
+     *             if there is an error shutting down the clients that this
+     *             class uses.
+     */
+    public void shutdown() throws InterruptedException {
+        bkc.halt();
+        zk.close();
+    }
+
+    /**
+     * This is a multi callback object for bookie recovery that waits for all of
+     * the multiple async operations to complete. If any fail, then we invoke
+     * the final callback with a BK LedgerRecoveryException.
+     */
+    class MultiCallback implements AsyncCallback.VoidCallback {
+        // Number of expected callbacks
+        final int expected;
+        // Final callback and the corresponding context to invoke
+        final AsyncCallback.VoidCallback cb;
+        final Object context;
+        // This keeps track of how many operations have completed
+        final AtomicInteger done = new AtomicInteger();
+        // List of the exceptions from operations that completed unsuccessfully
+        final LinkedBlockingQueue<Integer> exceptions = new LinkedBlockingQueue<Integer>();
+
+        MultiCallback(int expected, AsyncCallback.VoidCallback cb, Object context) {
+            this.expected = expected;
+            this.cb = cb;
+            this.context = context;
+            if (expected == 0) {
+                cb.processResult(Code.OK.intValue(), null, context);
+            }
+        }
+
+        private void tick() {
+            if (done.incrementAndGet() == expected) {
+                if (exceptions.isEmpty()) {
+                    cb.processResult(Code.OK.intValue(), null, context);
+                } else {
+                    cb.processResult(BKException.Code.LedgerRecoveryException, null, context);
+                }
+            }
+        }
+
+        @Override
+        public void processResult(int rc, String path, Object ctx) {
+            if (rc != Code.OK.intValue()) {
+                LOG.error("BK error recovering ledger data", BKException.create(rc));
+                exceptions.add(rc);
+            }
+            tick();
+        }
+
+    }
+
+    /**
+     * Method to get the input ledger's digest type. For now, this is just a
+     * placeholder function since there is no way we can get this information
+     * easily. In the future, BookKeeper should store this ledger metadata
+     * somewhere such that an admin tool can access it.
+     * 
+     * @param ledgerId
+     *            LedgerId we are retrieving the digestType for.
+     * @return DigestType for the input ledger
+     */
+    private DigestType getLedgerDigestType(long ledgerId) {
+        return DIGEST_TYPE;
+    }
+
+    /**
+     * Method to get the input ledger's password. For now, this is just a
+     * placeholder function since there is no way we can get this information
+     * easily. In the future, BookKeeper should store this ledger metadata
+     * somewhere such that an admin tool can access it.
+     * 
+     * @param ledgerId
+     *            LedgerId we are retrieving the password for.
+     * @return Password for the input ledger
+     */
+    private byte[] getLedgerPasswd(long ledgerId) {
+        return PASSWD;
+    }
+
+    // Object used for calling async methods and waiting for them to complete.
+    class SyncObject {
+        boolean value;
+
+        public SyncObject() {
+            value = false;
+        }
+    }
+
+    /**
+     * Synchronous method to rebuild and recover the ledger fragments data that
+     * was stored on the source bookie. That bookie could have failed completely
+     * and now the ledger data that was stored on it is under replicated. An
+     * optional destination bookie server could be given if we want to copy all
+     * of the ledger fragments data on the failed source bookie to it.
+     * Otherwise, we will just randomly distribute the ledger fragments to the
+     * active set of bookies, perhaps based on load. All ZooKeeper ledger
+     * metadata will be updated to point to the new bookie(s) that contain the
+     * replicated ledger fragments.
+     * 
+     * @param bookieSrc
+     *            Source bookie that had a failure. We want to replicate the
+     *            ledger fragments that were stored there.
+     * @param bookieDest
+     *            Optional destination bookie that if passed, we will copy all
+     *            of the ledger fragments from the source bookie over to it.
+     */
+    public void recoverBookieData(final InetSocketAddress bookieSrc, final InetSocketAddress bookieDest)
+            throws InterruptedException {
+        SyncObject sync = new SyncObject();
+        // Call the async method to recover bookie data.
+        asyncRecoverBookieData(bookieSrc, bookieDest, new RecoverCallback() {
+            @Override
+            public void recoverComplete(int rc, Object ctx) {
+                LOG.info("Recover bookie operation completed with rc: " + rc);
+                SyncObject syncObj = (SyncObject) ctx;
+                synchronized (syncObj) {
+                    syncObj.value = true;
+                    syncObj.notify();
+                }
+            }
+        }, sync);
+
+        // Wait for the async method to complete.
+        synchronized (sync) {
+            while (sync.value == false) {
+                sync.wait();
+            }
+        }
+    }
+
+    /**
+     * Async method to rebuild and recover the ledger fragments data that was
+     * stored on the source bookie. That bookie could have failed completely and
+     * now the ledger data that was stored on it is under replicated. An
+     * optional destination bookie server could be given if we want to copy all
+     * of the ledger fragments data on the failed source bookie to it.
+     * Otherwise, we will just randomly distribute the ledger fragments to the
+     * active set of bookies, perhaps based on load. All ZooKeeper ledger
+     * metadata will be updated to point to the new bookie(s) that contain the
+     * replicated ledger fragments.
+     * 
+     * @param bookieSrc
+     *            Source bookie that had a failure. We want to replicate the
+     *            ledger fragments that were stored there.
+     * @param bookieDest
+     *            Optional destination bookie that if passed, we will copy all
+     *            of the ledger fragments from the source bookie over to it.
+     * @param cb
+     *            RecoverCallback to invoke once all of the data on the dead
+     *            bookie has been recovered and replicated.
+     * @param context
+     *            Context for the RecoverCallback to call.
+     */
+    public void asyncRecoverBookieData(final InetSocketAddress bookieSrc, final InetSocketAddress bookieDest,
+            final RecoverCallback cb, final Object context) {
+        // Sync ZK to make sure we're reading the latest bookie/ledger data.
+        zk.sync(LEDGERS_PATH, new AsyncCallback.VoidCallback() {
+            @Override
+            public void processResult(int rc, String path, Object ctx) {
+                if (rc != Code.OK.intValue()) {
+                    LOG.error("ZK error syncing: ", KeeperException.create(KeeperException.Code.get(rc), path));
+                    cb.recoverComplete(BKException.Code.ZKException, context);
+                    return;
+                }
+                getAvailableBookies(bookieSrc, bookieDest, cb, context);
+            };
+        }, null);
+    }
+
+    /**
+     * This method asynchronously gets the set of available Bookies that the
+     * dead input bookie's data will be copied over into. If the user passed in
+     * a specific destination bookie, then just use that one. Otherwise, we'll
+     * randomly pick one of the other available bookies to use for each ledger
+     * fragment we are replicating.
+     * 
+     * @param bookieSrc
+     *            Source bookie that had a failure. We want to replicate the
+     *            ledger fragments that were stored there.
+     * @param bookieDest
+     *            Optional destination bookie that if passed, we will copy all
+     *            of the ledger fragments from the source bookie over to it.
+     * @param cb
+     *            RecoverCallback to invoke once all of the data on the dead
+     *            bookie has been recovered and replicated.
+     * @param context
+     *            Context for the RecoverCallback to call.
+     */
+    private void getAvailableBookies(final InetSocketAddress bookieSrc, final InetSocketAddress bookieDest,
+            final RecoverCallback cb, final Object context) {
+        final List<InetSocketAddress> availableBookies = new LinkedList<InetSocketAddress>();
+        if (bookieDest != null) {
+            availableBookies.add(bookieDest);
+            // Now poll ZK to get the active ledgers
+            getActiveLedgers(bookieSrc, bookieDest, cb, context, availableBookies);
+        } else {
+            zk.getChildren(BOOKIES_PATH, null, new AsyncCallback.ChildrenCallback() {
+                @Override
+                public void processResult(int rc, String path, Object ctx, List<String> children) {
+                    if (rc != Code.OK.intValue()) {
+                        LOG.error("ZK error getting bookie nodes: ", KeeperException.create(KeeperException.Code
+                                .get(rc), path));
+                        cb.recoverComplete(BKException.Code.ZKException, context);
+                        return;
+                    }
+                    for (String bookieNode : children) {
+                        String parts[] = bookieNode.split(COLON);
+                        if (parts.length < 2) {
+                            LOG.error("Bookie Node retrieved from ZK has invalid name format: " + bookieNode);
+                            cb.recoverComplete(BKException.Code.ZKException, context);
+                            return;
+                        }
+                        availableBookies.add(new InetSocketAddress(parts[0], Integer.parseInt(parts[1])));
+                    }
+                    // Now poll ZK to get the active ledgers
+                    getActiveLedgers(bookieSrc, bookieDest, cb, context, availableBookies);
+                }
+            }, null);
+        }
+    }
+
+    /**
+     * This method asynchronously polls ZK to get the current set of active
+     * ledgers. From this, we can open each ledger and look at the metadata to
+     * determine if any of the ledger fragments for it were stored at the dead
+     * input bookie.
+     * 
+     * @param bookieSrc
+     *            Source bookie that had a failure. We want to replicate the
+     *            ledger fragments that were stored there.
+     * @param bookieDest
+     *            Optional destination bookie that if passed, we will copy all
+     *            of the ledger fragments from the source bookie over to it.
+     * @param cb
+     *            RecoverCallback to invoke once all of the data on the dead
+     *            bookie has been recovered and replicated.
+     * @param context
+     *            Context for the RecoverCallback to call.
+     * @param availableBookies
+     *            List of Bookie Servers that are available to use for
+     *            replicating data on the failed bookie. This could contain a
+     *            single bookie server if the user explicitly chose a bookie
+     *            server to replicate data to.
+     */
+    private void getActiveLedgers(final InetSocketAddress bookieSrc, final InetSocketAddress bookieDest,
+            final RecoverCallback cb, final Object context, final List<InetSocketAddress> availableBookies) {
+        zk.getChildren(LEDGERS_PATH, null, new AsyncCallback.ChildrenCallback() {
+            @Override
+            public void processResult(int rc, String path, Object ctx, List<String> children) {
+                if (rc != Code.OK.intValue()) {
+                    LOG.error("ZK error getting ledger nodes: ", KeeperException.create(KeeperException.Code.get(rc),
+                            path));
+                    cb.recoverComplete(BKException.Code.ZKException, context);
+                    return;
+                }
+                // Wrapper class around the RecoverCallback so it can be used
+                // as the final VoidCallback to invoke within the MultiCallback.
+                class RecoverCallbackWrapper implements AsyncCallback.VoidCallback {
+                    final RecoverCallback cb;
+
+                    RecoverCallbackWrapper(RecoverCallback cb) {
+                        this.cb = cb;
+                    }
+
+                    @Override
+                    public void processResult(int rc, String path, Object ctx) {
+                        cb.recoverComplete(rc, ctx);
+                    }
+                }
+                // Recover each of the ledgers asynchronously
+                MultiCallback ledgerMcb = new MultiCallback(children.size(), new RecoverCallbackWrapper(cb), context);
+                for (final String ledgerNode : children) {
+                    recoverLedger(bookieSrc, ledgerNode, ledgerMcb, availableBookies);
+                }
+            }
+        }, null);
+    }
+
+    /**
+     * This method asynchronously recovers a given ledger if any of the ledger
+     * entries were stored on the failed bookie.
+     * 
+     * @param bookieSrc
+     *            Source bookie that had a failure. We want to replicate the
+     *            ledger fragments that were stored there.
+     * @param ledgerNode
+     *            Ledger Node name as retrieved from ZooKeeper we want to
+     *            recover.
+     * @param ledgerMcb
+     *            MultiCallback to invoke once we've recovered the current
+     *            ledger.
+     * @param availableBookies
+     *            List of Bookie Servers that are available to use for
+     *            replicating data on the failed bookie. This could contain a
+     *            single bookie server if the user explicitly chose a bookie
+     *            server to replicate data to.
+     */
+    private void recoverLedger(final InetSocketAddress bookieSrc, final String ledgerNode,
+            final MultiCallback ledgerMcb, final List<InetSocketAddress> availableBookies) {
+        /*
+         * The available node is also stored in this path so ignore that. That
+         * node is the path for the set of available Bookie Servers.
+         */
+        if (ledgerNode.equals(AVAILABLE_NODE)) {
+            ledgerMcb.processResult(BKException.Code.OK, null, null);
+            return;
+        }
+        // Parse out the ledgerId from the ZK ledger node.
+        String parts[] = ledgerNode.split(LEDGER_NODE_PREFIX);
+        if (parts.length < 2) {
+            LOG.error("Ledger Node retrieved from ZK has invalid name format: " + ledgerNode);
+            ledgerMcb.processResult(BKException.Code.ZKException, null, null);
+            return;
+        }
+        final long lId;
+        try {
+            lId = Long.parseLong(parts[parts.length - 1]);
+        } catch (NumberFormatException e) {
+            LOG.error("Error retrieving ledgerId from ledgerNode: " + ledgerNode, e);
+            ledgerMcb.processResult(BKException.Code.ZKException, null, null);
+            return;
+        }
+        /*
+         * For the current ledger, open it to retrieve the LedgerHandle. This
+         * will contain the LedgerMetadata indicating which bookie servers the
+         * ledger fragments are stored on. Check if any of the ledger fragments
+         * for the current ledger are stored on the input dead bookie.
+         */
+        DigestType digestType = getLedgerDigestType(lId);
+        byte[] passwd = getLedgerPasswd(lId);
+        bkc.asyncOpenLedger(lId, digestType, passwd, new OpenCallback() {
+            @Override
+            public void openComplete(int rc, final LedgerHandle lh, Object ctx) {
+                if (rc != Code.OK.intValue()) {
+                    LOG.error("BK error opening ledger: " + lId, BKException.create(rc));
+                    ledgerMcb.processResult(rc, null, null);
+                    return;
+                }
+                /*
+                 * This List stores the ledger fragments to recover indexed by
+                 * the start entry ID for the range. The ensembles TreeMap is
+                 * keyed off this.
+                 */
+                final List<Long> ledgerFragmentsToRecover = new LinkedList<Long>();
+                /*
+                 * This Map will store the start and end entry ID values for
+                 * each of the ledger fragment ranges. The only exception is the
+                 * current active fragment since it has no end yet. In the event
+                 * of a bookie failure, a new ensemble is created so the current
+                 * ensemble should not contain the dead bookie we are trying to
+                 * recover.
+                 */
+                Map<Long, Long> ledgerFragmentsRange = new HashMap<Long, Long>();
+                Long curEntryId = null;
+                for (Map.Entry<Long, ArrayList<InetSocketAddress>> entry : lh.getLedgerMetadata().getEnsembles()
+                        .entrySet()) {
+                    if (curEntryId != null)
+                        ledgerFragmentsRange.put(curEntryId, entry.getKey() - 1);
+                    curEntryId = entry.getKey();
+                    if (entry.getValue().contains(bookieSrc)) {
+                        /*
+                         * Current ledger fragment has entries stored on the
+                         * dead bookie so we'll need to recover them.
+                         */
+                        ledgerFragmentsToRecover.add(entry.getKey());
+                    }
+                }
+                /*
+                 * See if this current ledger contains any ledger fragment that
+                 * needs to be re-replicated. If not, then just invoke the
+                 * multiCallback and return.
+                 */
+                if (ledgerFragmentsToRecover.size() == 0) {
+                    ledgerMcb.processResult(BKException.Code.OK, null, null);
+                    return;
+                }
+                /*
+                 * We have ledger fragments that need to be re-replicated to a
+                 * new bookie. Choose one randomly from the available set of
+                 * bookies.
+                 */
+                final InetSocketAddress newBookie = availableBookies.get(rand.nextInt(availableBookies.size()));
+
+                /*
+                 * Wrapper class around the ledger MultiCallback. Once all
+                 * ledger fragments for the ledger have been replicated to a new
+                 * bookie, we need to update ZK with this new metadata to point
+                 * to the new bookie instead of the old dead one. That should be
+                 * done at the end prior to invoking the ledger MultiCallback.
+                 */
+                class LedgerMultiCallbackWrapper implements AsyncCallback.VoidCallback {
+                    final MultiCallback ledgerMcb;
+
+                    LedgerMultiCallbackWrapper(MultiCallback ledgerMcb) {
+                        this.ledgerMcb = ledgerMcb;
+                    }
+
+                    @Override
+                    public void processResult(int rc, String path, Object ctx) {
+                        if (rc != Code.OK.intValue()) {
+                            LOG.error("BK error replicating ledger fragments for ledger: " + lId, BKException
+                                    .create(rc));
+                            ledgerMcb.processResult(rc, null, null);
+                            return;
+                        }
+                        /*
+                         * Update the ledger metadata's ensemble info to point
+                         * to the new bookie.
+                         */
+                        for (final Long startEntryId : ledgerFragmentsToRecover) {
+                            ArrayList<InetSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().get(
+                                    startEntryId);
+                            int deadBookieIndex = ensemble.indexOf(bookieSrc);
+                            ensemble.remove(deadBookieIndex);
+                            ensemble.add(deadBookieIndex, newBookie);
+                        }
+                        lh.writeLedgerConfig(new AsyncCallback.StatCallback() {
+                            @Override
+                            public void processResult(int rc, String path, Object ctx, Stat stat) {
+                                if (rc != Code.OK.intValue()) {
+                                    LOG.error("ZK error updating ledger config metadata for ledgerId: " + lh.getId(),
+                                            KeeperException.create(KeeperException.Code.get(rc), path));
+                                } else {
+                                    LOG.info("Updated ZK for ledgerId: (" + lh.getId()
+                                            + ") to point ledger fragments from old dead bookie: (" + bookieSrc
+                                            + ") to new bookie: (" + newBookie + ")");
+                                }
+                                /*
+                                 * Pass the return code result up the chain with
+                                 * the parent callback.
+                                 */
+                                ledgerMcb.processResult(rc, null, null);
+                            }
+                        }, null);
+                    }
+                }
+
+                /*
+                 * Now recover all of the necessary ledger fragments
+                 * asynchronously using a MultiCallback for every fragment.
+                 */
+                MultiCallback ledgerFragmentMcb = new MultiCallback(ledgerFragmentsToRecover.size(),
+                        new LedgerMultiCallbackWrapper(ledgerMcb), null);
+                for (final Long startEntryId : ledgerFragmentsToRecover) {
+                    Long endEntryId = ledgerFragmentsRange.get(startEntryId);
+                    try {
+                        recoverLedgerFragment(bookieSrc, lh, startEntryId, endEntryId, ledgerFragmentMcb, newBookie);
+                    } catch(InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        return;
+                    }
+                }
+            }
+        }, null);
+    }
+
+    /**
+     * This method asynchronously recovers a ledger fragment which is a
+     * contiguous portion of a ledger that was stored in an ensemble that
+     * included the failed bookie.
+     * 
+     * @param bookieSrc
+     *            Source bookie that had a failure. We want to replicate the
+     *            ledger fragments that were stored there.
+     * @param lh
+     *            LedgerHandle for the ledger
+     * @param startEntryId
+     *            Start entry Id for the ledger fragment
+     * @param endEntryId
+     *            End entry Id for the ledger fragment
+     * @param ledgerFragmentMcb
+     *            MultiCallback to invoke once we've recovered the current
+     *            ledger fragment.
+     * @param newBookie
+     *            New bookie we want to use to recover and replicate the ledger
+     *            entries that were stored on the failed bookie.
+     */
+    private void recoverLedgerFragment(final InetSocketAddress bookieSrc, final LedgerHandle lh,
+            final Long startEntryId, final Long endEntryId, final MultiCallback ledgerFragmentMcb,
+            final InetSocketAddress newBookie) throws InterruptedException {
+        if (endEntryId == null) {
+            /*
+             * Ideally this should never happen if bookie failure is taken care
+             * of properly. Nothing we can do though in this case.
+             */
+            LOG.warn("Dead bookie (" + bookieSrc + ") is still part of the current active ensemble for ledgerId: "
+                    + lh.getId());
+            ledgerFragmentMcb.processResult(BKException.Code.OK, null, null);
+            return;
+        }
+
+        ArrayList<InetSocketAddress> curEnsemble = lh.getLedgerMetadata().getEnsembles().get(startEntryId);
+        int bookieIndex = 0;
+        for (int i = 0; i < curEnsemble.size(); i++) {
+            if (curEnsemble.get(i).equals(bookieSrc)) {
+                bookieIndex = i;
+                break;
+            }
+        }
+        /*
+         * Loop through all entries in the current ledger fragment range and
+         * find the ones that were stored on the dead bookie.
+         */
+        List<Long> entriesToReplicate = new LinkedList<Long>();
+        for (long i = startEntryId; i <= endEntryId; i++) {
+            if (lh.getDistributionSchedule().getReplicaIndex(i, bookieIndex) >= 0) {
+                /*
+                 * Current entry is stored on the dead bookie so we'll need to
+                 * read it and replicate it to a new bookie.
+                 */
+                entriesToReplicate.add(i);
+            }
+        }
+        /*
+         * Now asynchronously replicate all of the entries for the ledger
+         * fragment that were on the dead bookie.
+         */
+        MultiCallback ledgerFragmentEntryMcb = new MultiCallback(entriesToReplicate.size(), ledgerFragmentMcb, null);
+        for (final Long entryId : entriesToReplicate) {
+            recoverLedgerFragmentEntry(entryId, lh, ledgerFragmentEntryMcb, newBookie);
+        }
+    }
+
+    /**
+     * This method asynchronously recovers a specific ledger entry by reading
+     * the values via the BookKeeper Client (which would read it from the other
+     * replicas) and then writing it to the chosen new bookie.
+     * 
+     * @param entryId
+     *            Ledger Entry ID to recover.
+     * @param lh
+     *            LedgerHandle for the ledger
+     * @param ledgerFragmentEntryMcb
+     *            MultiCallback to invoke once we've recovered the current
+     *            ledger entry.
+     * @param newBookie
+     *            New bookie we want to use to recover and replicate the ledger
+     *            entries that were stored on the failed bookie.
+     */
+    private void recoverLedgerFragmentEntry(final Long entryId, final LedgerHandle lh,
+            final MultiCallback ledgerFragmentEntryMcb, final InetSocketAddress newBookie) throws InterruptedException {
+        /*
+         * Read the ledger entry using the LedgerHandle. This will allow us to
+         * read the entry from one of the other replicated bookies other than
+         * the dead one.
+         */
+        lh.asyncReadEntries(entryId, entryId, new ReadCallback() {
+            @Override
+            public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
+                if (rc != Code.OK.intValue()) {
+                    LOG.error("BK error reading ledger entry: " + entryId, BKException.create(rc));
+                    ledgerFragmentEntryMcb.processResult(rc, null, null);
+                    return;
+                }
+                /*
+                 * Now that we've read the ledger entry, write it to the new
+                 * bookie we've selected.
+                 */
+                ChannelBuffer toSend = lh.getDigestManager().computeDigestAndPackageForSending(entryId,
+                        lh.getLastAddConfirmed(), seq.nextElement().getEntry());
+                bkc.getBookieClient().addEntry(newBookie, lh.getId(), lh.getLedgerKey(), entryId, toSend,
+                        new WriteCallback() {
+                            @Override
+                            public void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr,
+                                    Object ctx) {
+                                if (rc != Code.OK.intValue()) {
+                                    LOG.error("BK error writing entry for ledgerId: " + ledgerId + ", entryId: "
+                                            + entryId + ", bookie: " + addr, BKException.create(rc));
+                                } else {
+                                    LOG.debug("Success writing ledger entry to a new bookie!");
+                                }
+                                /*
+                                 * Pass the return code result up the chain with
+                                 * the parent callback.
+                                 */
+                                ledgerFragmentEntryMcb.processResult(rc, null, null);
+                            }
+                        }, null);
+            }
+        }, null);
+    }
+
+    /**
+     * Main method so we can invoke the bookie recovery via command line.
+     * 
+     * @param args
+     *            Arguments to BookKeeperTools. 2 are required and the third is
+     *            optional. The first is a comma separated list of ZK server
+     *            host:port pairs. The second is the host:port socket address
+     *            for the bookie we are trying to recover. The third is the
+     *            host:port socket address of the optional destination bookie
+     *            server we want to replicate the data over to.
+     * @throws InterruptedException
+     * @throws IOException
+     * @throws KeeperException
+     */
+    public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
+        // Validate the inputs
+        if (args.length < 2) {
+            System.err.println("USAGE: BookKeeperTools zkServers bookieSrc [bookieDest]");
+            return;
+        }
+        // Parse out the input arguments
+        String zkServers = args[0];
+        String bookieSrcString[] = args[1].split(COLON);
+        if (bookieSrcString.length < 2) {
+            System.err.println("BookieSrc inputted has invalid name format (host:port expected): " + bookieSrcString);
+            return;
+        }
+        final InetSocketAddress bookieSrc = new InetSocketAddress(bookieSrcString[0], Integer
+                .parseInt(bookieSrcString[1]));
+        InetSocketAddress bookieDest = null;
+        if (args.length < 3) {
+            String bookieDestString[] = args[2].split(COLON);
+            if (bookieDestString.length < 2) {
+                System.err.println("BookieDest inputted has invalid name format (host:port expected): "
+                        + bookieDestString);
+                return;
+            }
+            bookieDest = new InetSocketAddress(bookieDestString[0], Integer.parseInt(bookieDestString[1]));
+        }
+
+        // Create the BookKeeperTools instance and perform the bookie recovery
+        // synchronously.
+        BookKeeperTools bkTools = new BookKeeperTools(zkServers);
+        bkTools.recoverBookieData(bookieSrc, bookieDest);
+
+        // Shutdown the resources used in the BookKeeperTools instance.
+        bkTools.shutdown();
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieRecoveryTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieRecoveryTest.java?rev=962697&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieRecoveryTest.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieRecoveryTest.java Fri Jul  9 21:17:57 2010
@@ -0,0 +1,396 @@
+package org.apache.bookkeeper.test;
+
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.tools.BookKeeperTools;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * This class tests the bookie recovery admin functionality.
+ */
+public class BookieRecoveryTest extends BaseTestCase {
+    static Logger LOG = Logger.getLogger(BookieRecoveryTest.class);
+
+    // Object used for synchronizing async method calls
+    class SyncObject {
+        boolean value;
+
+        public SyncObject() {
+            value = false;
+        }
+    }
+
+    // Object used for implementing the Bookie RecoverCallback for this jUnit
+    // test. This verifies that the operation completed successfully.
+    class BookieRecoverCallback implements RecoverCallback {
+        @Override
+        public void recoverComplete(int rc, Object ctx) {
+            LOG.info("Recovered bookie operation completed with rc: " + rc);
+            assertTrue(rc == Code.OK.intValue());
+            SyncObject sync = (SyncObject) ctx;
+            synchronized (sync) {
+                sync.value = true;
+                sync.notify();
+            }
+        }
+    }
+
+    // Objects to use for this jUnit test.
+    DigestType digestType;
+    SyncObject sync;
+    BookieRecoverCallback bookieRecoverCb;
+    BookKeeperTools bkTools;
+
+    // Constructor
+    public BookieRecoveryTest(DigestType digestType) {
+        super(3);
+        this.digestType = digestType;
+    }
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        // Set up the configuration properties needed.
+        System.setProperty("digestType", digestType.toString());
+        System.setProperty("passwd", "");
+        sync = new SyncObject();
+        bookieRecoverCb = new BookieRecoverCallback();
+        bkTools = new BookKeeperTools(HOSTPORT);
+    }
+
+    @After
+    @Override
+    public void tearDown() throws Exception {
+        // Release any resources used by the BookKeeperTools instance.
+        bkTools.shutdown();
+        super.tearDown();
+    }
+
+    /**
+     * Helper method to create a number of ledgers
+     * 
+     * @param numLedgers
+     *            Number of ledgers to create
+     * @return List of LedgerHandles for each of the ledgers created
+     * @throws BKException
+     * @throws KeeperException
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    private List<LedgerHandle> createLedgers(int numLedgers) throws BKException, KeeperException, IOException,
+            InterruptedException {
+        List<LedgerHandle> lhs = new ArrayList<LedgerHandle>();
+        for (int i = 0; i < numLedgers; i++) {
+            lhs.add(bkc.createLedger(digestType, System.getProperty("passwd").getBytes()));
+        }
+        return lhs;
+    }
+
+    /**
+     * Helper method to write dummy ledger entries to all of the ledgers passed.
+     * 
+     * @param numEntries
+     *            Number of ledger entries to write for each ledger
+     * @param startEntryId
+     *            The first entry Id we're expecting to write for each ledger
+     * @param lhs
+     *            List of LedgerHandles for all ledgers to write entries to
+     * @throws BKException
+     * @throws InterruptedException
+     */
+    private void writeEntriestoLedgers(int numEntries, long startEntryId, List<LedgerHandle> lhs) throws BKException,
+            InterruptedException {
+        for (LedgerHandle lh : lhs) {
+            for (int i = 0; i < numEntries; i++) {
+                lh.addEntry(("LedgerId: " + lh.getId() + ", EntryId: " + (startEntryId + i)).getBytes());
+            }
+        }
+    }
+
+    /**
+     * Helper method to startup a new bookie server with the indicated port
+     * number
+     * 
+     * @param port
+     *            Port to start the new bookie server on
+     * @throws IOException
+     */
+    private void startNewBookie(int port) throws IOException {
+        File f = File.createTempFile("bookie", "test");
+        tmpDirs.add(f);
+        f.delete();
+        f.mkdir();
+        BookieServer server = new BookieServer(port, HOSTPORT, f, new File[] { f });
+        server.start();
+        bs.add(server);
+        LOG.info("New bookie on port " + port + " has been created.");
+    }
+
+    /**
+     * Helper method to verify that we can read the recovered ledger entries.
+     * 
+     * @param numLedgers
+     *            Number of ledgers to verify
+     * @param startEntryId
+     *            Start Entry Id to read
+     * @param endEntryId
+     *            End Entry Id to read
+     * @throws BKException
+     * @throws InterruptedException
+     */
+    private void verifyRecoveredLedgers(int numLedgers, long startEntryId, long endEntryId) throws BKException,
+            InterruptedException {
+        // Get a set of LedgerHandles for all of the ledgers to verify
+        List<LedgerHandle> lhs = new ArrayList<LedgerHandle>();
+        for (int i = 0; i < numLedgers; i++) {
+            lhs.add(bkc.openLedger(i + 1, digestType, System.getProperty("passwd").getBytes()));
+        }
+        // Read the ledger entries to verify that they are all present and
+        // correct in the new bookie.
+        for (LedgerHandle lh : lhs) {
+            Enumeration<LedgerEntry> entries = lh.readEntries(startEntryId, endEntryId);
+            while (entries.hasMoreElements()) {
+                LedgerEntry entry = entries.nextElement();
+                assertTrue(new String(entry.getEntry()).equals("LedgerId: " + entry.getLedgerId() + ", EntryId: "
+                        + entry.getEntryId()));
+            }
+        }
+
+    }
+
+    /**
+     * This tests the asynchronous bookie recovery functionality by writing
+     * entries into 3 bookies, killing one bookie, starting up a new one to
+     * replace it, and then recovering the ledger entries from the killed bookie
+     * onto the new one. We'll verify that the entries stored on the killed
+     * bookie are properly copied over and restored onto the new one.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testAsyncBookieRecoveryToSpecificBookie() throws Exception {
+        // Create the ledgers
+        int numLedgers = 3;
+        List<LedgerHandle> lhs = createLedgers(numLedgers);
+
+        // Write the entries for the ledgers with dummy values.
+        int numMsgs = 10;
+        writeEntriestoLedgers(numMsgs, 0, lhs);
+
+        // Shutdown the first bookie server
+        LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
+        bs.get(0).shutdown();
+        bs.remove(0);
+
+        // Startup a new bookie server
+        int newBookiePort = initialPort + numBookies;
+        startNewBookie(newBookiePort);
+
+        // Write some more entries for the ledgers so a new ensemble will be
+        // created for them.
+        writeEntriestoLedgers(numMsgs, 10, lhs);
+
+        // Call the async recover bookie method.
+        InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort);
+        InetSocketAddress bookieDest = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), newBookiePort);
+        LOG.info("Now recover the data on the killed bookie (" + bookieSrc + ") and replicate it to the new one ("
+                + bookieDest + ")");
+        // Initiate the sync object
+        sync.value = false;
+        bkTools.asyncRecoverBookieData(bookieSrc, bookieDest, bookieRecoverCb, sync);
+
+        // Wait for the async method to complete.
+        synchronized (sync) {
+            while (sync.value == false) {
+                sync.wait();
+            }
+        }
+
+        // Verify the recovered ledger entries are okay.
+        verifyRecoveredLedgers(numLedgers, 0, 2 * numMsgs - 1);
+    }
+
+    /**
+     * This tests the asynchronous bookie recovery functionality by writing
+     * entries into 3 bookies, killing one bookie, starting up a few new
+     * bookies, and then recovering the ledger entries from the killed bookie
+     * onto random available bookie servers. We'll verify that the entries
+     * stored on the killed bookie are properly copied over and restored onto
+     * the other bookies.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testAsyncBookieRecoveryToRandomBookies() throws Exception {
+        // Create the ledgers
+        int numLedgers = 3;
+        List<LedgerHandle> lhs = createLedgers(numLedgers);
+
+        // Write the entries for the ledgers with dummy values.
+        int numMsgs = 10;
+        writeEntriestoLedgers(numMsgs, 0, lhs);
+
+        // Shutdown the first bookie server
+        LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
+        bs.get(0).shutdown();
+        bs.remove(0);
+
+        // Startup three new bookie servers
+        for (int i = 0; i < 3; i++) {
+            int newBookiePort = initialPort + numBookies + i;
+            startNewBookie(newBookiePort);
+        }
+
+        // Write some more entries for the ledgers so a new ensemble will be
+        // created for them.
+        writeEntriestoLedgers(numMsgs, 10, lhs);
+
+        // Call the async recover bookie method.
+        InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort);
+        InetSocketAddress bookieDest = null;
+        LOG.info("Now recover the data on the killed bookie (" + bookieSrc
+                + ") and replicate it to a random available one");
+        // Initiate the sync object
+        sync.value = false;
+        bkTools.asyncRecoverBookieData(bookieSrc, bookieDest, bookieRecoverCb, sync);
+
+        // Wait for the async method to complete.
+        synchronized (sync) {
+            while (sync.value == false) {
+                sync.wait();
+            }
+        }
+
+        // Verify the recovered ledger entries are okay.
+        verifyRecoveredLedgers(numLedgers, 0, 2 * numMsgs - 1);
+    }
+
+    /**
+     * This tests the synchronous bookie recovery functionality by writing
+     * entries into 3 bookies, killing one bookie, starting up a new one to
+     * replace it, and then recovering the ledger entries from the killed bookie
+     * onto the new one. We'll verify that the entries stored on the killed
+     * bookie are properly copied over and restored onto the new one.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testSyncBookieRecoveryToSpecificBookie() throws Exception {
+        // Create the ledgers
+        int numLedgers = 3;
+        List<LedgerHandle> lhs = createLedgers(numLedgers);
+
+        // Write the entries for the ledgers with dummy values.
+        int numMsgs = 10;
+        writeEntriestoLedgers(numMsgs, 0, lhs);
+
+        // Shutdown the first bookie server
+        LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
+        bs.get(0).shutdown();
+        bs.remove(0);
+
+        // Startup a new bookie server
+        int newBookiePort = initialPort + numBookies;
+        startNewBookie(newBookiePort);
+
+        // Write some more entries for the ledgers so a new ensemble will be
+        // created for them.
+        writeEntriestoLedgers(numMsgs, 10, lhs);
+
+        // Call the sync recover bookie method.
+        InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort);
+        InetSocketAddress bookieDest = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), newBookiePort);
+        LOG.info("Now recover the data on the killed bookie (" + bookieSrc + ") and replicate it to the new one ("
+                + bookieDest + ")");
+        bkTools.recoverBookieData(bookieSrc, bookieDest);
+
+        // Verify the recovered ledger entries are okay.
+        verifyRecoveredLedgers(numLedgers, 0, 2 * numMsgs - 1);
+    }
+
+    /**
+     * This tests the synchronous bookie recovery functionality by writing
+     * entries into 3 bookies, killing one bookie, starting up a few new
+     * bookies, and then recovering the ledger entries from the killed bookie
+     * onto random available bookie servers. We'll verify that the entries
+     * stored on the killed bookie are properly copied over and restored onto
+     * the other bookies.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testSyncBookieRecoveryToRandomBookies() throws Exception {
+        // Create the ledgers
+        int numLedgers = 3;
+        List<LedgerHandle> lhs = createLedgers(numLedgers);
+
+        // Write the entries for the ledgers with dummy values.
+        int numMsgs = 10;
+        writeEntriestoLedgers(numMsgs, 0, lhs);
+
+        // Shutdown the first bookie server
+        LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
+        bs.get(0).shutdown();
+        bs.remove(0);
+
+        // Startup three new bookie servers
+        for (int i = 0; i < 3; i++) {
+            int newBookiePort = initialPort + numBookies + i;
+            startNewBookie(newBookiePort);
+        }
+
+        // Write some more entries for the ledgers so a new ensemble will be
+        // created for them.
+        writeEntriestoLedgers(numMsgs, 10, lhs);
+
+        // Call the sync recover bookie method.
+        InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort);
+        InetSocketAddress bookieDest = null;
+        LOG.info("Now recover the data on the killed bookie (" + bookieSrc
+                + ") and replicate it to a random available one");
+        bkTools.recoverBookieData(bookieSrc, bookieDest);
+
+        // Verify the recovered ledger entries are okay.
+        verifyRecoveredLedgers(numLedgers, 0, 2 * numMsgs - 1);
+    }
+
+}