You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2013/04/26 18:38:03 UTC

svn commit: r1476283 - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/test/java/org/apache/bookkeeper/client/

Author: ivank
Date: Fri Apr 26 16:38:02 2013
New Revision: 1476283

URL: http://svn.apache.org/r1476283
Log:
BOOKKEEPER-584: Data loss when ledger metadata is overwritten (sijie via ivank)

Added:
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1476283&r1=1476282&r2=1476283&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Fri Apr 26 16:38:02 2013
@@ -24,6 +24,8 @@ Trunk (unreleased changes)
 
       BOOKKEEPER-581: Ledger recovery doesn't work correctly when recovery adds force changing ensembles. (sijie via ivank)
 
+      BOOKKEEPER-584: Data loss when ledger metadata is overwritten (sijie via ivank)
+
       bookkeeper-server:
 
         BOOKKEEPER-567: ReadOnlyBookieTest hangs on shutdown (sijie via ivank)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java?rev=1476283&r1=1476282&r2=1476283&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java Fri Apr 26 16:38:02 2013
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.client;
-
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -20,38 +18,34 @@ package org.apache.bookkeeper.client;
  * under the License.
  *
  */
+package org.apache.bookkeeper.client;
+
+import static com.google.common.base.Charsets.UTF_8;
 
 import java.net.InetSocketAddress;
 import java.security.GeneralSecurityException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Enumeration;
 import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
-import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
-import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
-import org.apache.bookkeeper.client.LedgerMetadata;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
-
 import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.State;
+import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback;
 import org.apache.bookkeeper.util.SafeRunnable;
