You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2011/11/15 19:42:58 UTC
svn commit: r1202370 - in /zookeeper/bookkeeper/trunk/bookkeeper-server/src:
main/java/org/apache/bookkeeper/client/
main/java/org/apache/bookkeeper/proto/
test/java/org/apache/bookkeeper/client/ test/java/org/apache/bookkeeper/test/
Author: fpj
Date: Tue Nov 15 18:42:58 2011
New Revision: 1202370
URL: http://svn.apache.org/viewvc?rev=1202370&view=rev
Log:
BOOKKEEPER-106: recoverBookieData can select a recovery bookie which is already in the ledger's ensemble
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
Removed:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieRecoveryTest.java
Modified:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BaseTestCase.java
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java?rev=1202370&r1=1202369&r2=1202370&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java Tue Nov 15 18:42:58 2011
@@ -207,9 +207,11 @@ public class BookKeeperAdmin {
// Object used for calling async methods and waiting for them to complete.
class SyncObject {
boolean value;
+ // Result code delivered to the recovery callback; stays
+ // BKException.Code.OK until the callback overwrites it.
+ int rc;
public SyncObject() {
value = false;
+ rc = BKException.Code.OK;
}
}
@@ -232,7 +234,7 @@ public class BookKeeperAdmin {
* of the ledger fragments from the source bookie over to it.
*/
public void recoverBookieData(final InetSocketAddress bookieSrc, final InetSocketAddress bookieDest)
- throws InterruptedException {
+ throws InterruptedException, BKException {
SyncObject sync = new SyncObject();
// Call the async method to recover bookie data.
asyncRecoverBookieData(bookieSrc, bookieDest, new RecoverCallback() {
@@ -241,6 +243,7 @@ public class BookKeeperAdmin {
LOG.info("Recover bookie operation completed with rc: " + rc);
SyncObject syncObj = (SyncObject) ctx;
synchronized (syncObj) {
+ syncObj.rc = rc;
syncObj.value = true;
syncObj.notify();
}
@@ -253,6 +256,9 @@ public class BookKeeperAdmin {
sync.wait();
}
}
+ if (sync.rc != BKException.Code.OK) {
+ throw BKException.create(sync.rc);
+ }
}
/**
@@ -404,6 +410,22 @@ public class BookKeeperAdmin {
}
/**
+ * Picks a random bookie from availableBookies that is not already part of
+ * the given ensemble, so that recovery does not place a bookie into an
+ * ensemble it already belongs to.
+ *
+ * @param bookiesAlreadyInEnsemble bookies in the fragment's current ensemble
+ * @param availableBookies bookies that may be chosen
+ * @return a randomly chosen member of availableBookies that is not in
+ * bookiesAlreadyInEnsemble
+ * @throws BKException.BKNotEnoughBookiesException if no available bookie
+ * remains once the ensemble members are excluded
+ */
+ private InetSocketAddress getNewBookie(final List<InetSocketAddress> bookiesAlreadyInEnsemble,
+ final List<InetSocketAddress> availableBookies)
+ throws BKException.BKNotEnoughBookiesException {
+ List<InetSocketAddress> eligible = new ArrayList<InetSocketAddress>(availableBookies);
+ eligible.removeAll(bookiesAlreadyInEnsemble);
+ if (eligible.isEmpty()) {
+ throw new BKException.BKNotEnoughBookiesException();
+ }
+ int pick = rand.nextInt(eligible.size());
+ return eligible.get(pick);
+ }
+
+ /**
* This method asynchronously recovers a given ledger if any of the ledger
* entries were stored on the failed bookie.
*
@@ -501,79 +523,38 @@ public class BookKeeperAdmin {
ledgerMcb.processResult(BKException.Code.OK, null, null);
return;
}
- /*
- * We have ledger fragments that need to be re-replicated to a
- * new bookie. Choose one randomly from the available set of
- * bookies.
- */
- final InetSocketAddress newBookie = availableBookies.get(rand.nextInt(availableBookies.size()));
/*
- * Wrapper class around the ledger MultiCallback. Once all
- * ledger fragments for the ledger have been replicated to a new
- * bookie, we need to update ZK with this new metadata to point
- * to the new bookie instead of the old dead one. That should be
- * done at the end prior to invoking the ledger MultiCallback.
+ * Multicallback for ledger. Once all fragments for the ledger have been recovered
+ * trigger the ledgerMcb
*/
- class LedgerMultiCallbackWrapper implements AsyncCallback.VoidCallback {
- final MultiCallback ledgerMcb;
-
- LedgerMultiCallbackWrapper(MultiCallback ledgerMcb) {
- this.ledgerMcb = ledgerMcb;
- }
-
- @Override
- public void processResult(int rc, String path, Object ctx) {
- if (rc != Code.OK.intValue()) {
- LOG.error("BK error replicating ledger fragments for ledger: " + lId, BKException
- .create(rc));
- ledgerMcb.processResult(rc, null, null);
- return;
- }
- /*
- * Update the ledger metadata's ensemble info to point
- * to the new bookie.
- */
- for (final Long startEntryId : ledgerFragmentsToRecover) {
- ArrayList<InetSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().get(
- startEntryId);
- int deadBookieIndex = ensemble.indexOf(bookieSrc);
- ensemble.remove(deadBookieIndex);
- ensemble.add(deadBookieIndex, newBookie);
- }
-
- lh.writeLedgerConfig(new AsyncCallback.StatCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx, Stat stat) {
- if (rc != Code.OK.intValue()) {
- LOG.error("ZK error updating ledger config metadata for ledgerId: " + lh.getId(),
- KeeperException.create(KeeperException.Code.get(rc), path));
- } else {
- lh.getLedgerMetadata().updateZnodeStatus(stat);
- LOG.info("Updated ZK for ledgerId: (" + lh.getId()
- + ") to point ledger fragments from old dead bookie: (" + bookieSrc
- + ") to new bookie: (" + newBookie + ")");
- }
- /*
- * Pass the return code result up the chain with
- * the parent callback.
- */
- ledgerMcb.processResult(rc, null, null);
- }
- }, null);
- }
- }
+ MultiCallback ledgerFragmentsMcb = new MultiCallback(ledgerFragmentsToRecover.size(), ledgerMcb, null);
/*
* Now recover all of the necessary ledger fragments
* asynchronously using a MultiCallback for every fragment.
*/
- MultiCallback ledgerFragmentMcb = new MultiCallback(ledgerFragmentsToRecover.size(),
- new LedgerMultiCallbackWrapper(ledgerMcb), null);
for (final Long startEntryId : ledgerFragmentsToRecover) {
Long endEntryId = ledgerFragmentsRange.get(startEntryId);
+ InetSocketAddress newBookie = null;
+ try {
+ newBookie = getNewBookie(lh.getLedgerMetadata().getEnsembles().get(startEntryId),
+ availableBookies);
+ } catch (BKException.BKNotEnoughBookiesException bke) {
+ ledgerFragmentsMcb.processResult(BKException.Code.NotEnoughBookiesException, null, null);
+ continue;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Replicating fragment from [" + startEntryId
+ + "," + endEntryId + "] of ledger " + lh.getId()
+ + " to " + newBookie);
+ }
+
try {
- recoverLedgerFragment(bookieSrc, lh, startEntryId, endEntryId, ledgerFragmentMcb, newBookie);
+ SingleFragmentCallback cb = new SingleFragmentCallback(ledgerFragmentsMcb, lh, startEntryId,
+ bookieSrc, newBookie);
+ recoverLedgerFragment(bookieSrc, lh, startEntryId, endEntryId, cb, newBookie);
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
return;
@@ -605,7 +586,7 @@ public class BookKeeperAdmin {
* entries that were stored on the failed bookie.
*/
private void recoverLedgerFragment(final InetSocketAddress bookieSrc, final LedgerHandle lh,
- final Long startEntryId, final Long endEntryId, final MultiCallback ledgerFragmentMcb,
+ final Long startEntryId, final Long endEntryId, final SingleFragmentCallback cb,
final InetSocketAddress newBookie) throws InterruptedException {
if (endEntryId == null) {
/*
@@ -614,7 +595,7 @@ public class BookKeeperAdmin {
*/
LOG.warn("Dead bookie (" + bookieSrc + ") is still part of the current active ensemble for ledgerId: "
+ lh.getId());
- ledgerFragmentMcb.processResult(BKException.Code.OK, null, null);
+ cb.processResult(BKException.Code.OK, null, null);
return;
}
@@ -644,7 +625,7 @@ public class BookKeeperAdmin {
* Now asynchronously replicate all of the entries for the ledger
* fragment that were on the dead bookie.
*/
- MultiCallback ledgerFragmentEntryMcb = new MultiCallback(entriesToReplicate.size(), ledgerFragmentMcb, null);
+ MultiCallback ledgerFragmentEntryMcb = new MultiCallback(entriesToReplicate.size(), cb, null);
for (final Long entryId : entriesToReplicate) {
recoverLedgerFragmentEntry(entryId, lh, ledgerFragmentEntryMcb, newBookie);
}
@@ -699,7 +680,10 @@ public class BookKeeperAdmin {
LOG.error("BK error writing entry for ledgerId: " + ledgerId + ", entryId: "
+ entryId + ", bookie: " + addr, BKException.create(rc));
} else {
- LOG.debug("Success writing ledger entry to a new bookie!");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Success writing ledger id " +ledgerId + ", entry id "
+ + entryId + " to a new bookie " + addr + "!");
+ }
}
/*
* Pass the return code result up the chain with
@@ -711,4 +695,79 @@ public class BookKeeperAdmin {
}
}, null);
}
+
+ /*
+ * Callback for recovery of a single ledger fragment.
+ * Once the fragment has had all entries replicated, replace the old bookie
+ * with the new one in the fragment's ensemble and write the updated
+ * ledger metadata to ZooKeeper.
+ * Once finished, propagate the result (as a BookKeeper return code) up to
+ * ledgerFragmentsMcb, which should be a multicallback responsible for all
+ * fragments in a single ledger.
+ */
+ class SingleFragmentCallback implements AsyncCallback.VoidCallback {
+ final MultiCallback ledgerFragmentsMcb;
+ final LedgerHandle lh;
+ final long fragmentStartId;
+ final InetSocketAddress oldBookie;
+ final InetSocketAddress newBookie;
+
+ SingleFragmentCallback(MultiCallback ledgerFragmentsMcb, LedgerHandle lh,
+ long fragmentStartId,
+ InetSocketAddress oldBookie,
+ InetSocketAddress newBookie) {
+ this.ledgerFragmentsMcb = ledgerFragmentsMcb;
+ this.lh = lh;
+ this.fragmentStartId = fragmentStartId;
+ this.newBookie = newBookie;
+ this.oldBookie = oldBookie;
+ }
+
+ /**
+ * Invoked once replication of the fragment's entries has finished. On
+ * success, swaps newBookie in for oldBookie in the fragment's ensemble
+ * and writes the ledger metadata; every path reports exactly one result
+ * to ledgerFragmentsMcb.
+ */
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ // rc is a BookKeeper return code here.
+ if (rc != BKException.Code.OK) {
+ LOG.error("BK error replicating ledger fragments for ledger: " + lh.getId(),
+ BKException.create(rc));
+ ledgerFragmentsMcb.processResult(rc, null, null);
+ return;
+ }
+ /*
+ * Update the ledger metadata's ensemble info to point
+ * to the new bookie.
+ */
+ ArrayList<InetSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().get(
+ fragmentStartId);
+ int deadBookieIndex = ensemble.indexOf(oldBookie);
+ if (deadBookieIndex < 0) {
+ // Without this check remove(-1) would throw on the callback thread
+ // and ledgerFragmentsMcb would never complete, hanging recovery.
+ LOG.error("Bookie " + oldBookie + " is not in the ensemble of fragment "
+ + fragmentStartId + " of ledgerId: " + lh.getId());
+ ledgerFragmentsMcb.processResult(BKException.Code.LedgerRecoveryException, null, null);
+ return;
+ }
+ ensemble.set(deadBookieIndex, newBookie);
+
+ lh.writeLedgerConfig(new WriteCb(), null);
+ }
+
+ /**
+ * Handles completion of the ZooKeeper metadata write for this fragment.
+ */
+ private class WriteCb implements AsyncCallback.StatCallback {
+ @Override
+ public void processResult(int rc, String path, Object ctx, Stat stat) {
+ if (rc == Code.BADVERSION.intValue()) {
+ LOG.warn("Two fragments attempted update at once; ledger id: " + lh.getId()
+ + " startid: " + fragmentStartId);
+ // try again, the previous success (with which this has conflicted)
+ // will have updated the stat
+ // NOTE(review): retries are unbounded; if the znode was modified by
+ // anything other than a sibling fragment's write, the cached stat may
+ // never be refreshed and this loops. Confirm that cannot happen.
+ lh.writeLedgerConfig(new WriteCb(), null);
+ return;
+ }
+ int bkRc;
+ if (rc != Code.OK.intValue()) {
+ LOG.error("ZK error updating ledger config metadata for ledgerId: " + lh.getId(),
+ KeeperException.create(KeeperException.Code.get(rc), path));
+ // ledgerFragmentsMcb carries BookKeeper return codes, so a
+ // ZooKeeper code must not be passed through as-is.
+ bkRc = BKException.Code.ZKException;
+ } else {
+ lh.getLedgerMetadata().updateZnodeStatus(stat);
+ LOG.info("Updated ZK for ledgerId: (" + lh.getId() + " : " + fragmentStartId
+ + ") to point ledger fragments from old dead bookie: (" + oldBookie
+ + ") to new bookie: (" + newBookie + ")");
+ bkRc = BKException.Code.OK;
+ }
+ /*
+ * Pass the return code result up the chain with
+ * the parent callback.
+ */
+ ledgerFragmentsMcb.processResult(bkRc, null, null);
+ }
+ }
+ }
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java?rev=1202370&r1=1202369&r2=1202370&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java Tue Nov 15 18:42:58 2011
@@ -23,7 +23,9 @@ package org.apache.bookkeeper.proto;
import java.io.File;
import java.io.IOException;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import org.apache.bookkeeper.bookie.Bookie;
@@ -56,6 +58,14 @@ public class BookieServer implements NIO
deathWatcher.start();
}
+ /**
+ * Returns an address for this bookie built from the local host's IP
+ * address and the bookie's port. If the local host cannot be resolved,
+ * falls back to the address reported by the NIO server factory.
+ *
+ * @return socket address of this bookie server
+ */
+ public InetSocketAddress getLocalAddress() {
+ String localIp;
+ try {
+ localIp = InetAddress.getLocalHost().getHostAddress();
+ } catch (UnknownHostException uhe) {
+ return nioServerFactory.getLocalAddress();
+ }
+ return new InetSocketAddress(localIp, port);
+ }
+
public synchronized void shutdown() throws InterruptedException {
if (!running) {
return;
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java?rev=1202370&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java Tue Nov 15 18:42:58 2011
@@ -0,0 +1,648 @@
+package org.apache.bookkeeper.client;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Map;
+import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Collections;
+import java.util.Random;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.util.StringUtils;
+import org.apache.bookkeeper.test.BaseTestCase;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * This class tests the bookie recovery admin functionality.
+ */
+public class BookieRecoveryTest extends BaseTestCase {
+ static Logger LOG = Logger.getLogger(BookieRecoveryTest.class);
+
+ // Object used for synchronizing async method calls. The callback stores
+ // the operation's result code in rc for the test thread to check.
+ class SyncObject {
+ boolean value;
+ int rc;
+
+ public SyncObject() {
+ value = false;
+ rc = BKException.Code.OK;
+ }
+ }
+
+ // Object used for implementing the Bookie RecoverCallback for this jUnit
+ // test. It records the result code and wakes up the waiting test thread,
+ // which asserts on it. Asserting here would throw on the callback thread
+ // before sync.value is set, so a failure would hang the test forever.
+ class BookieRecoverCallback implements RecoverCallback {
+ @Override
+ public void recoverComplete(int rc, Object ctx) {
+ LOG.info("Recovered bookie operation completed with rc: " + rc);
+ SyncObject sync = (SyncObject) ctx;
+ synchronized (sync) {
+ sync.rc = rc;
+ sync.value = true;
+ sync.notify();
+ }
+ }
+ }
+
+ // Objects to use for this jUnit test.
+ DigestType digestType;
+ SyncObject sync;
+ BookieRecoverCallback bookieRecoverCb;
+ BookKeeperAdmin bkAdmin;
+
+ // Constructor
+ public BookieRecoveryTest(DigestType digestType) {
+ super(3);
+ this.digestType = digestType;
+ }
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ // Set up the configuration properties needed.
+ System.setProperty("digestType", digestType.toString());
+ System.setProperty("passwd", "");
+ sync = new SyncObject();
+ bookieRecoverCb = new BookieRecoverCallback();
+ bkAdmin = new BookKeeperAdmin(HOSTPORT);
+ }
+
+ @After
+ @Override
+ public void tearDown() throws Exception {
+ // Release any resources used by the BookKeeperAdmin instance.
+ bkAdmin.close();
+ super.tearDown();
+ }
+
+ /**
+ * Helper method to create a number of ledgers
+ *
+ * @param numLedgers
+ * Number of ledgers to create
+ * @return List of LedgerHandles for each of the ledgers created
+ */
+ private List<LedgerHandle> createLedgers(int numLedgers)
+ throws BKException, KeeperException, IOException, InterruptedException
+ {
+ return createLedgers(numLedgers, 3, 2);
+ }
+
+ /**
+ * Helper method to create a number of ledgers
+ *
+ * @param numLedgers
+ * Number of ledgers to create
+ * @param ensemble Ensemble size for ledgers
+ * @param quorum Quorum size for ledgers
+ * @return List of LedgerHandles for each of the ledgers created
+ */
+ private List<LedgerHandle> createLedgers(int numLedgers, int ensemble, int quorum)
+ throws BKException, KeeperException, IOException,
+ InterruptedException {
+ List<LedgerHandle> lhs = new ArrayList<LedgerHandle>();
+ for (int i = 0; i < numLedgers; i++) {
+ lhs.add(bkc.createLedger(ensemble, quorum, digestType, System.getProperty("passwd").getBytes()));
+ }
+ return lhs;
+ }
+
+ /**
+ * Helper method to write dummy ledger entries to all of the ledgers passed.
+ *
+ * @param numEntries
+ * Number of ledger entries to write for each ledger
+ * @param startEntryId
+ * The first entry Id we're expecting to write for each ledger
+ * @param lhs
+ * List of LedgerHandles for all ledgers to write entries to
+ * @throws BKException
+ * @throws InterruptedException
+ */
+ private void writeEntriestoLedgers(int numEntries, long startEntryId, List<LedgerHandle> lhs) throws BKException,
+ InterruptedException {
+ for (LedgerHandle lh : lhs) {
+ for (int i = 0; i < numEntries; i++) {
+ lh.addEntry(("LedgerId: " + lh.getId() + ", EntryId: " + (startEntryId + i)).getBytes());
+ }
+ }
+ }
+
+ /**
+ * Helper method to startup a new bookie server with the indicated port
+ * number
+ *
+ * @param port
+ * Port to start the new bookie server on
+ * @throws IOException
+ */
+ private void startNewBookie(int port)
+ throws IOException, InterruptedException, KeeperException {
+ File f = File.createTempFile("bookie", "test");
+ tmpDirs.add(f);
+ f.delete();
+ f.mkdir();
+
+ BookieServer server = new BookieServer(port, HOSTPORT, f, new File[] { f });
+ server.start();
+ bs.add(server);
+
+ while(bkc.getZkHandle().exists("/ledgers/available/" + InetAddress.getLocalHost().getHostAddress() + ":" + port, false) == null) {
+ Thread.sleep(500);
+ }
+
+ bkc.readBookiesBlocking();
+ LOG.info("New bookie on port " + port + " has been created.");
+ }
+
+ /**
+ * Helper method to verify that we can read the recovered ledger entries.
+ *
+ * @param numLedgers
+ * Number of ledgers to verify
+ * @param startEntryId
+ * Start Entry Id to read
+ * @param endEntryId
+ * End Entry Id to read
+ * @throws BKException
+ * @throws InterruptedException
+ */
+ private void verifyRecoveredLedgers(int numLedgers, long startEntryId, long endEntryId) throws BKException,
+ InterruptedException {
+ // Get a set of LedgerHandles for all of the ledgers to verify
+ List<LedgerHandle> lhs = new ArrayList<LedgerHandle>();
+ for (int i = 0; i < numLedgers; i++) {
+ lhs.add(bkc.openLedger(i + 1, digestType, System.getProperty("passwd").getBytes()));
+ }
+ // Read the ledger entries to verify that they are all present and
+ // correct in the new bookie.
+ for (LedgerHandle lh : lhs) {
+ Enumeration<LedgerEntry> entries = lh.readEntries(startEntryId, endEntryId);
+ while (entries.hasMoreElements()) {
+ LedgerEntry entry = entries.nextElement();
+ assertTrue(new String(entry.getEntry()).equals("LedgerId: " + entry.getLedgerId() + ", EntryId: "
+ + entry.getEntryId()));
+ }
+ }
+
+ }
+
+ /**
+ * This tests the asynchronous bookie recovery functionality by writing
+ * entries into 3 bookies, killing one bookie, starting up a new one to
+ * replace it, and then recovering the ledger entries from the killed bookie
+ * onto the new one. We'll verify that the entries stored on the killed
+ * bookie are properly copied over and restored onto the new one.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testAsyncBookieRecoveryToSpecificBookie() throws Exception {
+ // Create the ledgers
+ int numLedgers = 3;
+ List<LedgerHandle> lhs = createLedgers(numLedgers);
+
+ // Write the entries for the ledgers with dummy values.
+ int numMsgs = 10;
+ writeEntriestoLedgers(numMsgs, 0, lhs);
+
+ // Shutdown the first bookie server
+ LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
+ bs.get(0).shutdown();
+ bs.remove(0);
+
+ // Startup a new bookie server
+ int newBookiePort = initialPort + numBookies;
+ startNewBookie(newBookiePort);
+
+ // Write some more entries for the ledgers so a new ensemble will be
+ // created for them.
+ writeEntriestoLedgers(numMsgs, 10, lhs);
+
+ // Call the async recover bookie method.
+ InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort);
+ InetSocketAddress bookieDest = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), newBookiePort);
+ LOG.info("Now recover the data on the killed bookie (" + bookieSrc + ") and replicate it to the new one ("
+ + bookieDest + ")");
+ // Initiate the sync object
+ sync.value = false;
+ bkAdmin.asyncRecoverBookieData(bookieSrc, bookieDest, bookieRecoverCb, sync);
+
+ // Wait for the async method to complete.
+ synchronized (sync) {
+ while (sync.value == false) {
+ sync.wait();
+ }
+ }
+ assertEquals("Bookie recovery failed", BKException.Code.OK, sync.rc);
+
+ // Verify the recovered ledger entries are okay.
+ verifyRecoveredLedgers(numLedgers, 0, 2 * numMsgs - 1);
+ }
+
+ /**
+ * This tests the asynchronous bookie recovery functionality by writing
+ * entries into 3 bookies, killing one bookie, starting up a few new
+ * bookies, and then recovering the ledger entries from the killed bookie
+ * onto random available bookie servers. We'll verify that the entries
+ * stored on the killed bookie are properly copied over and restored onto
+ * the other bookies.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testAsyncBookieRecoveryToRandomBookies() throws Exception {
+ // Create the ledgers
+ int numLedgers = 3;
+ List<LedgerHandle> lhs = createLedgers(numLedgers);
+
+ // Write the entries for the ledgers with dummy values.
+ int numMsgs = 10;
+ writeEntriestoLedgers(numMsgs, 0, lhs);
+
+ // Shutdown the first bookie server
+ LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
+ bs.get(0).shutdown();
+ bs.remove(0);
+
+ // Startup three new bookie servers
+ for (int i = 0; i < 3; i++) {
+ int newBookiePort = initialPort + numBookies + i;
+ startNewBookie(newBookiePort);
+ }
+
+ // Write some more entries for the ledgers so a new ensemble will be
+ // created for them.
+ writeEntriestoLedgers(numMsgs, 10, lhs);
+
+ // Call the async recover bookie method.
+ InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort);
+ InetSocketAddress bookieDest = null;
+ LOG.info("Now recover the data on the killed bookie (" + bookieSrc
+ + ") and replicate it to a random available one");
+ // Initiate the sync object
+ sync.value = false;
+ bkAdmin.asyncRecoverBookieData(bookieSrc, bookieDest, bookieRecoverCb, sync);
+
+ // Wait for the async method to complete.
+ synchronized (sync) {
+ while (sync.value == false) {
+ sync.wait();
+ }
+ }
+ assertEquals("Bookie recovery failed", BKException.Code.OK, sync.rc);
+
+ // Verify the recovered ledger entries are okay.
+ verifyRecoveredLedgers(numLedgers, 0, 2 * numMsgs - 1);
+ }
+
+ /**
+ * This tests the synchronous bookie recovery functionality by writing
+ * entries into 3 bookies, killing one bookie, starting up a new one to
+ * replace it, and then recovering the ledger entries from the killed bookie
+ * onto the new one. We'll verify that the entries stored on the killed
+ * bookie are properly copied over and restored onto the new one.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testSyncBookieRecoveryToSpecificBookie() throws Exception {
+ // Create the ledgers
+ int numLedgers = 3;
+ List<LedgerHandle> lhs = createLedgers(numLedgers);
+
+ // Write the entries for the ledgers with dummy values.
+ int numMsgs = 10;
+ writeEntriestoLedgers(numMsgs, 0, lhs);
+
+ // Shutdown the first bookie server
+ LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
+ bs.get(0).shutdown();
+ bs.remove(0);
+
+ // Startup a new bookie server
+ int newBookiePort = initialPort + numBookies;
+ startNewBookie(newBookiePort);
+
+ // Write some more entries for the ledgers so a new ensemble will be
+ // created for them.
+ writeEntriestoLedgers(numMsgs, 10, lhs);
+
+ // Call the sync recover bookie method.
+ InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort);
+ InetSocketAddress bookieDest = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), newBookiePort);
+ LOG.info("Now recover the data on the killed bookie (" + bookieSrc + ") and replicate it to the new one ("
+ + bookieDest + ")");
+ bkAdmin.recoverBookieData(bookieSrc, bookieDest);
+
+ // Verify the recovered ledger entries are okay.
+ verifyRecoveredLedgers(numLedgers, 0, 2 * numMsgs - 1);
+ }
+
+ /**
+ * This tests the synchronous bookie recovery functionality by writing
+ * entries into 3 bookies, killing one bookie, starting up a few new
+ * bookies, and then recovering the ledger entries from the killed bookie
+ * onto random available bookie servers. We'll verify that the entries
+ * stored on the killed bookie are properly copied over and restored onto
+ * the other bookies.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testSyncBookieRecoveryToRandomBookies() throws Exception {
+ // Create the ledgers
+ int numLedgers = 3;
+ List<LedgerHandle> lhs = createLedgers(numLedgers);
+
+ // Write the entries for the ledgers with dummy values.
+ int numMsgs = 10;
+ writeEntriestoLedgers(numMsgs, 0, lhs);
+
+ // Shutdown the first bookie server
+ LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
+ bs.get(0).shutdown();
+ bs.remove(0);
+
+ // Startup three new bookie servers
+ for (int i = 0; i < 3; i++) {
+ int newBookiePort = initialPort + numBookies + i;
+ startNewBookie(newBookiePort);
+ }
+
+ // Write some more entries for the ledgers so a new ensemble will be
+ // created for them.
+ writeEntriestoLedgers(numMsgs, 10, lhs);
+
+ // Call the sync recover bookie method.
+ InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort);
+ InetSocketAddress bookieDest = null;
+ LOG.info("Now recover the data on the killed bookie (" + bookieSrc
+ + ") and replicate it to a random available one");
+ bkAdmin.recoverBookieData(bookieSrc, bookieDest);
+
+ // Verify the recovered ledger entries are okay.
+ verifyRecoveredLedgers(numLedgers, 0, 2 * numMsgs - 1);
+ }
+
+ /**
+ * ReadEntryCallback that counts successful reads and lets the caller wait
+ * (up to 60 seconds) until a fixed number of responses have arrived.
+ */
+ private static class ReplicationVerificationCallback implements ReadEntryCallback {
+ final CountDownLatch latch;
+ final AtomicLong numSuccess;
+
+ ReplicationVerificationCallback(int numRequests) {
+ latch = new CountDownLatch(numRequests);
+ numSuccess = new AtomicLong(0);
+ }
+
+ @Override
+ public void readEntryComplete(int rc, long ledgerId, long entryId, ChannelBuffer buffer, Object ctx) {
+ if (LOG.isDebugEnabled()) {
+ InetSocketAddress addr = (InetSocketAddress)ctx;
+ LOG.debug("Got " + rc + " for ledger " + ledgerId + " entry " + entryId + " from " + addr);
+ }
+ if (rc == BKException.Code.OK) {
+ numSuccess.incrementAndGet();
+ }
+ latch.countDown();
+ }
+
+ long await() throws InterruptedException {
+ if (latch.await(60, TimeUnit.SECONDS) == false) {
+ LOG.warn("Didn't get all responses in verification");
+ return 0;
+ } else {
+ return numSuccess.get();
+ }
+ }
+ }
+
+ /**
+ * Checks that every fragment of the ledger, as recorded in the ledger
+ * metadata stored in ZooKeeper, is replicated: each entry id of the
+ * fragment (up to, but excluding, untilEntry for the last fragment) is read
+ * from every bookie in the fragment's ensemble, and the number of successful
+ * reads must be at least quorumSize times the number of entries.
+ *
+ * @param lh ledger to check
+ * @param untilEntry exclusive end of the last fragment
+ * @return true if every fragment has enough successful reads
+ */
+ private boolean verifyFullyReplicated(LedgerHandle lh, long untilEntry) throws Exception {
+ String znodepath = StringUtils.getLedgerNodePath(lh.getId());
+ Stat stat = bkc.getZkHandle().exists(znodepath, false);
+ assertNotNull(stat);
+ byte[] mdbytes = bkc.getZkHandle().getData(znodepath, false, stat);
+ LedgerMetadata md = LedgerMetadata.parseConfig(mdbytes, stat.getVersion());
+
+ Map<Long, ArrayList<InetSocketAddress>> ensembles = md.getEnsembles();
+
+ HashMap<Long, Long> ranges = new HashMap<Long, Long>();
+ ArrayList<Long> keyList = Collections.list(
+ Collections.enumeration(ensembles.keySet()));
+ Collections.sort(keyList);
+ for (int i = 0; i < keyList.size() - 1; i++) {
+ ranges.put(keyList.get(i), keyList.get(i+1));
+ }
+ ranges.put(keyList.get(keyList.size()-1), untilEntry);
+
+ for (Map.Entry<Long, ArrayList<InetSocketAddress>> e : ensembles.entrySet()) {
+ int quorum = md.quorumSize;
+ long startEntryId = e.getKey();
+ long endEntryId = ranges.get(startEntryId);
+ long expectedSuccess = quorum*(endEntryId-startEntryId);
+ int numRequests = e.getValue().size()*((int)(endEntryId-startEntryId));
+
+ ReplicationVerificationCallback cb = new ReplicationVerificationCallback(numRequests);
+ for (long i = startEntryId; i < endEntryId; i++) {
+ for (InetSocketAddress addr : e.getValue()) {
+ bkc.bookieClient.readEntry(addr, lh.getId(), i, cb, addr);
+ }
+ }
+
+ long numSuccess = cb.await();
+ if (numSuccess < expectedSuccess) {
+ LOG.warn("Fragment not fully replicated ledgerId = " + lh.getId()
+ + " startEntryId = " + startEntryId
+ + " endEntryId = " + endEntryId
+ + " expectedSuccess = " + expectedSuccess
+ + " gotSuccess = " + numSuccess);
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Checks, using the ledger metadata stored in ZooKeeper, whether any
+ * fragment ensemble of the given ledgers lists the same bookie twice.
+ *
+ * @param lhs ledgers to check
+ * @return true if at least one duplicate was found
+ */
+ private boolean findDupesInEnsembles(List<LedgerHandle> lhs) throws Exception {
+ long numDupes = 0;
+ for (LedgerHandle lh : lhs) {
+ String znodepath = StringUtils.getLedgerNodePath(lh.getId());
+ Stat stat = bkc.getZkHandle().exists(znodepath, false);
+ assertNotNull(stat);
+ byte[] mdbytes = bkc.getZkHandle().getData(znodepath, false, stat);
+ LedgerMetadata md = LedgerMetadata.parseConfig(mdbytes, stat.getVersion());
+
+ for (Map.Entry<Long, ArrayList<InetSocketAddress>> e : md.getEnsembles().entrySet()) {
+ HashSet<InetSocketAddress> set = new HashSet<InetSocketAddress>();
+ long fragment = e.getKey();
+
+ for (InetSocketAddress addr : e.getValue()) {
+ if (set.contains(addr)) {
+ LOG.error("Dupe " + addr + " found in ensemble for fragment " + fragment
+ + " of ledger " + lh.getId());
+ numDupes++;
+ }
+ set.add(addr);
+ }
+ }
+ }
+ return numDupes > 0;
+ }
+
+ /**
+ * This tests that bookie recovery fails when no bookie outside the
+ * ledgers' ensembles is available: the ledgers use all numBookies bookies,
+ * one of them is killed and no replacement bookie is started.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testAsyncBookieRecoveryToRandomBookiesNotEnoughBookies() throws Exception {
+ // Create the ledgers
+ int numLedgers = 3;
+ List<LedgerHandle> lhs = createLedgers(numLedgers, numBookies, 2);
+
+ // Write the entries for the ledgers with dummy values.
+ int numMsgs = 10;
+ writeEntriestoLedgers(numMsgs, 0, lhs);
+
+ // Shutdown the first bookie server
+ LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
+ bs.get(0).shutdown();
+ bs.remove(0);
+
+ // Call the sync recover bookie method; it is expected to fail.
+ InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort);
+ InetSocketAddress bookieDest = null;
+ LOG.info("Now recover the data on the killed bookie (" + bookieSrc
+ + ") and replicate it to a random available one");
+ try {
+ bkAdmin.recoverBookieData(bookieSrc, bookieDest);
+ fail("Should have thrown exception");
+ } catch (BKException.BKLedgerRecoveryException bke) {
+ // correct behaviour
+ }
+ }
+
+ /**
+ * This tests, ten times over, that recovering a randomly killed bookie onto
+ * random available bookies never puts the same bookie into an ensemble
+ * twice, and that the ledgers remain fully replicated when more entries are
+ * written afterwards.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testSyncBookieRecoveryToRandomBookiesCheckForDupes() throws Exception {
+ Random r = new Random();
+ for (int i = 0; i < 10; i++) {
+ // Create the ledgers
+ int numLedgers = 3;
+ List<LedgerHandle> lhs = createLedgers(numLedgers, numBookies, 2);
+
+ // Write the entries for the ledgers with dummy values.
+ int numMsgs = 100;
+ writeEntriestoLedgers(numMsgs, 0, lhs);
+
+ // Shutdown a randomly chosen bookie server
+ LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
+ int removeIndex = r.nextInt(bs.size());
+ InetSocketAddress bookieSrc = bs.get(removeIndex).getLocalAddress();
+ bs.get(removeIndex).shutdown();
+ bs.remove(removeIndex);
+
+ // Startup a new bookie server to replace it
+ int newBookiePort = initialPort + numBookies + i;
+ startNewBookie(newBookiePort);
+
+ // Write some more entries for the ledgers so a new ensemble will be
+ // created for them.
+ writeEntriestoLedgers(numMsgs, numMsgs, lhs);
+
+ // Call the sync recover bookie method.
+ LOG.info("Now recover the data on the killed bookie (" + bookieSrc
+ + ") and replicate it to a random available one");
+ bkAdmin.recoverBookieData(bookieSrc, null);
+ assertFalse("Dupes exist in ensembles", findDupesInEnsembles(lhs));
+
+ // Write some more entries to ensure fencing hasn't broken stuff
+ writeEntriestoLedgers(numMsgs, numMsgs*2, lhs);
+ for (LedgerHandle lh : lhs) {
+ assertTrue("Not fully replicated", verifyFullyReplicated(lh, numMsgs*3));
+ // TODO (BOOKKEEPER-112) this throws an exception at the moment
+ // because recovering a ledger updates the ledger znode
+ //lh.close();
+ }
+ }
+ }
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BaseTestCase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BaseTestCase.java?rev=1202370&r1=1202369&r2=1202370&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BaseTestCase.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BaseTestCase.java Tue Nov 15 18:42:58 2011
@@ -53,19 +53,19 @@ import junit.framework.TestCase;
public abstract class BaseTestCase extends TestCase {
static final Logger LOG = Logger.getLogger(BaseTestCase.class);
// ZooKeeper related variables
- static final String HOSTPORT = "127.0.0.1:2181";
- static Integer ZooKeeperDefaultPort = 2181;
- ZooKeeperServer zks;
- ZooKeeper zkc; // zookeeper client
- NIOServerCnxnFactory serverFactory;
- File ZkTmpDir;
+ protected static final String HOSTPORT = "127.0.0.1:2181";
+ protected static Integer ZooKeeperDefaultPort = 2181;
+ protected ZooKeeperServer zks;
+ protected ZooKeeper zkc; // zookeeper client
+ protected NIOServerCnxnFactory serverFactory;
+ protected File ZkTmpDir;
// BookKeeper
- List<File> tmpDirs = new ArrayList<File>();
- List<BookieServer> bs = new ArrayList<BookieServer>();
- Integer initialPort = 5000;
- int numBookies;
- BookKeeperTestClient bkc;
+ protected List<File> tmpDirs = new ArrayList<File>();
+ protected List<BookieServer> bs = new ArrayList<BookieServer>();
+ protected Integer initialPort = 5000;
+ protected int numBookies;
+ protected BookKeeperTestClient bkc;
public BaseTestCase(int numBookies) {
this.numBookies = numBookies;