You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2011/11/15 19:42:58 UTC

svn commit: r1202370 - in /zookeeper/bookkeeper/trunk/bookkeeper-server/src: main/java/org/apache/bookkeeper/client/ main/java/org/apache/bookkeeper/proto/ test/java/org/apache/bookkeeper/client/ test/java/org/apache/bookkeeper/test/

Author: fpj
Date: Tue Nov 15 18:42:58 2011
New Revision: 1202370

URL: http://svn.apache.org/viewvc?rev=1202370&view=rev
Log:
BOOKKEEPER-106: recoveryBookieData can select a recovery bookie which is already in the ledgers ensemble


Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
Removed:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieRecoveryTest.java
Modified:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BaseTestCase.java

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java?rev=1202370&r1=1202369&r2=1202370&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java Tue Nov 15 18:42:58 2011
@@ -207,9 +207,11 @@ public class BookKeeperAdmin {
     // Object used for calling async methods and waiting for them to complete.
     class SyncObject {
+        // Set to true by the RecoverCallback (under this object's monitor) once
+        // the async recovery has completed; the caller waits on this object until then.
         boolean value;
+        // Return code handed to the RecoverCallback. Starts as BKException.Code.OK;
+        // recoverBookieData throws BKException.create(rc) if it is anything else.
+        int rc;
 
         public SyncObject() {
             value = false;
+            rc = BKException.Code.OK;
         }
     }
 
@@ -232,7 +234,7 @@ public class BookKeeperAdmin {
      *            of the ledger fragments from the source bookie over to it.
      */
     public void recoverBookieData(final InetSocketAddress bookieSrc, final InetSocketAddress bookieDest)
-            throws InterruptedException {
+            throws InterruptedException, BKException {
         SyncObject sync = new SyncObject();
         // Call the async method to recover bookie data.
         asyncRecoverBookieData(bookieSrc, bookieDest, new RecoverCallback() {
@@ -241,6 +243,7 @@ public class BookKeeperAdmin {
                 LOG.info("Recover bookie operation completed with rc: " + rc);
                 SyncObject syncObj = (SyncObject) ctx;
                 synchronized (syncObj) {
+                    syncObj.rc = rc;
                     syncObj.value = true;
                     syncObj.notify();
                 }
@@ -253,6 +256,9 @@ public class BookKeeperAdmin {
                 sync.wait();
             }
         }
+        if (sync.rc != BKException.Code.OK) {
+            throw BKException.create(sync.rc);
+        }
     }
 
     /**
@@ -404,6 +410,22 @@ public class BookKeeperAdmin {
     }
 
     /**
+     * Choose a random bookie from {@code availableBookies} that is not already a
+     * member of the given ensemble, so a recovered fragment never ends up listing
+     * the same bookie twice.
+     *
+     * @param bookiesAlreadyInEnsemble the fragment's current ensemble
+     * @param availableBookies candidate bookies to pick from
+     * @return a randomly chosen candidate that is not in the ensemble
+     * @throws BKException.BKNotEnoughBookiesException if no candidate remains once
+     *         the ensemble members have been excluded
+     */
+    private InetSocketAddress getNewBookie(final List<InetSocketAddress> bookiesAlreadyInEnsemble,
+                                           final List<InetSocketAddress> availableBookies)
+            throws BKException.BKNotEnoughBookiesException {
+        List<InetSocketAddress> eligible = new ArrayList<InetSocketAddress>(availableBookies);
+        eligible.removeAll(bookiesAlreadyInEnsemble);
+        if (eligible.isEmpty()) {
+            throw new BKException.BKNotEnoughBookiesException();
+        }
+        int pick = rand.nextInt(eligible.size());
+        return eligible.get(pick);
+    }
+
+    /**
      * This method asynchronously recovers a given ledger if any of the ledger
      * entries were stored on the failed bookie.
      *
@@ -501,79 +523,38 @@ public class BookKeeperAdmin {
                     ledgerMcb.processResult(BKException.Code.OK, null, null);
                     return;
                 }
-                /*
-                 * We have ledger fragments that need to be re-replicated to a
-                 * new bookie. Choose one randomly from the available set of
-                 * bookies.
-                 */
-                final InetSocketAddress newBookie = availableBookies.get(rand.nextInt(availableBookies.size()));
 
                 /*
-                 * Wrapper class around the ledger MultiCallback. Once all
-                 * ledger fragments for the ledger have been replicated to a new
-                 * bookie, we need to update ZK with this new metadata to point
-                 * to the new bookie instead of the old dead one. That should be
-                 * done at the end prior to invoking the ledger MultiCallback.
+                 * Multicallback for ledger. Once all fragments for the ledger have been recovered
+                 * trigger the ledgerMcb 
                  */
-                class LedgerMultiCallbackWrapper implements AsyncCallback.VoidCallback {
-                    final MultiCallback ledgerMcb;
-
-                    LedgerMultiCallbackWrapper(MultiCallback ledgerMcb) {
-                        this.ledgerMcb = ledgerMcb;
-                    }
-
-                    @Override
-                    public void processResult(int rc, String path, Object ctx) {
-                        if (rc != Code.OK.intValue()) {
-                            LOG.error("BK error replicating ledger fragments for ledger: " + lId, BKException
-                                      .create(rc));
-                            ledgerMcb.processResult(rc, null, null);
-                            return;
-                        }
-                        /*
-                         * Update the ledger metadata's ensemble info to point
-                         * to the new bookie.
-                         */
-                        for (final Long startEntryId : ledgerFragmentsToRecover) {
-                            ArrayList<InetSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().get(
-                                    startEntryId);
-                            int deadBookieIndex = ensemble.indexOf(bookieSrc);
-                            ensemble.remove(deadBookieIndex);
-                            ensemble.add(deadBookieIndex, newBookie);
-                        }
-                        
-                        lh.writeLedgerConfig(new AsyncCallback.StatCallback() {
-                            @Override
-                            public void processResult(int rc, String path, Object ctx, Stat stat) {
-                                if (rc != Code.OK.intValue()) {
-                                    LOG.error("ZK error updating ledger config metadata for ledgerId: " + lh.getId(),
-                                              KeeperException.create(KeeperException.Code.get(rc), path));
-                                } else {
-                                    lh.getLedgerMetadata().updateZnodeStatus(stat);
-                                    LOG.info("Updated ZK for ledgerId: (" + lh.getId()
-                                             + ") to point ledger fragments from old dead bookie: (" + bookieSrc
-                                             + ") to new bookie: (" + newBookie + ")");
-                                }
-                                /*
-                                 * Pass the return code result up the chain with
-                                 * the parent callback.
-                                 */
-                                ledgerMcb.processResult(rc, null, null);
-                            }
-                        }, null);
-                    }
-                }
+                MultiCallback ledgerFragmentsMcb = new MultiCallback(ledgerFragmentsToRecover.size(), ledgerMcb, null);
 
                 /*
                  * Now recover all of the necessary ledger fragments
                  * asynchronously using a MultiCallback for every fragment.
                  */
-                MultiCallback ledgerFragmentMcb = new MultiCallback(ledgerFragmentsToRecover.size(),
-                        new LedgerMultiCallbackWrapper(ledgerMcb), null);
                 for (final Long startEntryId : ledgerFragmentsToRecover) {
                     Long endEntryId = ledgerFragmentsRange.get(startEntryId);
+                    InetSocketAddress newBookie = null;
+                    try {
+                        newBookie = getNewBookie(lh.getLedgerMetadata().getEnsembles().get(startEntryId),
+                                                 availableBookies);
+                    } catch (BKException.BKNotEnoughBookiesException bke) {
+                        ledgerFragmentsMcb.processResult(BKException.Code.NotEnoughBookiesException, null, null);
+                        continue;
+                    }
+                    
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Replicating fragment from [" + startEntryId 
+                                  + "," + endEntryId + "] of ledger " + lh.getId()
+                                  + " to " + newBookie);
+                    }
+
                     try {
-                        recoverLedgerFragment(bookieSrc, lh, startEntryId, endEntryId, ledgerFragmentMcb, newBookie);
+                        SingleFragmentCallback cb = new SingleFragmentCallback(ledgerFragmentsMcb, lh, startEntryId, 
+                                                                               bookieSrc, newBookie);
+                        recoverLedgerFragment(bookieSrc, lh, startEntryId, endEntryId, cb, newBookie);
                     } catch(InterruptedException e) {
                         Thread.currentThread().interrupt();
                         return;
@@ -605,7 +586,7 @@ public class BookKeeperAdmin {
      *            entries that were stored on the failed bookie.
      */
     private void recoverLedgerFragment(final InetSocketAddress bookieSrc, final LedgerHandle lh,
-                                       final Long startEntryId, final Long endEntryId, final MultiCallback ledgerFragmentMcb,
+                                       final Long startEntryId, final Long endEntryId, final SingleFragmentCallback cb,
                                        final InetSocketAddress newBookie) throws InterruptedException {
         if (endEntryId == null) {
             /*
@@ -614,7 +595,7 @@ public class BookKeeperAdmin {
              */
             LOG.warn("Dead bookie (" + bookieSrc + ") is still part of the current active ensemble for ledgerId: "
                      + lh.getId());
-            ledgerFragmentMcb.processResult(BKException.Code.OK, null, null);
+            cb.processResult(BKException.Code.OK, null, null);
             return;
         }
 
@@ -644,7 +625,7 @@ public class BookKeeperAdmin {
          * Now asynchronously replicate all of the entries for the ledger
          * fragment that were on the dead bookie.
          */
-        MultiCallback ledgerFragmentEntryMcb = new MultiCallback(entriesToReplicate.size(), ledgerFragmentMcb, null);
+        MultiCallback ledgerFragmentEntryMcb = new MultiCallback(entriesToReplicate.size(), cb, null);
         for (final Long entryId : entriesToReplicate) {
             recoverLedgerFragmentEntry(entryId, lh, ledgerFragmentEntryMcb, newBookie);
         }
@@ -699,7 +680,10 @@ public class BookKeeperAdmin {
                             LOG.error("BK error writing entry for ledgerId: " + ledgerId + ", entryId: "
                                       + entryId + ", bookie: " + addr, BKException.create(rc));
                         } else {
-                            LOG.debug("Success writing ledger entry to a new bookie!");
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("Success writing ledger id " +ledgerId + ", entry id "
+                                          + entryId + " to a new bookie " + addr + "!");
+                            }
                         }
                         /*
                          * Pass the return code result up the chain with
@@ -711,4 +695,79 @@ public class BookKeeperAdmin {
             }
         }, null);
     }
+
+    /**
+     * Callback for recovery of a single ledger fragment.
+     * Once all entries of the fragment have been replicated, the fragment's
+     * ensemble is rewritten (in memory, then in ZooKeeper via writeLedgerConfig)
+     * to point at the new bookie instead of the old one. The outcome is then
+     * propagated up to ledgerFragmentsMcb, which should be a MultiCallback
+     * responsible for all fragments in a single ledger.
+     *
+     * Every rc passed to ledgerFragmentsMcb is a BookKeeper code
+     * (BKException.Code). ZooKeeper failures are reported as
+     * BKException.Code.ZKException, because the upstream chain presumably ends in
+     * BKException.create(rc) (as recoverBookieData does) and raw ZooKeeper codes
+     * overlap unrelated BookKeeper codes.
+     */
+    class SingleFragmentCallback implements AsyncCallback.VoidCallback {
+        final MultiCallback ledgerFragmentsMcb;
+        final LedgerHandle lh;
+        final long fragmentStartId;
+        final InetSocketAddress oldBookie;
+        final InetSocketAddress newBookie;
+
+        SingleFragmentCallback(MultiCallback ledgerFragmentsMcb, LedgerHandle lh,
+                               long fragmentStartId,
+                               InetSocketAddress oldBookie,
+                               InetSocketAddress newBookie) {
+            this.ledgerFragmentsMcb = ledgerFragmentsMcb;
+            this.lh = lh;
+            this.fragmentStartId = fragmentStartId;
+            this.newBookie = newBookie;
+            this.oldBookie = oldBookie;
+        }
+
+        /**
+         * Invoked once every entry of the fragment has been processed.
+         *
+         * @param rc BookKeeper return code of the fragment replication
+         */
+        @Override
+        public void processResult(int rc, String path, Object ctx) {
+            if (rc != BKException.Code.OK) {
+                LOG.error("BK error replicating ledger fragments for ledger: " + lh.getId(),
+                          BKException.create(rc));
+                ledgerFragmentsMcb.processResult(rc, null, null);
+                return;
+            }
+            /*
+             * Update the ledger metadata's ensemble info to point
+             * to the new bookie.
+             */
+            ArrayList<InetSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().get(
+                    fragmentStartId);
+            int deadBookieIndex = ensemble.indexOf(oldBookie);
+            if (deadBookieIndex < 0) {
+                // The old bookie is no longer in this fragment's ensemble, so there is
+                // nothing to rewrite. Still complete the callback: previously
+                // remove(-1) threw here, ledgerFragmentsMcb never fired and the whole
+                // recovery hung.
+                LOG.warn("Bookie " + oldBookie + " is no longer in the ensemble of ledger " + lh.getId()
+                         + " starting at entry " + fragmentStartId + "; skipping metadata update");
+                ledgerFragmentsMcb.processResult(BKException.Code.OK, null, null);
+                return;
+            }
+            ensemble.set(deadBookieIndex, newBookie);
+
+            lh.writeLedgerConfig(new WriteCb(), null);
+        }
+
+        /**
+         * Handles the ZooKeeper result of writing the updated ledger metadata.
+         */
+        private class WriteCb implements AsyncCallback.StatCallback {
+            @Override
+            public void processResult(int rc, String path, Object ctx, Stat stat) {
+                if (rc == Code.BADVERSION.intValue()) {
+                    LOG.warn("Two fragments attempted update at once; ledger id: " + lh.getId()
+                             + " startid: " + fragmentStartId);
+                    // try again, the previous success (with which this has conflicted)
+                    // will have updated the stat
+                    // NOTE(review): this assumes the conflicting write came from another
+                    // fragment of this same recovery. A write by a different client would
+                    // not refresh the cached stat, and this would retry indefinitely.
+                    lh.writeLedgerConfig(new WriteCb(), null);
+                    return;
+                }
+                int bkRc;
+                if (rc == Code.OK.intValue()) {
+                    lh.getLedgerMetadata().updateZnodeStatus(stat);
+                    LOG.info("Updated ZK for ledgerId: (" + lh.getId() + " : " + fragmentStartId
+                             + ") to point ledger fragments from old dead bookie: (" + oldBookie
+                             + ") to new bookie: (" + newBookie + ")");
+                    bkRc = BKException.Code.OK;
+                } else {
+                    LOG.error("ZK error updating ledger config metadata for ledgerId: " + lh.getId(),
+                              KeeperException.create(KeeperException.Code.get(rc), path));
+                    // Translate the ZooKeeper code into a BookKeeper code before it
+                    // travels up the BookKeeper MultiCallback chain.
+                    bkRc = BKException.Code.ZKException;
+                }
+                /*
+                 * Pass the return code result up the chain with
+                 * the parent callback.
+                 */
+                ledgerFragmentsMcb.processResult(bkRc, null, null);
+            }
+        }
+    }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java?rev=1202370&r1=1202369&r2=1202370&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java Tue Nov 15 18:42:58 2011
@@ -23,7 +23,9 @@ package org.apache.bookkeeper.proto;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 
 import org.apache.bookkeeper.bookie.Bookie;
@@ -56,6 +58,14 @@ public class BookieServer implements NIO
         deathWatcher.start();
     }
 
+    /**
+     * Returns this bookie's address as the local host's IP address paired with
+     * the server port. If the local host name cannot be resolved, falls back to
+     * nioServerFactory.getLocalAddress().
+     */
+    public InetSocketAddress getLocalAddress() {
+        String hostAddress;
+        try {
+            hostAddress = InetAddress.getLocalHost().getHostAddress();
+        } catch (UnknownHostException uhe) {
+            return nioServerFactory.getLocalAddress();
+        }
+        return new InetSocketAddress(hostAddress, port);
+    }
+
     public synchronized void shutdown() throws InterruptedException {
         if (!running) {
             return;

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java?rev=1202370&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java Tue Nov 15 18:42:58 2011
@@ -0,0 +1,607 @@
+package org.apache.bookkeeper.client;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Map;
+import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Collections;
+import java.util.Random;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.util.StringUtils;
+import org.apache.bookkeeper.test.BaseTestCase;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * This class tests the bookie recovery admin functionality.
+ */
+public class BookieRecoveryTest extends BaseTestCase {
+    static final Logger LOG = Logger.getLogger(BookieRecoveryTest.class);
+
+    /**
+     * Completion flag shared between a test thread and an async callback.
+     * Guarded by this object's own monitor: the callback sets {@code value} and
+     * calls notify() while holding it, and the test thread waits on it until
+     * {@code value} becomes true.
+     */
+    class SyncObject {
+        boolean value = false;
+
+        public SyncObject() {
+        }
+    }
+
+    /**
+     * RecoverCallback used by the async tests. It asserts that the recovery
+     * succeeded and wakes the test thread waiting on the SyncObject passed as ctx.
+     */
+    class BookieRecoverCallback implements RecoverCallback {
+        @Override
+        public void recoverComplete(int rc, Object ctx) {
+            LOG.info("Recovered bookie operation completed with rc: " + rc);
+            SyncObject sync = (SyncObject) ctx;
+            try {
+                // rc is a BookKeeper return code, so compare it with BKException.Code.
+                assertEquals(BKException.Code.OK, rc);
+            } finally {
+                // Always wake the waiting test thread, even if the assertion above
+                // fails; otherwise the test would block forever in sync.wait().
+                synchronized (sync) {
+                    sync.value = true;
+                    sync.notify();
+                }
+            }
+        }
+    }
+
+    // Per-test state: the digest type under test, the completion flag and
+    // callback used by the async tests, and the admin client doing the recovery.
+    DigestType digestType;
+    SyncObject sync;
+    BookieRecoverCallback bookieRecoverCb;
+    BookKeeperAdmin bkAdmin;
+
+    /**
+     * Creates the test against three bookies (passed to BaseTestCase),
+     * parameterized by digest type.
+     *
+     * @param digestType digest type used for every ledger the test creates
+     */
+    public BookieRecoveryTest(DigestType digestType) {
+        super(3);
+        this.digestType = digestType;
+    }
+
+    /**
+     * Starts the base test fixture, publishes the digest type and an empty
+     * password as system properties, and creates fresh per-test helpers.
+     */
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        // The ledger helpers read the password back from the "passwd" property.
+        System.setProperty("digestType", digestType.toString());
+        System.setProperty("passwd", "");
+        this.sync = new SyncObject();
+        this.bookieRecoverCb = new BookieRecoverCallback();
+        this.bkAdmin = new BookKeeperAdmin(HOSTPORT);
+    }
+
+    /**
+     * Closes the admin client, then delegates to BaseTestCase.tearDown().
+     */
+    @After
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            // Release any resources used by the BookKeeperTools instance.
+            bkAdmin.close();
+        } finally {
+            // Run even if close() fails, so the base fixture is still torn down and
+            // does not leak into later tests.
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Creates {@code numLedgers} ledgers with the default ensemble size (3) and
+     * quorum size (2).
+     *
+     * @param numLedgers
+     *            Number of ledgers to create
+     * @return handles of the created ledgers, in creation order
+     */
+    private List<LedgerHandle> createLedgers(int numLedgers)
+            throws BKException, KeeperException, IOException, InterruptedException {
+        final int defaultEnsembleSize = 3;
+        final int defaultQuorumSize = 2;
+        return createLedgers(numLedgers, defaultEnsembleSize, defaultQuorumSize);
+    }
+
+    /**
+     * Creates {@code numLedgers} ledgers with the given ensemble and quorum
+     * sizes, using the test's digest type and the bytes of the "passwd" system
+     * property as the password.
+     *
+     * @param numLedgers
+     *            Number of ledgers to create
+     * @param ensemble Ensemble size for ledgers
+     * @param quorum Quorum size for ledgers
+     * @return handles of the created ledgers, in creation order
+     */
+    private List<LedgerHandle> createLedgers(int numLedgers, int ensemble, int quorum)
+            throws BKException, KeeperException, IOException, InterruptedException {
+        List<LedgerHandle> created = new ArrayList<LedgerHandle>();
+        int remaining = numLedgers;
+        while (remaining-- > 0) {
+            byte[] passwd = System.getProperty("passwd").getBytes();
+            created.add(bkc.createLedger(ensemble, quorum, digestType, passwd));
+        }
+        return created;
+    }
+
+    /**
+     * Appends {@code numEntries} dummy entries to every ledger in {@code lhs}.
+     * Each entry's payload is "LedgerId: &lt;id&gt;, EntryId: &lt;n&gt;", where n counts
+     * up from {@code startEntryId}; verifyRecoveredLedgers checks this format.
+     *
+     * @param numEntries
+     *            Number of ledger entries to write for each ledger
+     * @param startEntryId
+     *            The first entry Id we're expecting to write for each ledger
+     * @param lhs
+     *            List of LedgerHandles for all ledgers to write entries to
+     * @throws BKException
+     * @throws InterruptedException
+     */
+    private void writeEntriestoLedgers(int numEntries, long startEntryId, List<LedgerHandle> lhs) throws BKException,
+        InterruptedException {
+        for (LedgerHandle handle : lhs) {
+            long exclusiveEnd = startEntryId + numEntries;
+            for (long entryId = startEntryId; entryId < exclusiveEnd; entryId++) {
+                String payload = "LedgerId: " + handle.getId() + ", EntryId: " + entryId;
+                handle.addEntry(payload.getBytes());
+            }
+        }
+    }
+
+    /**
+     * Helper method to start up a new bookie server on the indicated port,
+     * add it to the test's server list, wait for its registration node to
+     * appear in ZooKeeper, and refresh the client's bookie list.
+     *
+     * @param port
+     *            Port to start the new bookie server on
+     * @throws IOException
+     *             if the bookie's temporary directory cannot be created
+     * @throws InterruptedException
+     *             if interrupted while waiting for the bookie to register
+     * @throws KeeperException
+     *             on ZooKeeper errors while polling for the registration
+     */
+    private void startNewBookie(int port)
+            throws IOException, InterruptedException, KeeperException {
+        File f = File.createTempFile("bookie", "test");
+        tmpDirs.add(f);
+        // Replace the temp file with a directory of the same name. Both steps are
+        // checked: ignoring a failure would start the bookie on a missing directory.
+        if (!f.delete() || !f.mkdir()) {
+            throw new IOException("Couldn't create bookie directory " + f);
+        }
+
+        BookieServer server = new BookieServer(port, HOSTPORT, f, new File[] { f });
+        server.start();
+        bs.add(server);
+
+        // Poll until the new bookie's node shows up under /ledgers/available.
+        String availablePath = "/ledgers/available/" + InetAddress.getLocalHost().getHostAddress() + ":" + port;
+        while (bkc.getZkHandle().exists(availablePath, false) == null) {
+            Thread.sleep(500);
+        }
+
+        bkc.readBookiesBlocking();
+        LOG.info("New bookie on port " + port + " has been created.");
+    }
+
+    /**
+     * Helper method to verify that we can read the recovered ledger entries.
+     * Opens ledgers 1..numLedgers (assumes the test's ledgers were assigned ids
+     * starting at 1 — TODO confirm if ledger id allocation changes) and reads the
+     * given entry range from each. It checks both the number of entries returned
+     * and the payload of every entry.
+     *
+     * @param numLedgers
+     *            Number of ledgers to verify
+     * @param startEntryId
+     *            Start Entry Id to read
+     * @param endEntryId
+     *            End Entry Id to read (inclusive)
+     * @throws BKException
+     * @throws InterruptedException
+     */
+    private void verifyRecoveredLedgers(int numLedgers, long startEntryId, long endEntryId) throws BKException,
+        InterruptedException {
+        // Get a set of LedgerHandles for all of the ledgers to verify
+        List<LedgerHandle> lhs = new ArrayList<LedgerHandle>();
+        for (int i = 0; i < numLedgers; i++) {
+            lhs.add(bkc.openLedger(i + 1, digestType, System.getProperty("passwd").getBytes()));
+        }
+        // Read the ledger entries to verify that they are all present and
+        // correct in the new bookie.
+        for (LedgerHandle lh : lhs) {
+            Enumeration<LedgerEntry> entries = lh.readEntries(startEntryId, endEntryId);
+            long numRead = 0;
+            while (entries.hasMoreElements()) {
+                LedgerEntry entry = entries.nextElement();
+                // assertEquals reports the mismatching payload, unlike assertTrue(equals(..)).
+                assertEquals("LedgerId: " + entry.getLedgerId() + ", EntryId: " + entry.getEntryId(),
+                             new String(entry.getEntry()));
+                numRead++;
+            }
+            // A short or empty enumeration must not pass silently.
+            assertEquals("Wrong number of entries read from ledger " + lh.getId(),
+                         endEntryId - startEntryId + 1, numRead);
+        }
+    }
+
+    /**
+     * Async recovery onto a specific bookie. The test writes to 3 bookies, kills
+     * the first one and starts a replacement. It then asynchronously copies the
+     * dead bookie's ledger entries onto the replacement and checks that every
+     * entry still reads back correctly.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testAsyncBookieRecoveryToSpecificBookie() throws Exception {
+        final int numLedgers = 3;
+        final int numMsgs = 10;
+        List<LedgerHandle> ledgers = createLedgers(numLedgers);
+        writeEntriestoLedgers(numMsgs, 0, ledgers);
+
+        LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
+        bs.get(0).shutdown();
+        bs.remove(0);
+
+        int replacementPort = initialPort + numBookies;
+        startNewBookie(replacementPort);
+
+        // A second batch of writes makes the ledgers move onto a new ensemble.
+        writeEntriestoLedgers(numMsgs, 10, ledgers);
+
+        String localHost = InetAddress.getLocalHost().getHostAddress();
+        InetSocketAddress bookieSrc = new InetSocketAddress(localHost, initialPort);
+        InetSocketAddress bookieDest = new InetSocketAddress(localHost, replacementPort);
+        LOG.info("Now recover the data on the killed bookie (" + bookieSrc + ") and replicate it to the new one ("
+                 + bookieDest + ")");
+        sync.value = false;
+        bkAdmin.asyncRecoverBookieData(bookieSrc, bookieDest, bookieRecoverCb, sync);
+
+        // Block until BookieRecoverCallback flips the flag.
+        synchronized (sync) {
+            while (!sync.value) {
+                sync.wait();
+            }
+        }
+
+        verifyRecoveredLedgers(numLedgers, 0, 2 * numMsgs - 1);
+    }
+
+    /**
+     * Async recovery onto random bookies. The test writes to 3 bookies, kills
+     * the first one and starts three new ones. It then asynchronously copies the
+     * dead bookie's entries onto bookies chosen by the admin (no explicit
+     * destination) and checks that every entry still reads back correctly.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testAsyncBookieRecoveryToRandomBookies() throws Exception {
+        final int numLedgers = 3;
+        final int numMsgs = 10;
+        List<LedgerHandle> ledgers = createLedgers(numLedgers);
+        writeEntriestoLedgers(numMsgs, 0, ledgers);
+
+        LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
+        bs.get(0).shutdown();
+        bs.remove(0);
+
+        // Bring up three new bookies on consecutive ports after the original ones.
+        int firstNewPort = initialPort + numBookies;
+        for (int port = firstNewPort; port < firstNewPort + 3; port++) {
+            startNewBookie(port);
+        }
+
+        // A second batch of writes makes the ledgers move onto a new ensemble.
+        writeEntriestoLedgers(numMsgs, 10, ledgers);
+
+        InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort);
+        // No explicit destination: the data goes to a random available bookie.
+        InetSocketAddress bookieDest = null;
+        LOG.info("Now recover the data on the killed bookie (" + bookieSrc
+                 + ") and replicate it to a random available one");
+        sync.value = false;
+        bkAdmin.asyncRecoverBookieData(bookieSrc, bookieDest, bookieRecoverCb, sync);
+
+        // Block until BookieRecoverCallback flips the flag.
+        synchronized (sync) {
+            while (!sync.value) {
+                sync.wait();
+            }
+        }
+
+        verifyRecoveredLedgers(numLedgers, 0, 2 * numMsgs - 1);
+    }
+
+    /**
+     * Synchronous recovery onto a specific bookie. The test writes to 3 bookies,
+     * kills the first one and starts a replacement. It then calls the blocking
+     * recoverBookieData to copy the dead bookie's entries onto the replacement
+     * and checks that every entry still reads back correctly.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testSyncBookieRecoveryToSpecificBookie() throws Exception {
+        final int numLedgers = 3;
+        final int numMsgs = 10;
+        List<LedgerHandle> ledgers = createLedgers(numLedgers);
+        writeEntriestoLedgers(numMsgs, 0, ledgers);
+
+        LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
+        bs.get(0).shutdown();
+        bs.remove(0);
+
+        int replacementPort = initialPort + numBookies;
+        startNewBookie(replacementPort);
+
+        // A second batch of writes makes the ledgers move onto a new ensemble.
+        writeEntriestoLedgers(numMsgs, 10, ledgers);
+
+        String localHost = InetAddress.getLocalHost().getHostAddress();
+        InetSocketAddress bookieSrc = new InetSocketAddress(localHost, initialPort);
+        InetSocketAddress bookieDest = new InetSocketAddress(localHost, replacementPort);
+        LOG.info("Now recover the data on the killed bookie (" + bookieSrc + ") and replicate it to the new one ("
+                 + bookieDest + ")");
+        // Blocks until recovery finishes.
+        bkAdmin.recoverBookieData(bookieSrc, bookieDest);
+
+        verifyRecoveredLedgers(numLedgers, 0, 2 * numMsgs - 1);
+    }
+
+    /**
+     * Verifies synchronous bookie recovery when no destination is given.
+     * After a first batch of entries, the first bookie server is shut down,
+     * three fresh bookies are started on consecutive ports beginning at
+     * {@code initialPort + numBookies}, and a second batch is written. The
+     * killed bookie's data is then recovered with a {@code null} destination,
+     * which (per the log message) replicates it to a random available bookie.
+     * The ledgers are verified entry by entry afterwards.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testSyncBookieRecoveryToRandomBookies() throws Exception {
+        final int numLedgers = 3;
+        final int numMsgs = 10;
+
+        // Create the ledgers and fill them with a first batch of dummy entries.
+        List<LedgerHandle> ledgers = createLedgers(numLedgers);
+        writeEntriestoLedgers(numMsgs, 0, ledgers);
+
+        // Take the first bookie server down and forget about it.
+        LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
+        bs.get(0).shutdown();
+        bs.remove(0);
+
+        // Start three extra bookies on consecutive ports.
+        int firstNewPort = initialPort + numBookies;
+        for (int port = firstNewPort; port < firstNewPort + 3; port++) {
+            startNewBookie(port);
+        }
+
+        // A second batch of writes makes each ledger switch to a new ensemble.
+        writeEntriestoLedgers(numMsgs, 10, ledgers);
+
+        // Recover with no explicit target (blocking call).
+        InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort);
+        InetSocketAddress noTarget = null;
+        LOG.info("Now recover the data on the killed bookie (" + bookieSrc
+                 + ") and replicate it to a random available one");
+        bkAdmin.recoverBookieData(bookieSrc, noTarget);
+
+        // Entries 0 .. 2*numMsgs-1 of every ledger must still be correct.
+        verifyRecoveredLedgers(numLedgers, 0, 2 * numMsgs - 1);
+    }
+
+    /**
+     * Read callback used by verifyFullyReplicated: it counts how many of a
+     * fixed number of outstanding readEntry requests completed with
+     * {@code BKException.Code.OK}.
+     */
+    private static class ReplicationVerificationCallback implements ReadEntryCallback {
+        // Counted down once per completed read, successful or not.
+        private final CountDownLatch latch;
+        // Number of reads that completed with BKException.Code.OK.
+        private final AtomicLong numSuccess;
+
+        /**
+         * @param numRequests exact number of readEntryComplete calls expected;
+         *                    must not be negative (CountDownLatch rejects it)
+         */
+        ReplicationVerificationCallback(int numRequests) {
+            latch = new CountDownLatch(numRequests);
+            numSuccess = new AtomicLong(0);
+        }
+
+        public void readEntryComplete(int rc, long ledgerId, long entryId, ChannelBuffer buffer, Object ctx) {
+            if (LOG.isDebugEnabled()) {
+                // ctx carries the bookie address; it is only printed, so no cast is needed.
+                LOG.debug("Got " + rc + " for ledger " + ledgerId + " entry " + entryId + " from " + ctx);
+            }
+            if (rc == BKException.Code.OK) {
+                numSuccess.incrementAndGet();
+            }
+            latch.countDown();
+        }
+
+        /**
+         * Waits up to 60 seconds for every expected response.
+         *
+         * @return the number of successful reads, or 0 if not all responses
+         *         arrived before the timeout
+         * @throws InterruptedException if interrupted while waiting
+         */
+        long await() throws InterruptedException {
+            if (latch.await(60, TimeUnit.SECONDS) == false) {
+                LOG.warn("Didn't get all responses in verification");
+                return 0;
+            } else {
+                return numSuccess.get();
+            }
+        }
+    }
+
+    /**
+     * Checks that every fragment of a ledger is stored on at least a quorum
+     * of the bookies in that fragment's ensemble.
+     *
+     * The ledger metadata is read directly from the ledger's znode. Each
+     * ensemble key is the first entry id of a fragment. A fragment ends just
+     * before the next key; the last fragment ends just before
+     * {@code untilEntry}. Every entry of a fragment is read from every bookie
+     * in its ensemble, and at least {@code quorumSize} successful reads per
+     * entry are expected.
+     *
+     * @param lh         ledger to check
+     * @param untilEntry exclusive upper bound of the entry ids to check
+     * @return true if every fragment reached its expected success count
+     * @throws Exception on ZooKeeper or metadata errors
+     */
+    private boolean verifyFullyReplicated(LedgerHandle lh, long untilEntry) throws Exception {
+        String znodepath = StringUtils.getLedgerNodePath(lh.getId());
+        Stat stat = bkc.getZkHandle().exists(znodepath, false);
+        assertNotNull(stat);
+        byte[] mdbytes = bkc.getZkHandle().getData(znodepath, false, stat);
+        LedgerMetadata md = LedgerMetadata.parseConfig(mdbytes, stat.getVersion());
+
+        Map<Long, ArrayList<InetSocketAddress>> ensembles = md.getEnsembles();
+        // Fail with a clear message instead of an IndexOutOfBoundsException below.
+        assertFalse("No ensembles in metadata of ledger " + lh.getId(), ensembles.isEmpty());
+
+        // Fragment start ids in ascending order: fragment i ends where fragment i+1 starts.
+        List<Long> fragmentStarts = new ArrayList<Long>(ensembles.keySet());
+        Collections.sort(fragmentStarts);
+
+        int quorum = md.quorumSize;
+        for (int i = 0; i < fragmentStarts.size(); i++) {
+            long startEntryId = fragmentStarts.get(i);
+            long endEntryId = (i + 1 < fragmentStarts.size()) ? fragmentStarts.get(i + 1) : untilEntry;
+            long numEntries = endEntryId - startEntryId;
+            // Only the last fragment can hit this, when untilEntry is too small.
+            assertTrue("untilEntry " + untilEntry + " precedes fragment start " + startEntryId
+                       + " of ledger " + lh.getId(), numEntries >= 0);
+
+            List<InetSocketAddress> ensemble = ensembles.get(startEntryId);
+            long expectedSuccess = quorum * numEntries;
+            int numRequests = ensemble.size() * ((int) numEntries);
+
+            ReplicationVerificationCallback cb = new ReplicationVerificationCallback(numRequests);
+            for (long entryId = startEntryId; entryId < endEntryId; entryId++) {
+                for (InetSocketAddress addr : ensemble) {
+                    // addr doubles as the callback context so failures can be attributed.
+                    bkc.bookieClient.readEntry(addr, lh.getId(), entryId, cb, addr);
+                }
+            }
+
+            long numSuccess = cb.await();
+            if (numSuccess < expectedSuccess) {
+                LOG.warn("Fragment not fully replicated ledgerId = " + lh.getId()
+                         + " startEntryId = " + startEntryId
+                         + " endEntryId = " + endEntryId
+                         + " expectedSuccess = " + expectedSuccess
+                         + " gotSuccess = " + numSuccess);
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Reports whether any ensemble of any of the given ledgers lists the same
+     * bookie address more than once. Ledger metadata is read directly from
+     * each ledger's znode, and every duplicate found is logged as an error.
+     *
+     * @param lhs ledgers to inspect
+     * @return true if at least one duplicate address was found
+     * @throws Exception on ZooKeeper or metadata errors
+     */
+    private boolean findDupesInEnsembles(List<LedgerHandle> lhs) throws Exception {
+        long dupeCount = 0;
+        for (LedgerHandle ledger : lhs) {
+            String path = StringUtils.getLedgerNodePath(ledger.getId());
+            Stat nodeStat = bkc.getZkHandle().exists(path, false);
+            assertNotNull(nodeStat);
+            // getData refreshes nodeStat, so read the version only afterwards.
+            byte[] raw = bkc.getZkHandle().getData(path, false, nodeStat);
+            LedgerMetadata metadata = LedgerMetadata.parseConfig(raw, nodeStat.getVersion());
+
+            for (Map.Entry<Long, ArrayList<InetSocketAddress>> fragmentEnsemble : metadata.getEnsembles().entrySet()) {
+                long fragment = fragmentEnsemble.getKey();
+                HashSet<InetSocketAddress> seen = new HashSet<InetSocketAddress>();
+                for (InetSocketAddress addr : fragmentEnsemble.getValue()) {
+                    // add() returns false when the address is already in the set.
+                    if (!seen.add(addr)) {
+                        LOG.error("Dupe " + addr + " found in ensemble for fragment " + fragment
+                                + " of ledger " + ledger.getId());
+                        dupeCount++;
+                    }
+                }
+            }
+        }
+        return dupeCount > 0;
+    }
+
+    /**
+     * Verifies that recovering a killed bookie fails with
+     * {@code BKException.BKLedgerRecoveryException} when no replacement
+     * bookie has been started.
+     *
+     * The ledgers come from {@code createLedgers(numLedgers, numBookies, 2)};
+     * presumably that means an ensemble spanning every bookie (TODO confirm
+     * the argument meanings), which leaves no spare bookie to recover onto
+     * once one is shut down. Despite the "Async" in its name, this test
+     * exercises the blocking {@code recoverBookieData} call.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testAsyncBookieRecoveryToRandomBookiesNotEnoughBookies() throws Exception {
+        // Create the ledgers
+        int numLedgers = 3;
+        List<LedgerHandle> lhs = createLedgers(numLedgers, numBookies, 2);
+
+        // Write the entries for the ledgers with dummy values.
+        int numMsgs = 10;
+        writeEntriestoLedgers(numMsgs, 0, lhs);
+
+        // Shutdown the first bookie server; deliberately start no replacement.
+        LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
+        bs.get(0).shutdown();
+        bs.remove(0);
+
+        // Call the sync recover bookie method without an explicit destination.
+        InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort);
+        InetSocketAddress bookieDest = null;
+        LOG.info("Now recover the data on the killed bookie (" + bookieSrc
+                 + ") and replicate it to a random available one");
+        try {
+            bkAdmin.recoverBookieData(bookieSrc, bookieDest);
+            fail("Should have thrown exception");
+        } catch (BKException.BKLedgerRecoveryException bke) {
+            // correct behaviour: recovery must be refused
+        }
+    }
+
+    /**
+     * Runs ten rounds of the following:
+     * <ul>
+     * <li>kill a randomly chosen bookie and start one replacement;</li>
+     * <li>recover the killed bookie's data with no explicit destination;</li>
+     * <li>check that no ensemble lists the same bookie twice;</li>
+     * <li>check that further writes still succeed (fencing during recovery
+     * must not break the open ledger handles);</li>
+     * <li>check that every fragment is fully replicated.</li>
+     * </ul>
+     *
+     * The random seed is logged so that a failing run can be reproduced.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testSyncBookieRecoveryToRandomBookiesCheckForDupes() throws Exception {
+        long seed = System.currentTimeMillis();
+        LOG.info("Using random seed " + seed);
+        Random r = new Random(seed);
+        for (int i = 0; i < 10; i++) {
+            // Create the ledgers
+            int numLedgers = 3;
+            List<LedgerHandle> lhs = createLedgers(numLedgers, numBookies, 2);
+
+            // Write the entries for the ledgers with dummy values.
+            int numMsgs = 100;
+            writeEntriestoLedgers(numMsgs, 0, lhs);
+
+            // Shutdown a randomly chosen bookie server
+            LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
+            int removeIndex = r.nextInt(bs.size());
+            InetSocketAddress bookieSrc = bs.get(removeIndex).getLocalAddress();
+            bs.get(removeIndex).shutdown();
+            bs.remove(removeIndex);
+
+            // Startup one new bookie server; every round uses a port no earlier round used
+            int newBookiePort = initialPort + numBookies + i;
+            startNewBookie(newBookiePort);
+
+            // Write some more entries for the ledgers so a new ensemble will be
+            // created for them.
+            writeEntriestoLedgers(numMsgs, numMsgs, lhs);
+
+            // Call the sync recover bookie method, letting it pick the target bookie.
+            LOG.info("Now recover the data on the killed bookie (" + bookieSrc
+                     + ") and replicate it to a random available one");
+            bkAdmin.recoverBookieData(bookieSrc, null);
+            assertFalse("Dupes exist in ensembles", findDupesInEnsembles(lhs));
+
+            // Write some more entries to ensure fencing hasn't broken stuff
+            writeEntriestoLedgers(numMsgs, numMsgs*2, lhs);
+            for (LedgerHandle lh : lhs) {
+                // Entries 0 .. 3*numMsgs-1 have been written by now.
+                assertTrue("Not fully replicated", verifyFullyReplicated(lh, numMsgs*3));
+                // TODO (BOOKKEEPER-112) this throws an exception at the moment
+                // because recovering a ledger updates the ledger znode
+                //lh.close();
+            }
+        }
+    }
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BaseTestCase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BaseTestCase.java?rev=1202370&r1=1202369&r2=1202370&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BaseTestCase.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BaseTestCase.java Tue Nov 15 18:42:58 2011
@@ -53,19 +53,19 @@ import junit.framework.TestCase;
 public abstract class BaseTestCase extends TestCase {
     static final Logger LOG = Logger.getLogger(BaseTestCase.class);
     // ZooKeeper related variables
-    static final String HOSTPORT = "127.0.0.1:2181";
-    static Integer ZooKeeperDefaultPort = 2181;
-    ZooKeeperServer zks;
-    ZooKeeper zkc; // zookeeper client
-    NIOServerCnxnFactory serverFactory;
-    File ZkTmpDir;
+    protected static final String HOSTPORT = "127.0.0.1:2181";
+    protected static Integer ZooKeeperDefaultPort = 2181;
+    protected ZooKeeperServer zks;
+    protected ZooKeeper zkc; // zookeeper client
+    protected NIOServerCnxnFactory serverFactory;
+    protected File ZkTmpDir;
 
     // BookKeeper
-    List<File> tmpDirs = new ArrayList<File>();
-    List<BookieServer> bs = new ArrayList<BookieServer>();
-    Integer initialPort = 5000;
-    int numBookies;
-    BookKeeperTestClient bkc;
+    protected List<File> tmpDirs = new ArrayList<File>();
+    protected List<BookieServer> bs = new ArrayList<BookieServer>();
+    protected Integer initialPort = 5000;
+    protected int numBookies;
+    protected BookKeeperTestClient bkc;
 
     public BaseTestCase(int numBookies) {
         this.numBookies = numBookies;