-
-import static com.google.common.base.Charsets.UTF_8;
-import com.google.common.util.concurrent.RateLimiter;
-
+import org.jboss.netty.buffer.ChannelBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.jboss.netty.buffer.ChannelBuffer;
+import com.google.common.util.concurrent.RateLimiter;
 
 /**
  * Ledger handle contains ledger metadata and is used to access the read and
@@ -298,7 +292,12 @@ public class LedgerHandle {
                                         }
 
                                         metadata.setLength(prevLength);
-                                        if (metadata.resolveConflict(newMeta)) {
+                                        if (!metadata.isNewerThan(newMeta)
+                                                && !metadata.isConflictWith(newMeta)) {
+                                            // use the new metadata's ensemble, in case re-replication already
+                                            // replaced some bookies in the ensemble.
+                                            metadata.setEnsembles(newMeta.getEnsembles());
+                                            metadata.setVersion(newMeta.version);
                                             metadata.setLength(length);
                                             metadata.close(lastAddConfirmed);
                                             writeLedgerConfig(new CloseCb());
@@ -486,7 +485,7 @@ public class LedgerHandle {
         final long currentLength;
         synchronized(this) {
             // synchronized on this to ensure that
-            // the ledger isn't closed between checking and 
+            // the ledger isn't closed between checking and
             // updating lastAddPushed
             if (metadata.isClosed()) {
                 LOG.warn("Attempt to add to closed ledger: " + ledgerId);
@@ -533,6 +532,7 @@ public class LedgerHandle {
 
     public void asyncReadLastConfirmed(final ReadLastConfirmedCallback cb, final Object ctx) {
         ReadLastConfirmedOp.LastConfirmedDataCallback innercb = new ReadLastConfirmedOp.LastConfirmedDataCallback() {
+                @Override
                 public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData data) {
                     if (rc == BKException.Code.OK) {
                         lastAddConfirmed = Math.max(lastAddConfirmed, data.lastAddConfirmed);
@@ -782,6 +782,8 @@ public class LedgerHandle {
         }
 
         /**
+         * Resolves the specific conflicts that happen when multiple bookies fail in the same ensemble.
+         * <p>
          * Resolving the version conflicts between local ledgerMetadata and zk
          * ledgerMetadata. This will do the following:
          * <ul>
@@ -791,23 +793,35 @@ public class LedgerHandle {
          * if the zk ledgerMetadata still contains the failed bookie, then
          * update zookeeper with the newBookie otherwise send write request</li>
          * </ul>
+         * </p>
          */
         private boolean resolveConflict(LedgerMetadata newMeta) {
-            // close have changed, another client has opened
-            // the ledger, can't resolve this conflict.
+            // make sure the ledger hasn't been closed by another client.
             if (metadata.getState() != newMeta.getState()) {
                 return false;
             }
-            // update znode version
-            metadata.setVersion(newMeta.getVersion());
-            // Resolve the conflicts if zk metadata still contains failed
-            // bookie.
+
+            // If the failed bookie still exists in the metadata (in zookeeper), it means that
+            // the ensemble change for the failed bookie failed due to metadata conflicts, so try to
+            // update the ensemble change metadata again. Otherwise, it means that the ensemble change
+            // has already succeeded, so unset the success and re-add entries.
             if (newMeta.currentEnsemble.get(ensembleInfo.bookieIndex).equals(
                     ensembleInfo.addr)) {
-                // Update ledger metadata in zk, if in-memory metadata doesn't
-                // contains the failed bookie.
+                // If the in-memory metadata doesn't contain the failed bookie, it means the ensemble change
+                // didn't finish, so try to resolve conflicts with the metadata read from zookeeper and
+                // update the ensemble change metadata again.
                 if (!metadata.currentEnsemble.get(ensembleInfo.bookieIndex)
                         .equals(ensembleInfo.addr)) {
+                    // if the local metadata is newer than the zookeeper metadata, it means that the metadata was
+                    // updated again while it was being re-read, so kick off the re-read again
+                    if (metadata.isNewerThan(newMeta)) {
+                        rereadMetadata(this);
+                        return true;
+                    }
+                    // make sure the metadata hasn't been changed by another client.
+                    if (metadata.isConflictWith(newMeta)) {
+                        return false;
+                    }
                     LOG.info("Resolve ledger metadata conflict "
                             + "while changing ensemble to: "
                             + ensembleInfo.newEnsemble
@@ -815,6 +829,8 @@ public class LedgerHandle {
                             + new String(metadata.serialize(), UTF_8)
                             + "\n, new meta data is \n"
                             + new String(newMeta.serialize(), UTF_8));
+                    // update znode version
+                    metadata.setVersion(newMeta.getVersion());
                     writeLedgerConfig(new ChangeEnsembleCb(ensembleInfo));
                 }
             } else {
@@ -907,6 +923,7 @@ public class LedgerHandle {
          * @param ctx
          *          control object
          */
+        @Override
         public void readComplete(int rc, LedgerHandle lh,
                                  Enumeration<LedgerEntry> seq, Object ctx) {
             
@@ -935,6 +952,7 @@ public class LedgerHandle {
          * @param ctx
          *          control object
          */
+        @Override
         public void addComplete(int rc, LedgerHandle lh, long entry, Object ctx) {
             SyncCounter counter = (SyncCounter) ctx;
 
@@ -948,6 +966,7 @@ public class LedgerHandle {
         /**
          * Implementation of  callback interface for synchronous read last confirmed method.
          */
+        @Override
         public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
             LastConfirmedCtx lcCtx = (LastConfirmedCtx) ctx;
             
@@ -967,6 +986,7 @@ public class LedgerHandle {
          * @param lh
          * @param ctx
          */
+        @Override
         public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
             SyncCounter counter = (SyncCounter) ctx;
             counter.setrc(rc);

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java?rev=1476283&r1=1476282&r2=1476283&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java Fri Apr 26 16:38:02 2013
@@ -140,6 +140,10 @@ public class LedgerMetadata {
         return ensembles;
     }
 
+    void setEnsembles(SortedMap<Long, ArrayList<InetSocketAddress>> ensembles) {
+        this.ensembles = ensembles;
+    }
+
     public int getEnsembleSize() {
         return ensembleSize;
     }
@@ -434,13 +438,26 @@ public class LedgerMetadata {
     }
 
     /**
-     * Resolve conflict with new updated metadata.
+     * Is the metadata newer than the given <i>newMeta</i>.
+     *
+     * @param newMeta
+     * @return
+     */
+    boolean isNewerThan(LedgerMetadata newMeta) {
+        if (null == version) {
+            return false;
+        }
+        return Version.Occurred.AFTER == version.compare(newMeta.version);
+    }
+
+    /**
+     * Whether this metadata conflicts with the newly updated metadata.
      *
      * @param newMeta
      *          Re-read metadata
-     * @return true if the conflict has been resolved, otherwise false.
+     * @return true if the metadata conflicts.
      */
-    boolean resolveConflict(LedgerMetadata newMeta) {
+    boolean isConflictWith(LedgerMetadata newMeta) {
         /*
          *  if length & close have changed, then another client has
          *  opened the ledger, can't resolve this conflict.
@@ -454,22 +471,17 @@ public class LedgerMetadata {
             state != newMeta.state ||
             !digestType.equals(newMeta.digestType) ||
             !Arrays.equals(password, newMeta.password)) {
-            return false;
+            return true;
         }
         if (state == LedgerMetadataFormat.State.CLOSED
             && lastEntryId != newMeta.lastEntryId) {
-            return false;
-        }
-        // new meta znode version should be larger than old one
-        if (null != version &&
-            Version.Occurred.AFTER == version.compare(newMeta.version)) {
-            return false;
+            return true;
         }
         // if ledger is closed, we can just take the new ensembles
         if (newMeta.state != LedgerMetadataFormat.State.CLOSED) {
             // ensemble size should be same
             if (ensembles.size() != newMeta.ensembles.size()) {
-                return false;
+                return true;
             }
             // ensemble distribution should be same
             // we don't check the detail ensemble, since new bookie will be set
@@ -480,16 +492,10 @@ public class LedgerMetadata {
                 Long curKey = keyIter.next();
                 Long newMetaKey = newMetaKeyIter.next();
                 if (!curKey.equals(newMetaKey)) {
-                    return false;
+                    return true;
                 }
             }
         }
-        /*
-         *  if the conflict has been resolved, then update
-         *  ensemble and znode version
-         */
-        ensembles = newMeta.ensembles;
-        version = newMeta.version;
-        return true;
+        return false;
     }
 }

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java?rev=1476283&r1=1476282&r2=1476283&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java Fri Apr 26 16:38:02 2013
@@ -21,11 +21,11 @@ import java.util.HashSet;
 import java.util.Set;
 import java.net.InetSocketAddress;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.jboss.netty.buffer.ChannelBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.jboss.netty.buffer.ChannelBuffer;
 
 /**
  * This represents a pending add operation. When it has got success from all
@@ -132,6 +132,11 @@ class PendingAddOp implements WriteCallb
     public void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, Object ctx) {
         int bookieIndex = (Integer) ctx;
 
+        if (completed) {
+            // I am already finished, ignore incoming responses.
+            // otherwise, we might hit the following error handling logic, which might cause bad things.
+            return;
+        }
 
         switch (rc) {
         case BKException.Code.OK:

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java?rev=1476283&r1=1476282&r2=1476283&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java Fri Apr 26 16:38:02 2013
@@ -58,6 +58,7 @@ public class BookieWriteLedgerTest exten
 
     private static class SyncObj {
         volatile int counter;
+        volatile int rc;
 
         public SyncObj() {
             counter = 0;
@@ -182,6 +183,7 @@ public class BookieWriteLedgerTest exten
                 LOG.debug("Entries counter = " + syncObj1.counter);
                 syncObj1.wait();
             }
+            assertEquals(BKException.Code.OK, syncObj1.rc);
         }
         // wait for all entries to be acknowledged for the second ledger
         synchronized (syncObj2) {
@@ -189,6 +191,7 @@ public class BookieWriteLedgerTest exten
                 LOG.debug("Entries counter = " + syncObj2.counter);
                 syncObj2.wait();
             }
+            assertEquals(BKException.Code.OK, syncObj2.rc);
         }
 
         // reading ledger till the last entry
@@ -217,12 +220,9 @@ public class BookieWriteLedgerTest exten
 
     @Override
     public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
-        if (rc != BKException.Code.OK)
-            fail("Return code is not OK: " + rc);
-
         SyncObj x = (SyncObj) ctx;
-
         synchronized (x) {
+            x.rc = rc;
             x.counter++;
             x.notify();
         }

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java?rev=1476283&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java Fri Apr 26 16:38:02 2013
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class tests the ledger close logic.
+ */
+public class LedgerCloseTest extends BookKeeperClusterTestCase {
+
+    static Logger LOG = LoggerFactory.getLogger(LedgerCloseTest.class);
+
+    static final int READ_TIMEOUT = 1;
+
+    final DigestType digestType;
+
+    public LedgerCloseTest() {
+        super(6);
+        this.digestType = DigestType.CRC32;
+        // set timeout to a large value, which disables it.
+        baseClientConf.setReadTimeout(99999);
+        baseConf.setGcWaitTime(999999);
+    }
+
+    @Test(timeout = 60000)
+    public void testLedgerCloseDuringUnrecoverableErrors() throws Exception {
+        int numEntries = 3;
+        final CountDownLatch addDoneLatch = new CountDownLatch(1);
+        final CountDownLatch deadIOLatch = new CountDownLatch(1);
+        final CountDownLatch recoverDoneLatch = new CountDownLatch(1);
+        final CountDownLatch failedLatch = new CountDownLatch(1);
+
+        LedgerHandle lh = bkc.createLedger(3, 3, 3, digestType, "".getBytes());
+        // kill first bookie to replace with an unauthorized bookie
+        InetSocketAddress bookie = lh.getLedgerMetadata().currentEnsemble.get(0);
+        ServerConfiguration conf = killBookie(bookie);
+        // replace it with an unauthorized bookie
+        startUnauthorizedBookie(conf, addDoneLatch);
+        // kill second bookie to replace with a dead bookie
+        bookie = lh.getLedgerMetadata().currentEnsemble.get(1);
+        conf = killBookie(bookie);
+        // replace it with a slow dead bookie
+        startDeadBookie(conf, deadIOLatch);
+
+        // tried to add entries
+        for (int i = 0; i < numEntries; i++) {
+            lh.asyncAddEntry("data".getBytes(), new AddCallback() {
+                @Override
+                public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+                    if (BKException.Code.OK != rc) {
+                        failedLatch.countDown();
+                        deadIOLatch.countDown();
+                    }
+                    if (0 == entryId) {
+                        try {
+                            recoverDoneLatch.await();
+                        } catch (InterruptedException ie) {
+                        }
+                    }
+                }
+            }, null);
+        }
+        // add finished
+        addDoneLatch.countDown();
+        // wait until entries failed due to UnauthorizedAccessException
+        failedLatch.await();
+        // simulate the ownership of this ledger being transferred to another host (which is actually
+        // what we did in Hedwig).
+        LOG.info("Recover ledger {}.", lh.getId());
+        ClientConfiguration newConf = new ClientConfiguration();
+        newConf.addConfiguration(baseClientConf);
+        BookKeeper newBkc = new BookKeeperTestClient(newConf.setReadTimeout(1));
+        LedgerHandle recoveredLh = newBkc.openLedger(lh.getId(), digestType, "".getBytes());
+        LOG.info("Recover ledger {} done.", lh.getId());
+        recoverDoneLatch.countDown();
+        // wait a bit until add operations failed from second bookie due to IOException
+        TimeUnit.SECONDS.sleep(5);
+        // open the ledger again to make sure we get the right last confirmed.
+        LedgerHandle newLh = newBkc.openLedger(lh.getId(), digestType, "".getBytes());
+        assertEquals("Metadata should be consistent across different opened ledgers",
+                recoveredLh.getLastAddConfirmed(), newLh.getLastAddConfirmed());
+    }
+
+    private void startUnauthorizedBookie(ServerConfiguration conf, final CountDownLatch latch)
+            throws Exception {
+        Bookie sBookie = new Bookie(conf) {
+            @Override
+            public void addEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
+                    throws IOException, BookieException {
+                try {
+                    latch.await();
+                } catch (InterruptedException e) {
+                }
+                throw BookieException.create(BookieException.Code.UnauthorizedAccessException);
+            }
+
+            @Override
+            public void recoveryAddEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
+                    throws IOException, BookieException {
+                throw new IOException("Dead bookie for recovery adds.");
+            }
+        };
+        bsConfs.add(conf);
+        bs.add(startBookie(conf, sBookie));
+    }
+
+    // simulate slow adds, then become normal when recover,
+    // so no ensemble change when recovering ledger on this bookie.
+    private void startDeadBookie(ServerConfiguration conf, final CountDownLatch latch) throws Exception {
+        Bookie dBookie = new Bookie(conf) {
+            @Override
+            public void addEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
+                    throws IOException, BookieException {
+                try {
+                    latch.await();
+                } catch (InterruptedException e) {
+                }
+                // simulate slow adds.
+                throw new IOException("Dead bookie");
+            }
+        };
+        bsConfs.add(conf);
+        bs.add(startBookie(conf, dBookie));
+    }
+}