You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by br...@apache.org on 2010/07/09 23:18:00 UTC
svn commit: r962697 - in /hadoop/zookeeper/trunk: ./
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/
src/contrib/bookkeeper/src/java/org/apache/bookkeeper/tools/
src/contrib/bookkeeper/test/org/apache/bookkeeper/test/
Author: breed
Date: Fri Jul 9 21:17:57 2010
New Revision: 962697
URL: http://svn.apache.org/viewvc?rev=962697&view=rev
Log:
ZOOKEEPER-712. Bookie recovery.
Added:
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/tools/
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/tools/BookKeeperTools.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieRecoveryTest.java
Modified:
hadoop/zookeeper/trunk/CHANGES.txt
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DigestManager.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DistributionSchedule.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerMetadata.java
Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=962697&r1=962696&r2=962697&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Fri Jul 9 21:17:57 2010
@@ -99,6 +99,8 @@ NEW FEATURES:
ZOOKEEPER-744. Add monitoring four-letter word (Savu Andrei via phunt)
+ ZOOKEEPER-712. Bookie recovery. (Erwin Tam via breed)
+
Release 3.3.0 - 2010-03-24
Non-backward compatible changes:
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java?rev=962697&r1=962696&r2=962697&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/AsyncCallback.java Fri Jul 9 21:17:57 2010
@@ -111,4 +111,16 @@ public interface AsyncCallback {
void deleteComplete(int rc, Object ctx);
}
+ /**
+ * Callback interface for asynchronous bookie recovery operations.
+ */
+ public interface RecoverCallback {
+ /**
+ * Callback definition for bookie recovery operations.
+ *
+ * @param rc
+ * return code of the recovery operation (0/OK on success,
+ * otherwise an error code)
+ * @param ctx
+ * caller-supplied context object that was passed when the
+ * operation was started
+ */
+ void recoverComplete(int rc, Object ctx);
+ }
+
}
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java?rev=962697&r1=962696&r2=962697&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/BookKeeper.java Fri Jul 9 21:17:57 2010
@@ -159,6 +159,15 @@ public class BookKeeper implements OpenC
}
/**
+ * Get the BookieClient, currently used for doing bookie recovery.
+ *
+ * The returned object is the client instance held by this BookKeeper (not a
+ * copy), so it is shared with everything else using this BookKeeper.
+ *
+ * @return BookieClient for the BookKeeper instance.
+ */
+ public BookieClient getBookieClient() {
+ return bookieClient;
+ }
+
+ /**
* Creates a new ledger asynchronously. To create a ledger, we need to specify
* the ensemble size, the quorum size, the digest type, a password, a callback
* implementation, and an optional control object. The ensemble size is how
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DigestManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DigestManager.java?rev=962697&r1=962696&r2=962697&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DigestManager.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DigestManager.java Fri Jul 9 21:17:57 2010
@@ -35,7 +35,7 @@ import org.jboss.netty.buffer.ChannelBuf
* for the packet. Currently 2 types of digests are supported: MAC (based on SHA-1) and CRC32
*/
-abstract class DigestManager {
+public abstract class DigestManager {
static final Logger logger = Logger.getLogger(DigestManager.class);
long ledgerId;
@@ -67,7 +67,7 @@ abstract class DigestManager {
}
}
- ChannelBuffer computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, byte[] data) {
+ public ChannelBuffer computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, byte[] data) {
byte[] bufferArray = new byte[24+macCodeLength];
ByteBuffer buffer = ByteBuffer.wrap(bufferArray);
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DistributionSchedule.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DistributionSchedule.java?rev=962697&r1=962696&r2=962697&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DistributionSchedule.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/DistributionSchedule.java Fri Jul 9 21:17:57 2010
@@ -28,7 +28,7 @@ package org.apache.bookkeeper.client;
* to.
*/
-interface DistributionSchedule {
+public interface DistributionSchedule {
/**
*
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=962697&r1=962696&r2=962697&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerHandle.java Fri Jul 9 21:17:57 2010
@@ -121,7 +121,43 @@ public class LedgerHandle implements Rea
return lastAddPushed;
}
- void writeLedgerConfig(StatCallback callback, Object ctx) {
+ /**
+ * Get the Ledger's key/password.
+ *
+ * Returns the handle's own array, not a defensive copy; callers must not
+ * modify it.
+ *
+ * @return byte array for the ledger's key/password.
+ */
+ public byte[] getLedgerKey() {
+ return ledgerKey;
+ }
+
+ /**
+ * Get the LedgerMetadata
+ *
+ * Returns the handle's live metadata object (not a copy). Changes made to it
+ * are only persisted to ZooKeeper when writeLedgerConfig is called, since that
+ * method serializes this same object.
+ *
+ * @return LedgerMetadata for the LedgerHandle
+ */
+ public LedgerMetadata getLedgerMetadata() {
+ return metadata;
+ }
+
+ /**
+ * Get the DigestManager used by this handle to compute entry digests.
+ *
+ * @return DigestManager for the LedgerHandle (the handle's own instance)
+ */
+ public DigestManager getDigestManager() {
+ return macManager;
+ }
+
+ /**
+ * Get the Distribution Schedule, which maps entries to the ensemble
+ * positions (bookies) that store them.
+ *
+ * @return DistributionSchedule for the LedgerHandle (the handle's own instance)
+ */
+ public DistributionSchedule getDistributionSchedule() {
+ return distributionSchedule;
+ }
+
+ /**
+ * Asynchronously writes this handle's current metadata to the ledger's
+ * ZooKeeper node. The setData call uses version -1, i.e. no version check,
+ * so it overwrites whatever metadata is currently stored there.
+ *
+ * @param callback ZooKeeper StatCallback invoked when the write completes
+ * @param ctx context object passed through to the callback
+ */
+ public void writeLedgerConfig(StatCallback callback, Object ctx) {
bk.getZkHandle().setData(StringUtils.getLedgerNodePath(ledgerId),
metadata.serialize(), -1, callback, ctx);
}
Modified: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerMetadata.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerMetadata.java?rev=962697&r1=962696&r2=962697&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerMetadata.java (original)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/client/LedgerMetadata.java Fri Jul 9 21:17:57 2010
@@ -33,7 +33,7 @@ import org.apache.log4j.Logger;
* in zookeeper. It provides parsing and serialization methods of such metadata.
*
*/
-class LedgerMetadata {
+public class LedgerMetadata {
static final Logger LOG = Logger.getLogger(LedgerMetadata.class);
private static final String closed = "CLOSED";
@@ -59,6 +59,17 @@ class LedgerMetadata {
this(0, 0);
}
+ /**
+ * Get the Map of bookie ensembles for the various ledger fragments
+ * that make up the ledger.
+ *
+ * Keys are the first entry ID of each fragment. The returned map is this
+ * object's own field, not a copy, so changes made through it (or to the
+ * lists it contains) modify this metadata directly.
+ *
+ * @return SortedMap of Ledger Fragments and the corresponding
+ * bookie ensembles that store the entries.
+ */
+ public SortedMap<Long, ArrayList<InetSocketAddress>> getEnsembles() {
+ return ensembles;
+ }
+
/** Returns whether this metadata records the ledger as closed (close != NOTCLOSED). */
boolean isClosed() {
return close != NOTCLOSED;
}
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/tools/BookKeeperTools.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/tools/BookKeeperTools.java?rev=962697&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/tools/BookKeeperTools.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/src/java/org/apache/bookkeeper/tools/BookKeeperTools.java Fri Jul 9 21:17:57 2010
@@ -0,0 +1,762 @@
+package org.apache.bookkeeper.tools;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.data.Stat;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * Provides Admin Tools to manage the BookKeeper cluster.
+ *
+ */
+public class BookKeeperTools {
+
+    // Shared, never-reassigned logger for this class.
+    private static final Logger LOG = Logger.getLogger(BookKeeperTools.class);
+
+    // ZK client instance
+    private ZooKeeper zk;
+    // ZK ledgers related String constants
+    static final String LEDGERS_PATH = "/ledgers";
+    static final String LEDGER_NODE_PREFIX = "L";
+    static final String AVAILABLE_NODE = "available";
+    static final String BOOKIES_PATH = LEDGERS_PATH + "/" + AVAILABLE_NODE;
+    static final String COLON = ":";
+
+    // BookKeeper client instance
+    private BookKeeper bkc;
+
+    /*
+     * Random number generator used to choose an available bookie server to
+     * replicate data from a dead bookie. It is used from asynchronous
+     * callbacks; java.util.Random is documented as thread-safe, so one shared
+     * instance is fine.
+     */
+    private final Random rand = new Random();
+
+    /*
+     * For now, assume that all ledgers were created with the same DigestType
+     * and password. In the future, this admin tool will need to know for each
+     * ledger, what was the DigestType and password used to create it before it
+     * can open it. These values come from the "digestType" and "passwd" System
+     * properties, defaulting to CRC32 and an empty password respectively.
+     */
+    private final DigestType DIGEST_TYPE = DigestType.valueOf(System.getProperty("digestType", DigestType.CRC32.toString()));
+    private final byte[] PASSWD = System.getProperty("passwd", "").getBytes();
+
+    /**
+     * Constructor that takes in a ZooKeeper servers connect string so we know
+     * how to connect to ZooKeeper to retrieve information about the BookKeeper
+     * cluster. We need this before we can do any type of admin operations on
+     * the BookKeeper cluster. If the BookKeeper client cannot be created, the
+     * ZooKeeper client opened here is closed again before the failure
+     * propagates, so no ZooKeeper session is leaked.
+     *
+     * @param zkServers
+     *            Comma separated list of hostname:port pairs for the ZooKeeper
+     *            servers cluster.
+     * @throws IOException
+     *             if there is an error instantiating the ZooKeeper client.
+     * @throws InterruptedException
+     *             if there is an error instantiating the BookKeeper client, or
+     *             if closing the ZooKeeper client during cleanup is interrupted.
+     * @throws KeeperException
+     *             if there is an error instantiating the BookKeeper client.
+     */
+    public BookKeeperTools(String zkServers) throws IOException, InterruptedException, KeeperException {
+        // Create the ZooKeeper client instance
+        zk = new ZooKeeper(zkServers, 10000, new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Process: " + event.getType() + " " + event.getPath());
+                }
+            }
+        });
+        // Create the BookKeeper client instance. On any failure (checked or
+        // unchecked), release the ZooKeeper client created above.
+        boolean created = false;
+        try {
+            bkc = new BookKeeper(zk);
+            created = true;
+        } finally {
+            if (!created) {
+                zk.close();
+            }
+        }
+    }
+
+    /**
+     * Shutdown method to gracefully release resources that this class uses.
+     * The ZooKeeper client is closed even if halting the BookKeeper client
+     * fails.
+     *
+     * @throws InterruptedException
+     *             if there is an error shutting down the clients that this
+     *             class uses.
+     */
+    public void shutdown() throws InterruptedException {
+        try {
+            bkc.halt();
+        } finally {
+            zk.close();
+        }
+    }
+
+    /**
+     * Aggregating VoidCallback used by bookie recovery. It counts the
+     * completions of {@code expected} asynchronous operations. When the count
+     * reaches {@code expected`}, it invokes the wrapped callback with OK if every
+     * operation succeeded, or with BKException.Code.LedgerRecoveryException if
+     * at least one failed. With {@code expected == 0} the wrapped callback is
+     * invoked immediately from the constructor.
+     */
+    class MultiCallback implements AsyncCallback.VoidCallback {
+        // Number of processResult calls to wait for.
+        final int expected;
+        // Callback (and its context) to notify once everything has completed.
+        final AsyncCallback.VoidCallback cb;
+        final Object context;
+        // Running count of completed operations.
+        final AtomicInteger done = new AtomicInteger();
+        // Return codes of the operations that failed.
+        final LinkedBlockingQueue<Integer> exceptions = new LinkedBlockingQueue<Integer>();
+
+        MultiCallback(int expected, AsyncCallback.VoidCallback cb, Object context) {
+            this.expected = expected;
+            this.cb = cb;
+            this.context = context;
+            // Nothing to wait for: report success right away.
+            if (expected == 0) {
+                cb.processResult(Code.OK.intValue(), null, context);
+            }
+        }
+
+        /** Counts one completion; the completion that reaches {@code expected} fires the final callback. */
+        private void tick() {
+            int completed = done.incrementAndGet();
+            if (completed != expected) {
+                return;
+            }
+            int finalRc = exceptions.isEmpty() ? Code.OK.intValue() : BKException.Code.LedgerRecoveryException;
+            cb.processResult(finalRc, null, context);
+        }
+
+        @Override
+        public void processResult(int rc, String path, Object ctx) {
+            boolean failed = rc != Code.OK.intValue();
+            if (failed) {
+                LOG.error("BK error recovering ledger data", BKException.create(rc));
+                exceptions.add(rc);
+            }
+            tick();
+        }
+    }
+
+    /**
+     * Returns the DigestType to use when opening the given ledger.
+     * <p>
+     * Placeholder: the ledger's real digest type is not recorded anywhere this
+     * tool can read, so the tool-wide value (the "digestType" system property,
+     * CRC32 by default) is returned for every ledger and {@code ledgerId} is
+     * currently ignored.
+     *
+     * @param ledgerId
+     *            ledger whose digest type is wanted (unused for now)
+     * @return the configured DigestType
+     */
+    private DigestType getLedgerDigestType(long ledgerId) {
+        return this.DIGEST_TYPE;
+    }
+
+    /**
+     * Returns the password to use when opening the given ledger.
+     * <p>
+     * Placeholder: the ledger's real password is not recorded anywhere this
+     * tool can read, so the tool-wide value (the "passwd" system property,
+     * empty by default) is returned for every ledger and {@code ledgerId} is
+     * currently ignored. The same shared array is returned on every call.
+     *
+     * @param ledgerId
+     *            ledger whose password is wanted (unused for now)
+     * @return the configured password bytes
+     */
+    private byte[] getLedgerPasswd(long ledgerId) {
+        return this.PASSWD;
+    }
+
+    /**
+     * Completion flag shared between a synchronous caller and the callback of
+     * the asynchronous operation it waits on. Read and write {@code value}
+     * while holding this object's monitor.
+     */
+    class SyncObject {
+        boolean value;
+
+        public SyncObject() {
+            this.value = false;
+        }
+    }
+
+    /**
+     * Blocking variant of {@link #asyncRecoverBookieData}: starts the recovery
+     * of every ledger fragment that was stored on {@code bookieSrc} and waits
+     * until the asynchronous operation reports completion.
+     * <p>
+     * The fragments are re-replicated either to {@code bookieDest}, when it is
+     * given, or to randomly chosen available bookies. The ZooKeeper ledger
+     * metadata is then updated to reference the new bookie(s).
+     * <p>
+     * NOTE(review): the completion return code is only logged. A failed
+     * recovery is not reported to the caller of this method.
+     *
+     * @param bookieSrc
+     *            bookie that failed and whose ledger fragments must be
+     *            re-replicated.
+     * @param bookieDest
+     *            optional bookie to receive all of the recovered fragments;
+     *            may be null.
+     * @throws InterruptedException
+     *             if interrupted while waiting for the recovery to finish.
+     */
+    public void recoverBookieData(final InetSocketAddress bookieSrc, final InetSocketAddress bookieDest)
+            throws InterruptedException {
+        final SyncObject completion = new SyncObject();
+        RecoverCallback signaller = new RecoverCallback() {
+            @Override
+            public void recoverComplete(int rc, Object ctx) {
+                LOG.info("Recover bookie operation completed with rc: " + rc);
+                SyncObject waiter = (SyncObject) ctx;
+                synchronized (waiter) {
+                    waiter.value = true;
+                    waiter.notify();
+                }
+            }
+        };
+        // Start the asynchronous recovery, passing the flag object as context.
+        asyncRecoverBookieData(bookieSrc, bookieDest, signaller, completion);
+
+        // Block until the callback flips the flag.
+        synchronized (completion) {
+            while (!completion.value) {
+                completion.wait();
+            }
+        }
+    }
+
+    /**
+     * Asynchronously re-replicates every ledger fragment that was stored on
+     * the failed bookie {@code bookieSrc}, so that its under-replicated data is
+     * restored.
+     * <p>
+     * If {@code bookieDest} is given, all such fragments are copied to it.
+     * Otherwise a bookie is picked at random from the currently available set.
+     * The ZooKeeper ledger metadata is updated to reference the new bookie(s).
+     * ZooKeeper is synced first so the subsequent reads see current data.
+     *
+     * @param bookieSrc
+     *            bookie that failed and whose ledger fragments must be
+     *            re-replicated.
+     * @param bookieDest
+     *            optional bookie to receive all of the recovered fragments;
+     *            may be null.
+     * @param cb
+     *            RecoverCallback invoked once the recovery has finished (or
+     *            failed).
+     * @param context
+     *            context object handed back to {@code cb}.
+     */
+    public void asyncRecoverBookieData(final InetSocketAddress bookieSrc, final InetSocketAddress bookieDest,
+            final RecoverCallback cb, final Object context) {
+        AsyncCallback.VoidCallback afterSync = new AsyncCallback.VoidCallback() {
+            @Override
+            public void processResult(int rc, String path, Object ctx) {
+                if (rc == Code.OK.intValue()) {
+                    getAvailableBookies(bookieSrc, bookieDest, cb, context);
+                    return;
+                }
+                LOG.error("ZK error syncing: ", KeeperException.create(KeeperException.Code.get(rc), path));
+                cb.recoverComplete(BKException.Code.ZKException, context);
+            }
+        };
+        // Sync ZK so the bookie/ledger reads that follow observe the latest data.
+        zk.sync(LEDGERS_PATH, afterSync, null);
+    }
+
+    /**
+     * This method asynchronously gets the set of available Bookies that the
+     * dead input bookie's data will be copied over into. If the user passed in
+     * a specific destination bookie, then just use that one. Otherwise, we
+     * read the registered bookies from ZooKeeper (excluding the failed bookie
+     * itself) and later randomly pick one of them for each ledger we recover.
+     * <p>
+     * Any failure (ZK error, malformed bookie node name, or no candidate bookie
+     * left) is reported to {@code cb}. Errors are never thrown out of the ZK
+     * callback, because an escaped exception would leave {@code cb} uninvoked
+     * forever.
+     *
+     * @param bookieSrc
+     *            Source bookie that had a failure. We want to replicate the
+     *            ledger fragments that were stored there.
+     * @param bookieDest
+     *            Optional destination bookie that if passed, we will copy all
+     *            of the ledger fragments from the source bookie over to it.
+     * @param cb
+     *            RecoverCallback to invoke once all of the data on the dead
+     *            bookie has been recovered and replicated.
+     * @param context
+     *            Context for the RecoverCallback to call.
+     */
+    private void getAvailableBookies(final InetSocketAddress bookieSrc, final InetSocketAddress bookieDest,
+            final RecoverCallback cb, final Object context) {
+        // ArrayList because recovery later picks a bookie by random index.
+        final List<InetSocketAddress> availableBookies = new ArrayList<InetSocketAddress>();
+        if (bookieDest != null) {
+            availableBookies.add(bookieDest);
+            // Now poll ZK to get the active ledgers
+            getActiveLedgers(bookieSrc, bookieDest, cb, context, availableBookies);
+            return;
+        }
+        zk.getChildren(BOOKIES_PATH, null, new AsyncCallback.ChildrenCallback() {
+            @Override
+            public void processResult(int rc, String path, Object ctx, List<String> children) {
+                if (rc != Code.OK.intValue()) {
+                    LOG.error("ZK error getting bookie nodes: ", KeeperException.create(KeeperException.Code
+                            .get(rc), path));
+                    cb.recoverComplete(BKException.Code.ZKException, context);
+                    return;
+                }
+                for (String bookieNode : children) {
+                    String parts[] = bookieNode.split(COLON);
+                    if (parts.length < 2) {
+                        LOG.error("Bookie Node retrieved from ZK has invalid name format: " + bookieNode);
+                        cb.recoverComplete(BKException.Code.ZKException, context);
+                        return;
+                    }
+                    final int port;
+                    try {
+                        port = Integer.parseInt(parts[1]);
+                    } catch (NumberFormatException e) {
+                        LOG.error("Bookie Node retrieved from ZK has invalid name format: " + bookieNode, e);
+                        cb.recoverComplete(BKException.Code.ZKException, context);
+                        return;
+                    }
+                    InetSocketAddress bookie = new InetSocketAddress(parts[0], port);
+                    // The failed bookie may still be listed under BOOKIES_PATH;
+                    // it must never be chosen as a replication target.
+                    if (!bookie.equals(bookieSrc)) {
+                        availableBookies.add(bookie);
+                    }
+                }
+                if (availableBookies.isEmpty()) {
+                    LOG.error("No available bookie other than " + bookieSrc + " to replicate ledger data to");
+                    cb.recoverComplete(BKException.Code.LedgerRecoveryException, context);
+                    return;
+                }
+                // Now poll ZK to get the active ledgers
+                getActiveLedgers(bookieSrc, bookieDest, cb, context, availableBookies);
+            }
+        }, null);
+    }
+
+    /**
+     * Asynchronously lists the ledger nodes under LEDGERS_PATH in ZooKeeper and
+     * starts recovery of each one. Each ledger's metadata is inspected to find
+     * fragments that were stored on the failed bookie. {@code cb} is invoked
+     * once every ledger has reported back, or immediately with ZKException if
+     * the listing itself fails.
+     *
+     * @param bookieSrc
+     *            bookie that failed and whose ledger fragments must be
+     *            re-replicated.
+     * @param bookieDest
+     *            optional explicit destination bookie (may be null).
+     * @param cb
+     *            RecoverCallback to invoke when recovery of all ledgers is done.
+     * @param context
+     *            context object handed back to {@code cb}.
+     * @param availableBookies
+     *            candidate bookies to replicate data to; a single entry when
+     *            the user explicitly chose a destination bookie.
+     */
+    private void getActiveLedgers(final InetSocketAddress bookieSrc, final InetSocketAddress bookieDest,
+            final RecoverCallback cb, final Object context, final List<InetSocketAddress> availableBookies) {
+        zk.getChildren(LEDGERS_PATH, null, new AsyncCallback.ChildrenCallback() {
+            @Override
+            public void processResult(int rc, String path, Object ctx, List<String> children) {
+                if (rc != Code.OK.intValue()) {
+                    LOG.error("ZK error getting ledger nodes: ", KeeperException.create(KeeperException.Code.get(rc),
+                            path));
+                    cb.recoverComplete(BKException.Code.ZKException, context);
+                    return;
+                }
+                // Adapt the RecoverCallback to the VoidCallback shape MultiCallback expects.
+                AsyncCallback.VoidCallback whenAllDone = new AsyncCallback.VoidCallback() {
+                    @Override
+                    public void processResult(int resultCode, String unusedPath, Object resultCtx) {
+                        cb.recoverComplete(resultCode, resultCtx);
+                    }
+                };
+                MultiCallback perLedger = new MultiCallback(children.size(), whenAllDone, context);
+                // Kick off recovery of every ledger node concurrently.
+                for (String ledgerNode : children) {
+                    recoverLedger(bookieSrc, ledgerNode, perLedger, availableBookies);
+                }
+            }
+        }, null);
+    }
+
+    /**
+     * This method asynchronously recovers a given ledger if any of the ledger
+     * entries were stored on the failed bookie. The steps are:
+     * <ol>
+     * <li>Open the ledger.</li>
+     * <li>Find every fragment whose ensemble contains the failed bookie.</li>
+     * <li>Copy those fragments' entries to one bookie picked at random from
+     * {@code availableBookies}.</li>
+     * <li>Rewrite the ledger metadata in ZooKeeper to reference that bookie.</li>
+     * </ol>
+     * Exactly one outcome for this ledger is reported to {@code ledgerMcb},
+     * including on parse errors, open errors, an empty candidate list, and
+     * interruption.
+     *
+     * @param bookieSrc
+     *            Source bookie that had a failure. We want to replicate the
+     *            ledger fragments that were stored there.
+     * @param ledgerNode
+     *            Ledger Node name as retrieved from ZooKeeper we want to
+     *            recover.
+     * @param ledgerMcb
+     *            MultiCallback to invoke once we've recovered the current
+     *            ledger.
+     * @param availableBookies
+     *            List of Bookie Servers that are available to use for
+     *            replicating data on the failed bookie. This could contain a
+     *            single bookie server if the user explicitly chose a bookie
+     *            server to replicate data to.
+     */
+    private void recoverLedger(final InetSocketAddress bookieSrc, final String ledgerNode,
+            final MultiCallback ledgerMcb, final List<InetSocketAddress> availableBookies) {
+        /*
+         * The available node is also stored in this path so ignore that. That
+         * node is the path for the set of available Bookie Servers.
+         */
+        if (ledgerNode.equals(AVAILABLE_NODE)) {
+            ledgerMcb.processResult(BKException.Code.OK, null, null);
+            return;
+        }
+        // Parse out the ledgerId from the ZK ledger node.
+        String parts[] = ledgerNode.split(LEDGER_NODE_PREFIX);
+        if (parts.length < 2) {
+            LOG.error("Ledger Node retrieved from ZK has invalid name format: " + ledgerNode);
+            ledgerMcb.processResult(BKException.Code.ZKException, null, null);
+            return;
+        }
+        final long lId;
+        try {
+            lId = Long.parseLong(parts[parts.length - 1]);
+        } catch (NumberFormatException e) {
+            LOG.error("Error retrieving ledgerId from ledgerNode: " + ledgerNode, e);
+            ledgerMcb.processResult(BKException.Code.ZKException, null, null);
+            return;
+        }
+        /*
+         * For the current ledger, open it to retrieve the LedgerHandle. This
+         * will contain the LedgerMetadata indicating which bookie servers the
+         * ledger fragments are stored on. Check if any of the ledger fragments
+         * for the current ledger are stored on the input dead bookie.
+         */
+        DigestType digestType = getLedgerDigestType(lId);
+        byte[] passwd = getLedgerPasswd(lId);
+        bkc.asyncOpenLedger(lId, digestType, passwd, new OpenCallback() {
+            @Override
+            public void openComplete(int rc, final LedgerHandle lh, Object ctx) {
+                if (rc != Code.OK.intValue()) {
+                    LOG.error("BK error opening ledger: " + lId, BKException.create(rc));
+                    ledgerMcb.processResult(rc, null, null);
+                    return;
+                }
+                /*
+                 * This List stores the ledger fragments to recover indexed by
+                 * the start entry ID for the range. The ensembles map is keyed
+                 * off this.
+                 */
+                final List<Long> ledgerFragmentsToRecover = new LinkedList<Long>();
+                /*
+                 * This Map stores the inclusive end entry ID of each ledger
+                 * fragment, keyed by its start entry ID. A fragment ends one
+                 * entry before the next fragment starts; the last fragment ends
+                 * at the ledger's last add confirmed entry.
+                 */
+                Map<Long, Long> ledgerFragmentsRange = new HashMap<Long, Long>();
+                Long curEntryId = null;
+                for (Map.Entry<Long, ArrayList<InetSocketAddress>> entry : lh.getLedgerMetadata().getEnsembles()
+                        .entrySet()) {
+                    if (curEntryId != null) {
+                        ledgerFragmentsRange.put(curEntryId, entry.getKey() - 1);
+                    }
+                    curEntryId = entry.getKey();
+                    if (entry.getValue().contains(bookieSrc)) {
+                        /*
+                         * Current ledger fragment has entries stored on the
+                         * dead bookie so we'll need to recover them.
+                         */
+                        ledgerFragmentsToRecover.add(entry.getKey());
+                    }
+                }
+                /*
+                 * Bound the last fragment as well. Without an end entry ID,
+                 * recoverLedgerFragment skips copying its entries. The metadata
+                 * update below would still point the fragment at the new bookie,
+                 * so that bookie would be listed for entries it never received.
+                 */
+                if (curEntryId != null) {
+                    ledgerFragmentsRange.put(curEntryId, lh.getLastAddConfirmed());
+                }
+                /*
+                 * See if this current ledger contains any ledger fragment that
+                 * needs to be re-replicated. If not, then just invoke the
+                 * multiCallback and return.
+                 */
+                if (ledgerFragmentsToRecover.isEmpty()) {
+                    ledgerMcb.processResult(BKException.Code.OK, null, null);
+                    return;
+                }
+                /*
+                 * Random.nextInt(0) throws IllegalArgumentException. Such an
+                 * exception would escape this callback and leave ledgerMcb (and
+                 * so the whole recovery) uncompleted, so fail this ledger
+                 * explicitly instead.
+                 */
+                if (availableBookies.isEmpty()) {
+                    LOG.error("No available bookie to replicate ledger fragments of ledger: " + lId);
+                    ledgerMcb.processResult(BKException.Code.LedgerRecoveryException, null, null);
+                    return;
+                }
+                /*
+                 * We have ledger fragments that need to be re-replicated to a
+                 * new bookie. Choose one randomly from the available set of
+                 * bookies.
+                 */
+                final InetSocketAddress newBookie = availableBookies.get(rand.nextInt(availableBookies.size()));
+
+                /*
+                 * Wrapper class around the ledger MultiCallback. Once all
+                 * ledger fragments for the ledger have been replicated to a new
+                 * bookie, we need to update ZK with this new metadata to point
+                 * to the new bookie instead of the old dead one. That should be
+                 * done at the end prior to invoking the ledger MultiCallback.
+                 */
+                class LedgerMultiCallbackWrapper implements AsyncCallback.VoidCallback {
+                    final MultiCallback ledgerMcb;
+
+                    LedgerMultiCallbackWrapper(MultiCallback ledgerMcb) {
+                        this.ledgerMcb = ledgerMcb;
+                    }
+
+                    @Override
+                    public void processResult(int rc, String path, Object ctx) {
+                        if (rc != Code.OK.intValue()) {
+                            LOG.error("BK error replicating ledger fragments for ledger: " + lId, BKException
+                                    .create(rc));
+                            ledgerMcb.processResult(rc, null, null);
+                            return;
+                        }
+                        /*
+                         * Update the ledger metadata's ensemble info to point
+                         * to the new bookie.
+                         */
+                        for (final Long startEntryId : ledgerFragmentsToRecover) {
+                            ArrayList<InetSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().get(
+                                    startEntryId);
+                            int deadBookieIndex = ensemble.indexOf(bookieSrc);
+                            ensemble.remove(deadBookieIndex);
+                            ensemble.add(deadBookieIndex, newBookie);
+                        }
+                        lh.writeLedgerConfig(new AsyncCallback.StatCallback() {
+                            @Override
+                            public void processResult(int rc, String path, Object ctx, Stat stat) {
+                                if (rc != Code.OK.intValue()) {
+                                    LOG.error("ZK error updating ledger config metadata for ledgerId: " + lh.getId(),
+                                            KeeperException.create(KeeperException.Code.get(rc), path));
+                                } else {
+                                    LOG.info("Updated ZK for ledgerId: (" + lh.getId()
+                                            + ") to point ledger fragments from old dead bookie: (" + bookieSrc
+                                            + ") to new bookie: (" + newBookie + ")");
+                                }
+                                /*
+                                 * Pass the return code result up the chain with
+                                 * the parent callback.
+                                 */
+                                ledgerMcb.processResult(rc, null, null);
+                            }
+                        }, null);
+                    }
+                }
+
+                /*
+                 * Now recover all of the necessary ledger fragments
+                 * asynchronously using a MultiCallback for every fragment.
+                 */
+                MultiCallback ledgerFragmentMcb = new MultiCallback(ledgerFragmentsToRecover.size(),
+                        new LedgerMultiCallbackWrapper(ledgerMcb), null);
+                // Fragments whose recovery has not been successfully started yet.
+                int fragmentsLeft = ledgerFragmentsToRecover.size();
+                for (final Long startEntryId : ledgerFragmentsToRecover) {
+                    Long endEntryId = ledgerFragmentsRange.get(startEntryId);
+                    try {
+                        recoverLedgerFragment(bookieSrc, lh, startEntryId, endEntryId, ledgerFragmentMcb, newBookie);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        LOG.error("Interrupted while recovering ledger fragments for ledger: " + lId, e);
+                        /*
+                         * Report this fragment and every fragment not yet started
+                         * as failed. ledgerFragmentMcb then still reaches its
+                         * expected count, so the failure propagates upward
+                         * instead of the recovery hanging forever.
+                         * NOTE(review): assumes recoverLedgerFragment had not
+                         * already reported this fragment when it threw. Even if
+                         * it had, the ledger is reported as failed either way,
+                         * because the error is recorded before the count is
+                         * checked.
+                         */
+                        for (int i = 0; i < fragmentsLeft; i++) {
+                            ledgerFragmentMcb.processResult(BKException.Code.LedgerRecoveryException, null, null);
+                        }
+                        return;
+                    }
+                    fragmentsLeft--;
+                }
+            }
+        }, null);
+    }
+
+ /**
+ * This method asynchronously recovers a ledger fragment which is a
+ * contiguous portion of a ledger that was stored in an ensemble that
+ * included the failed bookie.
+ *
+ * @param bookieSrc
+ * Source bookie that had a failure. We want to replicate the
+ * ledger fragments that were stored there.
+ * @param lh
+ * LedgerHandle for the ledger
+ * @param startEntryId
+ * Start entry Id for the ledger fragment
+ * @param endEntryId
+ * End entry Id for the ledger fragment
+ * @param ledgerFragmentMcb
+ * MultiCallback to invoke once we've recovered the current
+ * ledger fragment.
+ * @param newBookie
+ * New bookie we want to use to recover and replicate the ledger
+ * entries that were stored on the failed bookie.
+ */
+ private void recoverLedgerFragment(final InetSocketAddress bookieSrc, final LedgerHandle lh,
+ final Long startEntryId, final Long endEntryId, final MultiCallback ledgerFragmentMcb,
+ final InetSocketAddress newBookie) throws InterruptedException {
+ if (endEntryId == null) {
+ /*
+ * Ideally this should never happen if bookie failure is taken care
+ * of properly. Nothing we can do though in this case.
+ */
+ LOG.warn("Dead bookie (" + bookieSrc + ") is still part of the current active ensemble for ledgerId: "
+ + lh.getId());
+ ledgerFragmentMcb.processResult(BKException.Code.OK, null, null);
+ return;
+ }
+
+ ArrayList<InetSocketAddress> curEnsemble = lh.getLedgerMetadata().getEnsembles().get(startEntryId);
+ int bookieIndex = 0;
+ for (int i = 0; i < curEnsemble.size(); i++) {
+ if (curEnsemble.get(i).equals(bookieSrc)) {
+ bookieIndex = i;
+ break;
+ }
+ }
+ /*
+ * Loop through all entries in the current ledger fragment range and
+ * find the ones that were stored on the dead bookie.
+ */
+ List<Long> entriesToReplicate = new LinkedList<Long>();
+ for (long i = startEntryId; i <= endEntryId; i++) {
+ if (lh.getDistributionSchedule().getReplicaIndex(i, bookieIndex) >= 0) {
+ /*
+ * Current entry is stored on the dead bookie so we'll need to
+ * read it and replicate it to a new bookie.
+ */
+ entriesToReplicate.add(i);
+ }
+ }
+ /*
+ * Now asynchronously replicate all of the entries for the ledger
+ * fragment that were on the dead bookie.
+ */
+ MultiCallback ledgerFragmentEntryMcb = new MultiCallback(entriesToReplicate.size(), ledgerFragmentMcb, null);
+ for (final Long entryId : entriesToReplicate) {
+ recoverLedgerFragmentEntry(entryId, lh, ledgerFragmentEntryMcb, newBookie);
+ }
+ }
+
+ /**
+ * This method asynchronously recovers a specific ledger entry by reading
+ * the values via the BookKeeper Client (which would read it from the other
+ * replicas) and then writing it to the chosen new bookie.
+ *
+ * @param entryId
+ * Ledger Entry ID to recover.
+ * @param lh
+ * LedgerHandle for the ledger
+ * @param ledgerFragmentEntryMcb
+ * MultiCallback to invoke once we've recovered the current
+ * ledger entry.
+ * @param newBookie
+ * New bookie we want to use to recover and replicate the ledger
+ * entries that were stored on the failed bookie.
+ */
+ private void recoverLedgerFragmentEntry(final Long entryId, final LedgerHandle lh,
+ final MultiCallback ledgerFragmentEntryMcb, final InetSocketAddress newBookie) throws InterruptedException {
+ /*
+ * Read the ledger entry using the LedgerHandle. This will allow us to
+ * read the entry from one of the other replicated bookies other than
+ * the dead one.
+ */
+ lh.asyncReadEntries(entryId, entryId, new ReadCallback() {
+ @Override
+ public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
+ if (rc != Code.OK.intValue()) {
+ LOG.error("BK error reading ledger entry: " + entryId, BKException.create(rc));
+ ledgerFragmentEntryMcb.processResult(rc, null, null);
+ return;
+ }
+ /*
+ * Now that we've read the ledger entry, write it to the new
+ * bookie we've selected.
+ */
+ ChannelBuffer toSend = lh.getDigestManager().computeDigestAndPackageForSending(entryId,
+ lh.getLastAddConfirmed(), seq.nextElement().getEntry());
+ bkc.getBookieClient().addEntry(newBookie, lh.getId(), lh.getLedgerKey(), entryId, toSend,
+ new WriteCallback() {
+ @Override
+ public void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr,
+ Object ctx) {
+ if (rc != Code.OK.intValue()) {
+ LOG.error("BK error writing entry for ledgerId: " + ledgerId + ", entryId: "
+ + entryId + ", bookie: " + addr, BKException.create(rc));
+ } else {
+ LOG.debug("Success writing ledger entry to a new bookie!");
+ }
+ /*
+ * Pass the return code result up the chain with
+ * the parent callback.
+ */
+ ledgerFragmentEntryMcb.processResult(rc, null, null);
+ }
+ }, null);
+ }
+ }, null);
+ }
+
+ /**
+ * Main method so we can invoke the bookie recovery via command line.
+ *
+ * @param args
+ * Arguments to BookKeeperTools. 2 are required and the third is
+ * optional. The first is a comma separated list of ZK server
+ * host:port pairs. The second is the host:port socket address
+ * for the bookie we are trying to recover. The third is the
+ * host:port socket address of the optional destination bookie
+ * server we want to replicate the data over to.
+ * @throws InterruptedException
+ * @throws IOException
+ * @throws KeeperException
+ */
+ public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
+ // Validate the inputs
+ if (args.length < 2) {
+ System.err.println("USAGE: BookKeeperTools zkServers bookieSrc [bookieDest]");
+ return;
+ }
+ // Parse out the input arguments
+ String zkServers = args[0];
+ String bookieSrcString[] = args[1].split(COLON);
+ if (bookieSrcString.length < 2) {
+ System.err.println("BookieSrc inputted has invalid name format (host:port expected): " + bookieSrcString);
+ return;
+ }
+ final InetSocketAddress bookieSrc = new InetSocketAddress(bookieSrcString[0], Integer
+ .parseInt(bookieSrcString[1]));
+ InetSocketAddress bookieDest = null;
+ if (args.length < 3) {
+ String bookieDestString[] = args[2].split(COLON);
+ if (bookieDestString.length < 2) {
+ System.err.println("BookieDest inputted has invalid name format (host:port expected): "
+ + bookieDestString);
+ return;
+ }
+ bookieDest = new InetSocketAddress(bookieDestString[0], Integer.parseInt(bookieDestString[1]));
+ }
+
+ // Create the BookKeeperTools instance and perform the bookie recovery
+ // synchronously.
+ BookKeeperTools bkTools = new BookKeeperTools(zkServers);
+ bkTools.recoverBookieData(bookieSrc, bookieDest);
+
+ // Shutdown the resources used in the BookKeeperTools instance.
+ bkTools.shutdown();
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieRecoveryTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieRecoveryTest.java?rev=962697&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieRecoveryTest.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/bookkeeper/test/org/apache/bookkeeper/test/BookieRecoveryTest.java Fri Jul 9 21:17:57 2010
@@ -0,0 +1,396 @@
+package org.apache.bookkeeper.test;
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerEntry;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.tools.BookKeeperTools;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * This class tests the bookie recovery admin functionality.
+ */
+public class BookieRecoveryTest extends BaseTestCase {
+ static Logger LOG = Logger.getLogger(BookieRecoveryTest.class);
+
+ // Object used for synchronizing async method calls
+ class SyncObject {
+ boolean value;
+
+ public SyncObject() {
+ value = false;
+ }
+ }
+
+ // Object used for implementing the Bookie RecoverCallback for this jUnit
+ // test. This verifies that the operation completed successfully.
+ class BookieRecoverCallback implements RecoverCallback {
+ @Override
+ public void recoverComplete(int rc, Object ctx) {
+ LOG.info("Recovered bookie operation completed with rc: " + rc);
+ assertTrue(rc == Code.OK.intValue());
+ SyncObject sync = (SyncObject) ctx;
+ synchronized (sync) {
+ sync.value = true;
+ sync.notify();
+ }
+ }
+ }
+
+ // Objects to use for this jUnit test.
+ DigestType digestType;
+ SyncObject sync;
+ BookieRecoverCallback bookieRecoverCb;
+ BookKeeperTools bkTools;
+
+ // Constructor
+ public BookieRecoveryTest(DigestType digestType) {
+ super(3);
+ this.digestType = digestType;
+ }
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ // Set up the configuration properties needed.
+ System.setProperty("digestType", digestType.toString());
+ System.setProperty("passwd", "");
+ sync = new SyncObject();
+ bookieRecoverCb = new BookieRecoverCallback();
+ bkTools = new BookKeeperTools(HOSTPORT);
+ }
+
+ @After
+ @Override
+ public void tearDown() throws Exception {
+ // Release any resources used by the BookKeeperTools instance.
+ bkTools.shutdown();
+ super.tearDown();
+ }
+
+ /**
+ * Helper method to create a number of ledgers
+ *
+ * @param numLedgers
+ * Number of ledgers to create
+ * @return List of LedgerHandles for each of the ledgers created
+ * @throws BKException
+ * @throws KeeperException
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ private List<LedgerHandle> createLedgers(int numLedgers) throws BKException, KeeperException, IOException,
+ InterruptedException {
+ List<LedgerHandle> lhs = new ArrayList<LedgerHandle>();
+ for (int i = 0; i < numLedgers; i++) {
+ lhs.add(bkc.createLedger(digestType, System.getProperty("passwd").getBytes()));
+ }
+ return lhs;
+ }
+
+ /**
+ * Helper method to write dummy ledger entries to all of the ledgers passed.
+ *
+ * @param numEntries
+ * Number of ledger entries to write for each ledger
+ * @param startEntryId
+ * The first entry Id we're expecting to write for each ledger
+ * @param lhs
+ * List of LedgerHandles for all ledgers to write entries to
+ * @throws BKException
+ * @throws InterruptedException
+ */
+ private void writeEntriestoLedgers(int numEntries, long startEntryId, List<LedgerHandle> lhs) throws BKException,
+ InterruptedException {
+ for (LedgerHandle lh : lhs) {
+ for (int i = 0; i < numEntries; i++) {
+ lh.addEntry(("LedgerId: " + lh.getId() + ", EntryId: " + (startEntryId + i)).getBytes());
+ }
+ }
+ }
+
+ /**
+ * Helper method to startup a new bookie server with the indicated port
+ * number
+ *
+ * @param port
+ * Port to start the new bookie server on
+ * @throws IOException
+ */
+ private void startNewBookie(int port) throws IOException {
+ File f = File.createTempFile("bookie", "test");
+ tmpDirs.add(f);
+ f.delete();
+ f.mkdir();
+ BookieServer server = new BookieServer(port, HOSTPORT, f, new File[] { f });
+ server.start();
+ bs.add(server);
+ LOG.info("New bookie on port " + port + " has been created.");
+ }
+
+ /**
+ * Helper method to verify that we can read the recovered ledger entries.
+ *
+ * @param numLedgers
+ * Number of ledgers to verify
+ * @param startEntryId
+ * Start Entry Id to read
+ * @param endEntryId
+ * End Entry Id to read
+ * @throws BKException
+ * @throws InterruptedException
+ */
+ private void verifyRecoveredLedgers(int numLedgers, long startEntryId, long endEntryId) throws BKException,
+ InterruptedException {
+ // Get a set of LedgerHandles for all of the ledgers to verify
+ List<LedgerHandle> lhs = new ArrayList<LedgerHandle>();
+ for (int i = 0; i < numLedgers; i++) {
+ lhs.add(bkc.openLedger(i + 1, digestType, System.getProperty("passwd").getBytes()));
+ }
+ // Read the ledger entries to verify that they are all present and
+ // correct in the new bookie.
+ for (LedgerHandle lh : lhs) {
+ Enumeration<LedgerEntry> entries = lh.readEntries(startEntryId, endEntryId);
+ while (entries.hasMoreElements()) {
+ LedgerEntry entry = entries.nextElement();
+ assertTrue(new String(entry.getEntry()).equals("LedgerId: " + entry.getLedgerId() + ", EntryId: "
+ + entry.getEntryId()));
+ }
+ }
+
+ }
+
+ /**
+ * This tests the asynchronous bookie recovery functionality by writing
+ * entries into 3 bookies, killing one bookie, starting up a new one to
+ * replace it, and then recovering the ledger entries from the killed bookie
+ * onto the new one. We'll verify that the entries stored on the killed
+ * bookie are properly copied over and restored onto the new one.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testAsyncBookieRecoveryToSpecificBookie() throws Exception {
+ // Create the ledgers
+ int numLedgers = 3;
+ List<LedgerHandle> lhs = createLedgers(numLedgers);
+
+ // Write the entries for the ledgers with dummy values.
+ int numMsgs = 10;
+ writeEntriestoLedgers(numMsgs, 0, lhs);
+
+ // Shutdown the first bookie server
+ LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
+ bs.get(0).shutdown();
+ bs.remove(0);
+
+ // Startup a new bookie server
+ int newBookiePort = initialPort + numBookies;
+ startNewBookie(newBookiePort);
+
+ // Write some more entries for the ledgers so a new ensemble will be
+ // created for them.
+ writeEntriestoLedgers(numMsgs, 10, lhs);
+
+ // Call the async recover bookie method.
+ InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort);
+ InetSocketAddress bookieDest = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), newBookiePort);
+ LOG.info("Now recover the data on the killed bookie (" + bookieSrc + ") and replicate it to the new one ("
+ + bookieDest + ")");
+ // Initiate the sync object
+ sync.value = false;
+ bkTools.asyncRecoverBookieData(bookieSrc, bookieDest, bookieRecoverCb, sync);
+
+ // Wait for the async method to complete.
+ synchronized (sync) {
+ while (sync.value == false) {
+ sync.wait();
+ }
+ }
+
+ // Verify the recovered ledger entries are okay.
+ verifyRecoveredLedgers(numLedgers, 0, 2 * numMsgs - 1);
+ }
+
+ /**
+ * This tests the asynchronous bookie recovery functionality by writing
+ * entries into 3 bookies, killing one bookie, starting up a few new
+ * bookies, and then recovering the ledger entries from the killed bookie
+ * onto random available bookie servers. We'll verify that the entries
+ * stored on the killed bookie are properly copied over and restored onto
+ * the other bookies.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testAsyncBookieRecoveryToRandomBookies() throws Exception {
+ // Create the ledgers
+ int numLedgers = 3;
+ List<LedgerHandle> lhs = createLedgers(numLedgers);
+
+ // Write the entries for the ledgers with dummy values.
+ int numMsgs = 10;
+ writeEntriestoLedgers(numMsgs, 0, lhs);
+
+ // Shutdown the first bookie server
+ LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
+ bs.get(0).shutdown();
+ bs.remove(0);
+
+ // Startup three new bookie servers
+ for (int i = 0; i < 3; i++) {
+ int newBookiePort = initialPort + numBookies + i;
+ startNewBookie(newBookiePort);
+ }
+
+ // Write some more entries for the ledgers so a new ensemble will be
+ // created for them.
+ writeEntriestoLedgers(numMsgs, 10, lhs);
+
+ // Call the async recover bookie method.
+ InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort);
+ InetSocketAddress bookieDest = null;
+ LOG.info("Now recover the data on the killed bookie (" + bookieSrc
+ + ") and replicate it to a random available one");
+ // Initiate the sync object
+ sync.value = false;
+ bkTools.asyncRecoverBookieData(bookieSrc, bookieDest, bookieRecoverCb, sync);
+
+ // Wait for the async method to complete.
+ synchronized (sync) {
+ while (sync.value == false) {
+ sync.wait();
+ }
+ }
+
+ // Verify the recovered ledger entries are okay.
+ verifyRecoveredLedgers(numLedgers, 0, 2 * numMsgs - 1);
+ }
+
+ /**
+ * This tests the synchronous bookie recovery functionality by writing
+ * entries into 3 bookies, killing one bookie, starting up a new one to
+ * replace it, and then recovering the ledger entries from the killed bookie
+ * onto the new one. We'll verify that the entries stored on the killed
+ * bookie are properly copied over and restored onto the new one.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testSyncBookieRecoveryToSpecificBookie() throws Exception {
+ // Create the ledgers
+ int numLedgers = 3;
+ List<LedgerHandle> lhs = createLedgers(numLedgers);
+
+ // Write the entries for the ledgers with dummy values.
+ int numMsgs = 10;
+ writeEntriestoLedgers(numMsgs, 0, lhs);
+
+ // Shutdown the first bookie server
+ LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
+ bs.get(0).shutdown();
+ bs.remove(0);
+
+ // Startup a new bookie server
+ int newBookiePort = initialPort + numBookies;
+ startNewBookie(newBookiePort);
+
+ // Write some more entries for the ledgers so a new ensemble will be
+ // created for them.
+ writeEntriestoLedgers(numMsgs, 10, lhs);
+
+ // Call the sync recover bookie method.
+ InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort);
+ InetSocketAddress bookieDest = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), newBookiePort);
+ LOG.info("Now recover the data on the killed bookie (" + bookieSrc + ") and replicate it to the new one ("
+ + bookieDest + ")");
+ bkTools.recoverBookieData(bookieSrc, bookieDest);
+
+ // Verify the recovered ledger entries are okay.
+ verifyRecoveredLedgers(numLedgers, 0, 2 * numMsgs - 1);
+ }
+
+ /**
+ * This tests the synchronous bookie recovery functionality by writing
+ * entries into 3 bookies, killing one bookie, starting up a few new
+ * bookies, and then recovering the ledger entries from the killed bookie
+ * onto random available bookie servers. We'll verify that the entries
+ * stored on the killed bookie are properly copied over and restored onto
+ * the other bookies.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testSyncBookieRecoveryToRandomBookies() throws Exception {
+ // Create the ledgers
+ int numLedgers = 3;
+ List<LedgerHandle> lhs = createLedgers(numLedgers);
+
+ // Write the entries for the ledgers with dummy values.
+ int numMsgs = 10;
+ writeEntriestoLedgers(numMsgs, 0, lhs);
+
+ // Shutdown the first bookie server
+ LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
+ bs.get(0).shutdown();
+ bs.remove(0);
+
+ // Startup three new bookie servers
+ for (int i = 0; i < 3; i++) {
+ int newBookiePort = initialPort + numBookies + i;
+ startNewBookie(newBookiePort);
+ }
+
+ // Write some more entries for the ledgers so a new ensemble will be
+ // created for them.
+ writeEntriestoLedgers(numMsgs, 10, lhs);
+
+ // Call the sync recover bookie method.
+ InetSocketAddress bookieSrc = new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), initialPort);
+ InetSocketAddress bookieDest = null;
+ LOG.info("Now recover the data on the killed bookie (" + bookieSrc
+ + ") and replicate it to a random available one");
+ bkTools.recoverBookieData(bookieSrc, bookieDest);
+
+ // Verify the recovered ledger entries are okay.
+ verifyRecoveredLedgers(numLedgers, 0, 2 * numMsgs - 1);
+ }
+
+}