You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2017/10/27 23:09:57 UTC

[bookkeeper] branch master updated: ISSUE #596 ISSUE #583: Auto replication should honor ensemble placement policy

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fcabfc  ISSUE #596 ISSUE #583: Auto replication should honor ensemble placement policy
6fcabfc is described below

commit 6fcabfc80c5bd6cdfa9252cdfc4ed209cc0620a6
Author: Sijie Guo <si...@apache.org>
AuthorDate: Fri Oct 27 16:09:48 2017 -0700

    ISSUE #596 ISSUE #583: Auto replication should honor ensemble placement policy
    
    Descriptions of the changes in this PR:
    
    This pull request ports the changes from [twitter/bookkeeper@fc7e171](https://github.com/twitter/bookkeeper/commit/fc7e171135a58cddb9e91f5f614b3ceb6f9f9fee)
    
    The changes include:
    
    1. When BookKeeperAdmin re-replicates a ledger, it picks replacement bookies from the available bookies in the cluster so that the placement constraint is satisfied. (#596)
    2. Hence, remove `targetBookie` from ReplicationWorker, because that parameter would violate the placement constraint. (#583)
    3. At the same time, change `LedgerFragment` to represent the set of bookies that need to be checked and replicated, so that: a) the ledger checker can use the correct bookie index when verifying the existence of entries on a bookie (for the striping case); b) entries are read only once when they need to be replicated to multiple bookies.
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Ivan Kelly <iv...@apache.org>
    
    This closes #641 from sijie/twitter_autorecovery_fixes, closes #596, closes #583
---
 bookkeeper-server/conf/log4j.shell.properties      |   7 +
 .../apache/bookkeeper/client/BookKeeperAdmin.java  | 486 +++++++++++++--------
 .../apache/bookkeeper/client/LedgerChecker.java    | 167 +++++--
 .../apache/bookkeeper/client/LedgerFragment.java   | 106 ++++-
 .../client/LedgerFragmentReplicator.java           | 173 ++++----
 .../bookkeeper/conf/ServerConfiguration.java       |  21 +
 .../org/apache/bookkeeper/replication/Auditor.java |  12 +-
 .../bookkeeper/replication/AutoRecoveryMain.java   |   9 +-
 .../bookkeeper/replication/ReplicationStats.java   |   1 +
 .../bookkeeper/replication/ReplicationWorker.java  | 168 +++----
 .../bookkeeper/client/BookKeeperCloseTest.java     |  21 +-
 .../bookkeeper/client/BookieRecoveryTest.java      |   9 +-
 .../bookkeeper/client/TestLedgerChecker.java       |  55 ++-
 .../client/TestLedgerFragmentReplication.java      |  15 +-
 .../replication/BookieAutoRecoveryTest.java        |   4 +-
 .../replication/TestReplicationWorker.java         |  64 +--
 16 files changed, 778 insertions(+), 540 deletions(-)

diff --git a/bookkeeper-server/conf/log4j.shell.properties b/bookkeeper-server/conf/log4j.shell.properties
index 58d6ea6..c2f2e0e 100644
--- a/bookkeeper-server/conf/log4j.shell.properties
+++ b/bookkeeper-server/conf/log4j.shell.properties
@@ -36,6 +36,13 @@ log4j.appender.CONSOLE.Threshold=INFO
 log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
 log4j.appender.CONSOLE.layout.ConversionPattern=%d{ABSOLUTE} %-5p %m%n
 
+# verbose console logging
+log4j.appender.VERBOSECONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.VERBOSECONSOLE.Threshold=INFO
+log4j.appender.VERBOSECONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.VERBOSECONSOLE.layout.ConversionPattern=%m%n
+
+log4j.logger.verbose=INFO,VERBOSECONSOLE
 log4j.logger.org.apache.zookeeper=ERROR
 log4j.logger.org.apache.bookkeeper=ERROR
 log4j.logger.org.apache.bookkeeper.bookie.BookieShell=INFO
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index a0d2fe2..d2e5db3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -23,18 +23,22 @@ package org.apache.bookkeeper.client;
 import static com.google.common.base.Charsets.UTF_8;
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.AbstractFuture;
 import java.io.IOException;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Enumeration;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
@@ -70,7 +74,6 @@ import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.ZkUtils;
@@ -84,7 +87,10 @@ import org.slf4j.LoggerFactory;
  * Admin client for BookKeeper clusters
  */
 public class BookKeeperAdmin implements AutoCloseable {
+
     private final static Logger LOG = LoggerFactory.getLogger(BookKeeperAdmin.class);
+    private final static Logger VERBOSE = LoggerFactory.getLogger("verbose");
+
     // ZK client instance
     private ZooKeeper zk;
     private final boolean ownsZK;
@@ -495,9 +501,20 @@ public class BookKeeperAdmin implements AutoCloseable {
      */
     public void recoverBookieData(final BookieSocketAddress bookieSrc, final BookieSocketAddress bookieDest)
             throws InterruptedException, BKException {
+        Set<BookieSocketAddress> bookiesSrc = Sets.newHashSet(bookieSrc);
+        recoverBookieData(bookiesSrc);
+    }
+
+    public void recoverBookieData(final Set<BookieSocketAddress> bookiesSrc)
+            throws InterruptedException, BKException {
+        recoverBookieData(bookiesSrc, false, false);
+    }
+
+    public void recoverBookieData(final Set<BookieSocketAddress> bookiesSrc, boolean dryrun, boolean skipOpenLedgers)
+            throws InterruptedException, BKException {
         SyncObject sync = new SyncObject();
         // Call the async method to recover bookie data.
-        asyncRecoverBookieData(bookieSrc, bookieDest, new RecoverCallback() {
+        asyncRecoverBookieData(bookiesSrc, dryrun, skipOpenLedgers, new RecoverCallback() {
             @Override
             public void recoverComplete(int rc, Object ctx) {
                 LOG.info("Recover bookie operation completed with rc: " + rc);
@@ -520,7 +537,7 @@ public class BookKeeperAdmin implements AutoCloseable {
             throw BKException.create(sync.rc);
         }
     }
-
+	
     /**
      * Async method to rebuild and recover the ledger fragments data that was
      * stored on the source bookie. That bookie could have failed completely and
@@ -535,89 +552,26 @@ public class BookKeeperAdmin implements AutoCloseable {
      * @param bookieSrc
      *            Source bookie that had a failure. We want to replicate the
      *            ledger fragments that were stored there.
-     * @param bookieDest
-     *            Optional destination bookie that if passed, we will copy all
-     *            of the ledger fragments from the source bookie over to it.
      * @param cb
      *            RecoverCallback to invoke once all of the data on the dead
      *            bookie has been recovered and replicated.
      * @param context
      *            Context for the RecoverCallback to call.
      */
-    public void asyncRecoverBookieData(final BookieSocketAddress bookieSrc, final BookieSocketAddress bookieDest,
+    public void asyncRecoverBookieData(final BookieSocketAddress bookieSrc,
                                        final RecoverCallback cb, final Object context) {
-        // Sync ZK to make sure we're reading the latest bookie data.
-        zk.sync(bookiesPath, new AsyncCallback.VoidCallback() {
-            @Override
-            public void processResult(int rc, String path, Object ctx) {
-                if (rc != Code.OK.intValue()) {
-                    LOG.error("ZK error syncing: ", KeeperException.create(KeeperException.Code.get(rc), path));
-                    cb.recoverComplete(BKException.Code.ZKException, context);
-                    return;
-                }
-                getAvailableBookies(bookieSrc, bookieDest, cb, context);
-            }
+        Set<BookieSocketAddress> bookiesSrc = Sets.newHashSet(bookieSrc);
+        asyncRecoverBookieData(bookiesSrc, cb, context);
+    }
 
-        }, null);
+    public void asyncRecoverBookieData(final Set<BookieSocketAddress> bookieSrc,
+                                       final RecoverCallback cb, final Object context) {
+        asyncRecoverBookieData(bookieSrc, false, false, cb, context);
     }
 
-    /**
-     * This method asynchronously gets the set of available Bookies that the
-     * dead input bookie's data will be copied over into. If the user passed in
-     * a specific destination bookie, then just use that one. Otherwise, we'll
-     * randomly pick one of the other available bookies to use for each ledger
-     * fragment we are replicating.
-     *
-     * @param bookieSrc
-     *            Source bookie that had a failure. We want to replicate the
-     *            ledger fragments that were stored there.
-     * @param bookieDest
-     *            Optional destination bookie that if passed, we will copy all
-     *            of the ledger fragments from the source bookie over to it.
-     * @param cb
-     *            RecoverCallback to invoke once all of the data on the dead
-     *            bookie has been recovered and replicated.
-     * @param context
-     *            Context for the RecoverCallback to call.
-     */
-    private void getAvailableBookies(final BookieSocketAddress bookieSrc, final BookieSocketAddress bookieDest,
-                                     final RecoverCallback cb, final Object context) {
-        final List<BookieSocketAddress> availableBookies = new LinkedList<BookieSocketAddress>();
-        if (bookieDest != null) {
-            availableBookies.add(bookieDest);
-            // Now poll ZK to get the active ledgers
-            getActiveLedgers(bookieSrc, bookieDest, cb, context, availableBookies);
-        } else {
-            zk.getChildren(bookiesPath, null, new AsyncCallback.ChildrenCallback() {
-                @Override
-                public void processResult(int rc, String path, Object ctx, List<String> children) {
-                    if (rc != Code.OK.intValue()) {
-                        LOG.error("ZK error getting bookie nodes: ", KeeperException.create(KeeperException.Code
-                                  .get(rc), path));
-                        cb.recoverComplete(BKException.Code.ZKException, context);
-                        return;
-                    }
-                    for (String bookieNode : children) {
-                        if (BookKeeperConstants.READONLY
-                                        .equals(bookieNode)) {
-                            // exclude the readonly node from available bookies.
-                            continue;
-                        }
-                        BookieSocketAddress addr;
-                        try {
-                            addr = new BookieSocketAddress(bookieNode);
-                        } catch (UnknownHostException nhe) {
-                            LOG.error("Bookie Node retrieved from ZK has invalid name format: " + bookieNode);
-                            cb.recoverComplete(BKException.Code.ZKException, context);
-                            return;
-                        }
-                        availableBookies.add(addr);
-                    }
-                    // Now poll ZK to get the active ledgers
-                    getActiveLedgers(bookieSrc, null, cb, context, availableBookies);
-                }
-            }, null);
-        }
+    public void asyncRecoverBookieData(final Set<BookieSocketAddress> bookieSrc, boolean dryrun,
+                                       final boolean skipOpenLedgers, final RecoverCallback cb, final Object context) {
+        getActiveLedgers(bookieSrc, dryrun, skipOpenLedgers, cb, context);
     }
 
     /**
@@ -626,25 +580,21 @@ public class BookKeeperAdmin implements AutoCloseable {
      * determine if any of the ledger fragments for it were stored at the dead
      * input bookie.
      *
-     * @param bookieSrc
-     *            Source bookie that had a failure. We want to replicate the
+     * @param bookiesSrc
+     *            Source bookies that had a failure. We want to replicate the
      *            ledger fragments that were stored there.
-     * @param bookieDest
-     *            Optional destination bookie that if passed, we will copy all
-     *            of the ledger fragments from the source bookie over to it.
+     * @param dryrun
+     *            dryrun the recover procedure.
+     * @param skipOpenLedgers
+     *            Skip recovering open ledgers.
      * @param cb
      *            RecoverCallback to invoke once all of the data on the dead
      *            bookie has been recovered and replicated.
      * @param context
      *            Context for the RecoverCallback to call.
-     * @param availableBookies
-     *            List of Bookie Servers that are available to use for
-     *            replicating data on the failed bookie. This could contain a
-     *            single bookie server if the user explicitly chose a bookie
-     *            server to replicate data to.
      */
-    private void getActiveLedgers(final BookieSocketAddress bookieSrc, final BookieSocketAddress bookieDest,
-            final RecoverCallback cb, final Object context, final List<BookieSocketAddress> availableBookies) {
+    private void getActiveLedgers(final Set<BookieSocketAddress> bookiesSrc, final boolean dryrun,
+                                  final boolean skipOpenLedgers, final RecoverCallback cb, final Object context) {
         // Wrapper class around the RecoverCallback so it can be used
         // as the final VoidCallback to process ledgers
         class RecoverCallbackWrapper implements AsyncCallback.VoidCallback {
@@ -663,7 +613,7 @@ public class BookKeeperAdmin implements AutoCloseable {
         Processor<Long> ledgerProcessor = new Processor<Long>() {
             @Override
             public void process(Long ledgerId, AsyncCallback.VoidCallback iterCallback) {
-                recoverLedger(bookieSrc, ledgerId, iterCallback, availableBookies);
+                recoverLedger(bookiesSrc, ledgerId, dryrun, skipOpenLedgers, iterCallback);
             }
         };
         bkc.getLedgerManager().asyncProcessLedgers(
@@ -672,41 +622,24 @@ public class BookKeeperAdmin implements AutoCloseable {
     }
 
     /**
-     * Get a new random bookie, but ensure that it isn't one that is already
-     * in the ensemble for the ledger.
-     */
-    private BookieSocketAddress getNewBookie(final List<BookieSocketAddress> bookiesAlreadyInEnsemble,
-            final List<BookieSocketAddress> availableBookies)
-            throws BKException.BKNotEnoughBookiesException {
-        ArrayList<BookieSocketAddress> candidates = new ArrayList<BookieSocketAddress>();
-        candidates.addAll(availableBookies);
-        candidates.removeAll(bookiesAlreadyInEnsemble);
-        if (candidates.size() == 0) {
-            throw new BKException.BKNotEnoughBookiesException();
-        }
-        return candidates.get(rand.nextInt(candidates.size()));
-    }
-
-    /**
      * This method asynchronously recovers a given ledger if any of the ledger
      * entries were stored on the failed bookie.
      *
-     * @param bookieSrc
-     *            Source bookie that had a failure. We want to replicate the
+     * @param bookiesSrc
+     *            Source bookies that had a failure. We want to replicate the
      *            ledger fragments that were stored there.
      * @param lId
      *            Ledger id we want to recover.
-     * @param ledgerIterCb
+     * @param dryrun
+     *            printing the recovery plan without actually recovering bookies
+     * @param skipOpenLedgers
+     *            Skip recovering open ledgers.
+     * @param finalLedgerIterCb
      *            IterationCallback to invoke once we've recovered the current
      *            ledger.
-     * @param availableBookies
-     *            List of Bookie Servers that are available to use for
-     *            replicating data on the failed bookie. This could contain a
-     *            single bookie server if the user explicitly chose a bookie
-     *            server to replicate data to.
      */
-    private void recoverLedger(final BookieSocketAddress bookieSrc, final long lId,
-            final AsyncCallback.VoidCallback ledgerIterCb, final List<BookieSocketAddress> availableBookies) {
+    private void recoverLedger(final Set<BookieSocketAddress> bookiesSrc, final long lId, final boolean dryrun,
+                               final boolean skipOpenLedgers, final AsyncCallback.VoidCallback finalLedgerIterCb) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Recovering ledger : {}", lId);
         }
@@ -714,43 +647,73 @@ public class BookKeeperAdmin implements AutoCloseable {
         asyncOpenLedgerNoRecovery(lId, new OpenCallback() {
             @Override
             public void openComplete(int rc, final LedgerHandle lh, Object ctx) {
-                if (rc != Code.OK.intValue()) {
+                if (rc != BKException.Code.OK) {
                     LOG.error("BK error opening ledger: " + lId, BKException.create(rc));
-                    ledgerIterCb.processResult(rc, null, null);
+                    finalLedgerIterCb.processResult(rc, null, null);
                     return;
                 }
 
                 LedgerMetadata lm = lh.getLedgerMetadata();
-                if (!lm.isClosed() &&
-                    lm.getEnsembles().size() > 0) {
-                    Long lastKey = lm.getEnsembles().lastKey();
-                    ArrayList<BookieSocketAddress> lastEnsemble = lm.getEnsembles().get(lastKey);
-                    // the original write has not removed faulty bookie from
-                    // current ledger ensemble. to avoid data loss issue in
-                    // the case of concurrent updates to the ensemble composition,
-                    // the recovery tool should first close the ledger
-                    if (lastEnsemble.contains(bookieSrc)) {
-                        // close opened non recovery ledger handle
+                if (skipOpenLedgers && !lm.isClosed() && !lm.isInRecovery()) {
+                    LOG.info("Skip recovering open ledger {}.", lId);
+                    try {
+                        lh.close();
+                    } catch (InterruptedException ie) {
+                        Thread.currentThread().interrupt();
+                    } catch (BKException bke) {
+                        LOG.warn("Error on closing ledger handle for {}.", lId);
+                    }
+                    finalLedgerIterCb.processResult(BKException.Code.OK, null, null);
+                    return;
+                }
+
+                final boolean fenceRequired = !lm.isClosed() && containBookiesInLastEnsemble(lm, bookiesSrc);
+                // the original write has not removed faulty bookie from
+                // current ledger ensemble. to avoid data loss issue in
+                // the case of concurrent updates to the ensemble composition,
+                // the recovery tool should first close the ledger
+                if (!dryrun && fenceRequired) {
+                    // close opened non recovery ledger handle
+                    try {
+                        lh.close();
+                    } catch (Exception ie) {
+                        LOG.warn("Error closing non recovery ledger handle for ledger " + lId, ie);
+                    }
+                    asyncOpenLedger(lId, new OpenCallback() {
+                        @Override
+                        public void openComplete(int newrc, final LedgerHandle newlh, Object newctx) {
+                            if (newrc != BKException.Code.OK) {
+                                LOG.error("BK error close ledger: " + lId, BKException.create(newrc));
+                                finalLedgerIterCb.processResult(newrc, null, null);
+                                return;
+                            }
+                            bkc.mainWorkerPool.submit(() -> {
+                                // do recovery
+                                recoverLedger(bookiesSrc, lId, dryrun, skipOpenLedgers, finalLedgerIterCb);
+                            });
+                        }
+                    }, null);
+                    return;
+                }
+
+                final AsyncCallback.VoidCallback ledgerIterCb = new AsyncCallback.VoidCallback() {
+                    @Override
+                    public void processResult(int rc, String path, Object ctx) {
+                        if (BKException.Code.OK != rc) {
+                            LOG.error("Failed to recover ledger {} : {}", lId, rc);
+                        } else {
+                            LOG.info("Recovered ledger {}.", lId);
+                        }
                         try {
                             lh.close();
-                        } catch (Exception ie) {
-                            LOG.warn("Error closing non recovery ledger handle for ledger " + lId, ie);
+                        } catch (InterruptedException ie) {
+                            Thread.currentThread().interrupt();
+                        } catch (BKException bke) {
+                            LOG.warn("Error on cloing ledger handle for {}.", lId);
                         }
-                        asyncOpenLedger(lId, new OpenCallback() {
-                            @Override
-                            public void openComplete(int newrc, final LedgerHandle newlh, Object newctx) {
-                                if (newrc != Code.OK.intValue()) {
-                                    LOG.error("BK error close ledger: " + lId, BKException.create(newrc));
-                                    ledgerIterCb.processResult(newrc, null, null);
-                                    return;
-                                }
-                                // do recovery
-                                recoverLedger(bookieSrc, lId, ledgerIterCb, availableBookies);
-                            }
-                        }, null);
-                        return;
+                        finalLedgerIterCb.processResult(rc, path, ctx);
                     }
-                }
+                };
 
                 /*
                  * This List stores the ledger fragments to recover indexed by
@@ -773,7 +736,7 @@ public class BookKeeperAdmin implements AutoCloseable {
                     if (curEntryId != null)
                         ledgerFragmentsRange.put(curEntryId, entry.getKey() - 1);
                     curEntryId = entry.getKey();
-                    if (entry.getValue().contains(bookieSrc)) {
+                    if (containBookies(entry.getValue(), bookiesSrc)) {
                         /*
                          * Current ledger fragment has entries stored on the
                          * dead bookie so we'll need to recover them.
@@ -797,6 +760,10 @@ public class BookKeeperAdmin implements AutoCloseable {
                     return;
                 }
 
+                if (dryrun) {
+                    VERBOSE.info("Recovered ledger {} : {}", lId, (fenceRequired ? "[fence required]" : ""));
+                }
+
                 /*
                  * Multicallback for ledger. Once all fragments for the ledger have been recovered
                  * trigger the ledgerIterCb
@@ -810,47 +777,69 @@ public class BookKeeperAdmin implements AutoCloseable {
                  */
                 for (final Long startEntryId : ledgerFragmentsToRecover) {
                     Long endEntryId = ledgerFragmentsRange.get(startEntryId);
-                    BookieSocketAddress newBookie = null;
+                    ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().get(startEntryId);
+                    // Get bookies to replace
+                    Map<Integer, BookieSocketAddress> targetBookieAddresses;
                     try {
-                        newBookie = getNewBookie(lh.getLedgerMetadata().getEnsembles().get(startEntryId),
-                                                 availableBookies);
-                    } catch (BKException.BKNotEnoughBookiesException bke) {
-                        ledgerFragmentsMcb.processResult(BKException.Code.NotEnoughBookiesException,
-                                                         null, null);
+                        targetBookieAddresses = getReplacementBookies(lh, ensemble, bookiesSrc);
+                    } catch (BKException.BKNotEnoughBookiesException e) {
+                        if (!dryrun) {
+                            ledgerFragmentsMcb.processResult(BKException.Code.NotEnoughBookiesException, null, null);
+                        } else {
+                            VERBOSE.info("  Fragment [{} - {}] : {}", startEntryId, endEntryId,
+                                BKException.getMessage(BKException.Code.NotEnoughBookiesException));
+                        }
                         continue;
                     }
 
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Replicating fragment from [" + startEntryId
-                                  + "," + endEntryId + "] of ledger " + lh.getId()
-                                  + " to " + newBookie);
-                    }
-                    try {
-                        LedgerFragmentReplicator.SingleFragmentCallback cb = new LedgerFragmentReplicator.SingleFragmentCallback(
-                                                                               ledgerFragmentsMcb, lh, startEntryId, bookieSrc, newBookie);
-                        ArrayList<BookieSocketAddress> currentEnsemble = lh.getLedgerMetadata().getEnsemble(
-                                startEntryId);
-                        int bookieIndex = -1;
-                        if (null != currentEnsemble) {
-                            for (int i = 0; i < currentEnsemble.size(); i++) {
-                                if (currentEnsemble.get(i).equals(bookieSrc)) {
-                                    bookieIndex = i;
-                                    break;
-                                }
-                            }
+                    if (dryrun) {
+                        ArrayList<BookieSocketAddress> newEnsemble =
+                                replaceBookiesInEnsemble(ensemble, targetBookieAddresses);
+                        VERBOSE.info("  Fragment [{} - {}] : ", startEntryId, endEntryId);
+                        VERBOSE.info("    old ensemble : {}", formatEnsemble(ensemble, bookiesSrc, '*'));
+                        VERBOSE.info("    new ensemble : {}", formatEnsemble(newEnsemble, bookiesSrc, '*'));
+                    } else {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Replicating fragment from [{}, {}] of ledger {} to {}",
+                                startEntryId, endEntryId, lh.getId(), targetBookieAddresses);
+                        }
+                        try {
+                            LedgerFragmentReplicator.SingleFragmentCallback cb = new LedgerFragmentReplicator.SingleFragmentCallback(
+                                ledgerFragmentsMcb, lh, startEntryId, getReplacementBookiesMap(ensemble, targetBookieAddresses));
+                            LedgerFragment ledgerFragment = new LedgerFragment(lh,
+                                startEntryId, endEntryId, targetBookieAddresses.keySet());
+                            asyncRecoverLedgerFragment(lh, ledgerFragment, cb, Sets.newHashSet(targetBookieAddresses.values()));
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            return;
                         }
-                        LedgerFragment ledgerFragment = new LedgerFragment(lh,
-                                startEntryId, endEntryId, bookieIndex);
-                        asyncRecoverLedgerFragment(lh, ledgerFragment, cb, newBookie);
-                    } catch(InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                        return;
                     }
                 }
+                if (dryrun) {
+                    ledgerIterCb.processResult(BKException.Code.OK, null, null);
+                }
             }
             }, null);
     }
 
+    static String formatEnsemble(ArrayList<BookieSocketAddress> ensemble, Set<BookieSocketAddress> bookiesSrc, char marker) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("[");
+        for (int i = 0; i < ensemble.size(); i++) {
+            sb.append(ensemble.get(i));
+            if (bookiesSrc.contains(ensemble.get(i))) {
+                sb.append(marker);
+            } else {
+                sb.append(' ');
+            }
+            if (i != ensemble.size() - 1) {
+                sb.append(", ");
+            }
+        }
+        sb.append("]");
+        return sb.toString();
+    }
+
     /**
      * This method asynchronously recovers a ledger fragment which is a
      * contiguous portion of a ledger that was stored in an ensemble that
@@ -863,16 +852,81 @@ public class BookKeeperAdmin implements AutoCloseable {
      * @param ledgerFragmentMcb
      *            - MultiCallback to invoke once we've recovered the current
      *            ledger fragment.
-     * @param newBookie
-     *            - New bookie we want to use to recover and replicate the
+     * @param newBookies
+     *            - New bookies we want to use to recover and replicate the
      *            ledger entries that were stored on the failed bookie.
      */
     private void asyncRecoverLedgerFragment(final LedgerHandle lh,
             final LedgerFragment ledgerFragment,
             final AsyncCallback.VoidCallback ledgerFragmentMcb,
-            final BookieSocketAddress newBookie)
-            throws InterruptedException {
-        lfr.replicate(lh, ledgerFragment, ledgerFragmentMcb, newBookie);
+            final Set<BookieSocketAddress> newBookies) throws InterruptedException {
+        lfr.replicate(lh, ledgerFragment, ledgerFragmentMcb, newBookies);
+    }
+
+    /**
+     * Chooses a replacement for every ensemble position currently occupied by one of
+     * {@code bookiesToRereplicate}. Those bookies are also passed on as exclusions, so
+     * none of them can be picked as a replacement.
+     */
+    private Map<Integer, BookieSocketAddress> getReplacementBookies(
+                LedgerHandle lh,
+                List<BookieSocketAddress> ensemble,
+                Set<BookieSocketAddress> bookiesToRereplicate)
+            throws BKException.BKNotEnoughBookiesException {
+        Set<Integer> indexes = Sets.newHashSet();
+        int position = 0;
+        for (BookieSocketAddress member : ensemble) {
+            if (bookiesToRereplicate.contains(member)) {
+                indexes.add(position);
+            }
+            position++;
+        }
+        Optional<Set<BookieSocketAddress>> excluded = Optional.of(bookiesToRereplicate);
+        return getReplacementBookiesByIndexes(lh, ensemble, indexes, excluded);
+    }
+
+    /**
+     * Asks the client's placement policy for a replacement bookie for each ensemble
+     * index in {@code bookieIndexesToRereplicate}.
+     *
+     * <p>The candidate exclusion set is made up of the optional {@code excludedBookies},
+     * every bookie currently sitting at an index being replaced, and every replacement
+     * chosen so far in this call, so two indexes never receive the same new bookie.
+     *
+     * @return map from ensemble index to the newly chosen bookie
+     * @throws BKException.BKNotEnoughBookiesException propagated from the placement
+     *         policy's {@code replaceBookie}
+     */
+    private Map<Integer, BookieSocketAddress> getReplacementBookiesByIndexes(
+                LedgerHandle lh,
+                List<BookieSocketAddress> ensemble,
+                Set<Integer> bookieIndexesToRereplicate,
+                Optional<Set<BookieSocketAddress>> excludedBookies)
+            throws BKException.BKNotEnoughBookiesException {
+        Set<BookieSocketAddress> bookiesToExclude = Sets.newHashSet();
+        excludedBookies.ifPresent(bookiesToExclude::addAll);
+        for (Integer index : bookieIndexesToRereplicate) {
+            // the bookies being replaced must never be chosen as their own replacement
+            bookiesToExclude.add(ensemble.get(index));
+        }
+
+        Map<Integer, BookieSocketAddress> replacements =
+                Maps.newHashMapWithExpectedSize(bookieIndexesToRereplicate.size());
+        for (Integer index : bookieIndexesToRereplicate) {
+            BookieSocketAddress replacement = bkc.getPlacementPolicy().replaceBookie(
+                    lh.getLedgerMetadata().getEnsembleSize(),
+                    lh.getLedgerMetadata().getWriteQuorumSize(),
+                    lh.getLedgerMetadata().getAckQuorumSize(),
+                    lh.getLedgerMetadata().getCustomMetadata(),
+                    ensemble,
+                    ensemble.get(index),
+                    bookiesToExclude);
+            replacements.put(index, replacement);
+            // never hand the same new bookie to two different indexes
+            bookiesToExclude.add(replacement);
+        }
+        return replacements;
+    }
+
+    /**
+     * Returns a copy of {@code ensemble} in which every index present in
+     * {@code replacedBookies} is overwritten with its mapped bookie. The input
+     * list itself is not modified.
+     */
+    private ArrayList<BookieSocketAddress> replaceBookiesInEnsemble(
+            List<BookieSocketAddress> ensemble,
+            Map<Integer, BookieSocketAddress> replacedBookies) {
+        ArrayList<BookieSocketAddress> copy = new ArrayList<BookieSocketAddress>(ensemble);
+        replacedBookies.forEach(copy::set);
+        return copy;
     }
 
     /**
@@ -882,20 +936,32 @@ public class BookKeeperAdmin implements AutoCloseable {
      *            - ledgerHandle
      * @param ledgerFragment
      *            - LedgerFragment to replicate
-     * @param targetBookieAddress
-     *            - target Bookie, to where entries should be replicated.
      */
     public void replicateLedgerFragment(LedgerHandle lh,
+            final LedgerFragment ledgerFragment)
+            throws InterruptedException, BKException {
+        Optional<Set<BookieSocketAddress>> excludedBookies = Optional.empty();
+        Map<Integer, BookieSocketAddress> targetBookieAddresses =
+                getReplacementBookiesByIndexes(lh, ledgerFragment.getEnsemble(),
+                        ledgerFragment.getBookiesIndexes(), excludedBookies);
+        replicateLedgerFragment(lh, ledgerFragment, targetBookieAddresses);
+    }
+
+    private void replicateLedgerFragment(LedgerHandle lh,
             final LedgerFragment ledgerFragment,
-            final BookieSocketAddress targetBookieAddress)
+            final Map<Integer, BookieSocketAddress> targetBookieAddresses)
             throws InterruptedException, BKException {
         CompletableFuture<Void> result = new CompletableFuture<>();
         ResultCallBack resultCallBack = new ResultCallBack(result);
-        SingleFragmentCallback cb = new SingleFragmentCallback(resultCallBack,
-                lh, ledgerFragment.getFirstEntryId(), ledgerFragment
-                        .getAddress(), targetBookieAddress);
+        SingleFragmentCallback cb = new SingleFragmentCallback(
+            resultCallBack,
+            lh,
+            ledgerFragment.getFirstEntryId(),
+            getReplacementBookiesMap(ledgerFragment, targetBookieAddresses));
 
-        asyncRecoverLedgerFragment(lh, ledgerFragment, cb, targetBookieAddress);
+        Set<BookieSocketAddress> targetBookieSet = Sets.newHashSet();
+        targetBookieSet.addAll(targetBookieAddresses.values());
+        asyncRecoverLedgerFragment(lh, ledgerFragment, cb, targetBookieSet);
 
         try {
             SyncCallbackUtils.waitForResult(result);
@@ -903,6 +969,52 @@ public class BookKeeperAdmin implements AutoCloseable {
             throw BKException.create(bkc.getReturnRc(err.getCode()));
         }
     }
+
+    /**
+     * Translates an index-based replacement plan into an old-bookie to new-bookie map.
+     *
+     * @param ensemble
+     *            ensemble the indexes of {@code targetBookieAddresses} refer to
+     * @param targetBookieAddresses
+     *            ensemble index -> replacement bookie
+     * @return bookie currently at each index -> its replacement
+     */
+    private static Map<BookieSocketAddress, BookieSocketAddress> getReplacementBookiesMap(
+            ArrayList<BookieSocketAddress> ensemble,
+            Map<Integer, BookieSocketAddress> targetBookieAddresses) {
+        Map<BookieSocketAddress, BookieSocketAddress> bookiesMap =
+                new HashMap<BookieSocketAddress, BookieSocketAddress>();
+        for (Map.Entry<Integer, BookieSocketAddress> entry : targetBookieAddresses.entrySet()) {
+            BookieSocketAddress oldBookie = ensemble.get(entry.getKey());
+            BookieSocketAddress newBookie = entry.getValue();
+            bookiesMap.put(oldBookie, newBookie);
+        }
+        return bookiesMap;
+    }
+
+    /**
+     * Builds an old-bookie to new-bookie map for every bookie index tracked by
+     * {@code ledgerFragment}. The replacement is looked up by index in
+     * {@code targetBookieAddresses}; an index with no entry there maps to {@code null}.
+     */
+    private static Map<BookieSocketAddress, BookieSocketAddress> getReplacementBookiesMap(
+            LedgerFragment ledgerFragment,
+            Map<Integer, BookieSocketAddress> targetBookieAddresses) {
+        Map<BookieSocketAddress, BookieSocketAddress> replacements = new HashMap<>();
+        for (int idx : ledgerFragment.getBookiesIndexes()) {
+            replacements.put(ledgerFragment.getAddress(idx), targetBookieAddresses.get(idx));
+        }
+        return replacements;
+    }
+
+    /**
+     * Returns whether the ledger's most recent ensemble (the one with the highest start
+     * entry key) contains any of {@code bookies}. A ledger without ensembles yields
+     * {@code false}.
+     */
+    private static boolean containBookiesInLastEnsemble(LedgerMetadata lm,
+                                                        Set<BookieSocketAddress> bookies) {
+        if (lm.getEnsembles().isEmpty()) {
+            return false;
+        }
+        ArrayList<BookieSocketAddress> lastEnsemble =
+                lm.getEnsembles().get(lm.getEnsembles().lastKey());
+        return containBookies(lastEnsemble, bookies);
+    }
+
+    /**
+     * Returns {@code true} if at least one member of {@code ensemble} is in
+     * {@code bookies}; stops at the first match.
+     */
+    private static boolean containBookies(ArrayList<BookieSocketAddress> ensemble,
+                                          Set<BookieSocketAddress> bookies) {
+        return ensemble.stream().anyMatch(bookies::contains);
+    }
 
     /** This is the class for getting the replication result */
     static class ResultCallBack implements AsyncCallback.VoidCallback {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
index a90a366..2ad8076 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
@@ -20,15 +20,14 @@
 package org.apache.bookkeeper.client;
 
 import io.netty.buffer.ByteBuf;
-
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -81,39 +80,125 @@ public class LedgerChecker {
         }
     }
 
+    /**
+     * Per-bookie callback used when one fragment is verified against several bookies.
+     *
+     * <p>Each instance records a failure for its own bookie index in a shared map. The
+     * invocation that brings the shared countdown to zero reports one aggregated result
+     * to {@code cb}: OK if no bookie failed, otherwise a subset fragment holding only
+     * the failed bookie indexes.
+     */
+    private static class LedgerFragmentCallback implements GenericCallback<LedgerFragment> {
+
+        private final LedgerFragment fragment;
+        private final int bookieIndex;
+        // shared by all bookies of the fragment: bookie index -> return code
+        private final Map<Integer, Integer> badBookies;
+        // shared countdown of bookies whose verification is still pending
+        private final AtomicInteger numBookies;
+        private final GenericCallback<LedgerFragment> cb;
+
+        LedgerFragmentCallback(LedgerFragment lf,
+                               int bookieIndex,
+                               GenericCallback<LedgerFragment> cb,
+                               Map<Integer, Integer> badBookies,
+                               AtomicInteger numBookies) {
+            this.fragment = lf;
+            this.bookieIndex = bookieIndex;
+            this.cb = cb;
+            this.badBookies = badBookies;
+            this.numBookies = numBookies;
+        }
+
+        @Override
+        public void operationComplete(int rc, LedgerFragment lf) {
+            if (rc != BKException.Code.OK) {
+                synchronized (badBookies) {
+                    badBookies.put(bookieIndex, rc);
+                }
+            }
+            if (numBookies.decrementAndGet() != 0) {
+                // other bookies of this fragment are still outstanding
+                return;
+            }
+            if (badBookies.isEmpty()) {
+                cb.operationComplete(BKException.Code.OK, fragment);
+                return;
+            }
+            cb.operationComplete(pickReturnCode(), fragment.subset(badBookies.keySet()));
+        }
+
+        /**
+         * Chooses the aggregated return code. ClientClosedException wins if any bookie
+         * reported it; otherwise the code of the last bad bookie in map iteration order
+         * is used.
+         */
+        private int pickReturnCode() {
+            int chosen = BKException.Code.NoBookieAvailableException;
+            for (int code : badBookies.values()) {
+                chosen = code;
+                if (code == BKException.Code.ClientClosedException) {
+                    break;
+                }
+            }
+            return chosen;
+        }
+    }
+
     public LedgerChecker(BookKeeper bkc) {
         bookieClient = bkc.getBookieClient();
     }
 
+    /**
+     * Verifies every bookie index tracked by a ledger fragment.
+     *
+     * <p>An empty index set completes immediately with OK. Otherwise one
+     * {@link LedgerFragmentCallback} is issued per bookie. All of them share one failure
+     * map and one countdown, so {@code cb} receives a single aggregated result.
+     *
+     * @param fragment
+     *          fragment to verify
+     * @param cb
+     *          callback receiving the aggregated outcome
+     * @throws InvalidFragmentException
+     *          if a per-bookie check rejects the fragment; checks for bookies earlier
+     *          in iteration order may already have been issued
+     */
+    private void verifyLedgerFragment(LedgerFragment fragment,
+                                      GenericCallback<LedgerFragment> cb)
+            throws InvalidFragmentException, BKException {
+        Set<Integer> indexes = fragment.getBookiesIndexes();
+        if (indexes.isEmpty()) {
+            cb.operationComplete(BKException.Code.OK, fragment);
+            return;
+        }
+
+        Map<Integer, Integer> failures = new HashMap<Integer, Integer>();
+        AtomicInteger remaining = new AtomicInteger(indexes.size());
+        for (Integer index : indexes) {
+            verifyLedgerFragment(fragment, index,
+                    new LedgerFragmentCallback(fragment, index, cb, failures, remaining));
+        }
+    }
+
+    /**
+     * Verify a bookie inside a ledger fragment.
+     *
+     * @param fragment
+     *          ledger fragment
+     * @param bookieIndex
+     *          bookie index in the fragment
+     * @param cb
+     *          callback
+     * @throws InvalidFragmentException
+     */
     private void verifyLedgerFragment(LedgerFragment fragment,
-            GenericCallback<LedgerFragment> cb) throws InvalidFragmentException {
-        long firstStored = fragment.getFirstStoredEntryId();
-        long lastStored = fragment.getLastStoredEntryId();
-
-        // because of this if block, even if the bookie of the fragment is 
-        // down, it considers Fragment is available/not-bad if firstStored
-        // and lastStored are LedgerHandle.INVALID_ENTRY_ID.
-        // So same logic is used in BookieShell.DecommissionBookieCmd.areEntriesOfSegmentStoredInTheBookie
-        // if any change is made here, then the changes should be in BookieShell also
+                                      int bookieIndex,
+                                      GenericCallback<LedgerFragment> cb)
+            throws InvalidFragmentException {
+        long firstStored = fragment.getFirstStoredEntryId(bookieIndex);
+        long lastStored = fragment.getLastStoredEntryId(bookieIndex);
+
+        BookieSocketAddress bookie = fragment.getAddress(bookieIndex);
+        if (null == bookie) {
+            throw new InvalidFragmentException();
+        }
+
         if (firstStored == LedgerHandle.INVALID_ENTRY_ID) {
+            // this fragment is not on this bookie
             if (lastStored != LedgerHandle.INVALID_ENTRY_ID) {
                 throw new InvalidFragmentException();
             }
             cb.operationComplete(BKException.Code.OK, fragment);
-            return;
-        }
-        if (firstStored == lastStored) {
+        } else if (firstStored == lastStored) {
             ReadManyEntriesCallback manycb = new ReadManyEntriesCallback(1,
                     fragment, cb);
-            bookieClient.readEntry(fragment.getAddress(), fragment
+            bookieClient.readEntry(bookie, fragment
                     .getLedgerId(), firstStored, manycb, null);
         } else {
             ReadManyEntriesCallback manycb = new ReadManyEntriesCallback(2,
                     fragment, cb);
-            bookieClient.readEntry(fragment.getAddress(), fragment
-                    .getLedgerId(), firstStored, manycb, null);
-            bookieClient.readEntry(fragment.getAddress(), fragment
-                    .getLedgerId(), lastStored, manycb, null);
+            bookieClient.readEntry(bookie, fragment.getLedgerId(), firstStored, manycb, null);
+            bookieClient.readEntry(bookie, fragment.getLedgerId(), lastStored, manycb, null);
         }
     }
 
@@ -191,44 +276,49 @@ public class LedgerChecker {
         for (Map.Entry<Long, ArrayList<BookieSocketAddress>> e : lh
                 .getLedgerMetadata().getEnsembles().entrySet()) {
             if (curEntryId != null) {
+                Set<Integer> bookieIndexes = new HashSet<Integer>();
                 for (int i = 0; i < curEnsemble.size(); i++) {
-                    fragments.add(new LedgerFragment(lh, curEntryId,
-                            e.getKey() - 1, i));
+                    bookieIndexes.add(i);
                 }
+                fragments.add(new LedgerFragment(lh, curEntryId,
+                        e.getKey() - 1, bookieIndexes));
             }
             curEntryId = e.getKey();
             curEnsemble = e.getValue();
         }
 
-        /* Checking the last fragment of the ledger can be complicated in some cases.
+
+
+
+        /* Checking the last segment of the ledger can be complicated in some cases.
          * In the case that the ledger is closed, we can just check the fragments of
-         * the ledger as normal, except in the case that no entry was ever written,
-         * to the ledger, in which case we check no fragments.
+         * the segment as normal, even if no data has ever been written to it.
          * In the case that the ledger is open, but enough entries have been written,
-         * for lastAddConfirmed to be set above the start entry of the fragment, we
+         * for lastAddConfirmed to be set above the start entry of the segment, we
          * can also check as normal.
-         * However, if lastAddConfirmed cannot be trusted, such as when it's lower than
-         * the first entry id, or not set at all, we cannot be sure if there has been
-         * data written to the fragment. For this reason, we have to send a read request
+         * However, if the ledger is open, lastAddConfirmed sometimes cannot be trusted,
+         * such as when it's lower than the first entry id, or not set at all,
+         * we cannot be sure if there has been data written to the segment.
+         * For this reason, we have to send a read request
          * to the bookies which should have the first entry. If they respond with
          * NoSuchEntry we can assume it was never written. If they respond with anything
          * else, we must assume the entry has been written, so we run the check.
          */
-        if (curEntryId != null && !(lh.getLedgerMetadata().isClosed() && lh.getLastAddConfirmed() < curEntryId)) {
+        if (curEntryId != null) {
             long lastEntry = lh.getLastAddConfirmed();
 
-            if (lastEntry < curEntryId) {
+            if (!lh.isClosed() && lastEntry < curEntryId) {
                 lastEntry = curEntryId;
             }
 
-            final Set<LedgerFragment> finalFragments = new HashSet<LedgerFragment>();
+            Set<Integer> bookieIndexes = new HashSet<Integer>();
             for (int i = 0; i < curEnsemble.size(); i++) {
-                finalFragments.add(new LedgerFragment(lh, curEntryId,
-                        lastEntry, i));
+                bookieIndexes.add(i);
             }
+            final LedgerFragment lastLedgerFragment = new LedgerFragment(lh, curEntryId,
+                    lastEntry, bookieIndexes);
 
-            // Check for the case that no last confirmed entry has
-            // been set.
+            // Check for the case that no last confirmed entry has been set
             if (curEntryId == lastEntry) {
                 final long entryToRead = curEntryId;
 
@@ -237,7 +327,7 @@ public class LedgerChecker {
                                               new GenericCallback<Boolean>() {
                                                   public void operationComplete(int rc, Boolean result) {
                                                       if (result) {
-                                                          fragments.addAll(finalFragments);
+                                                          fragments.add(lastLedgerFragment);
                                                       }
                                                       checkFragments(fragments, cb);
                                                   }
@@ -250,7 +340,7 @@ public class LedgerChecker {
                 }
                 return;
             } else {
-                fragments.addAll(finalFragments);
+                fragments.add(lastLedgerFragment);
             }
         }
 
@@ -275,7 +365,10 @@ public class LedgerChecker {
                 LOG.error("Invalid fragment found : {}", r);
                 allFragmentsCb.operationComplete(
                         BKException.Code.IncorrectParameterException, r);
+            } catch (BKException e) {
+                LOG.error("BKException when checking fragment : {}", r, e);
             }
         }
     }
+
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
index e12f77f..3f02355 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
@@ -20,19 +20,20 @@
 package org.apache.bookkeeper.client;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.SortedMap;
-
 import org.apache.bookkeeper.net.BookieSocketAddress;
 
 /**
- * Represents the entries of a segment of a ledger which are stored on a single
- * bookie in the segments bookie ensemble.
- * 
+ * Represents the entries of a segment of a ledger which are stored on a subset of
+ * the bookies in the segment's bookie ensemble.
+ *
  * Used for checking and recovery
  */
 public class LedgerFragment {
-    private final int bookieIndex;
+    private final Set<Integer> bookieIndexes;
     private final List<BookieSocketAddress> ensemble;
     private final long firstEntryId;
     private final long lastKnownEntryId;
@@ -40,12 +41,14 @@ public class LedgerFragment {
     private final DistributionSchedule schedule;
     private final boolean isLedgerClosed;
 
-    LedgerFragment(LedgerHandle lh, long firstEntryId,
-            long lastKnownEntryId, int bookieIndex) {
+    LedgerFragment(LedgerHandle lh,
+                   long firstEntryId,
+                   long lastKnownEntryId,
+                   Set<Integer> bookieIndexes) {
         this.ledgerId = lh.getId();
         this.firstEntryId = firstEntryId;
         this.lastKnownEntryId = lastKnownEntryId;
-        this.bookieIndex = bookieIndex;
+        this.bookieIndexes = bookieIndexes;
         this.ensemble = lh.getLedgerMetadata().getEnsemble(firstEntryId);
         this.schedule = lh.getDistributionSchedule();
         SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles = lh
@@ -54,6 +57,27 @@ public class LedgerFragment {
                 || !ensemble.equals(ensembles.get(ensembles.lastKey()));
     }
 
+    /**
+     * Creates a fragment that covers the same ledger, entry range, ensemble, schedule
+     * and closed state as {@code lf}, but tracks only the bookie indexes in
+     * {@code subset}.
+     *
+     * <p>{@code subset} is copied. Callers may pass a live view (e.g. a map's key set),
+     * and later changes to it must not alter this fragment.
+     */
+    LedgerFragment(LedgerFragment lf, Set<Integer> subset) {
+        this.ledgerId = lf.ledgerId;
+        this.firstEntryId = lf.firstEntryId;
+        this.lastKnownEntryId = lf.lastKnownEntryId;
+        this.bookieIndexes = new HashSet<Integer>(subset);
+        this.ensemble = lf.ensemble;
+        this.schedule = lf.schedule;
+        this.isLedgerClosed = lf.isLedgerClosed;
+    }
+
+    /**
+     * Returns a new fragment identical to this one, except that it tracks only the
+     * given bookie indexes. This fragment is not modified.
+     *
+     * @param subset
+     *          bookie indexes (positions in the ensemble) the new fragment should cover.
+     * @return the narrowed fragment
+     */
+    public LedgerFragment subset(Set<Integer> subset) {
+        LedgerFragment narrowed = new LedgerFragment(this, subset);
+        return narrowed;
+    }
+
     /**
      * Returns true, if and only if the ledger fragment will never be modified
      * by any of the clients in future, otherwise false. i.e,
@@ -83,23 +107,51 @@ public class LedgerFragment {
     /**
      * Gets the failedBookie address
      */
-    public BookieSocketAddress getAddress() {
+    public BookieSocketAddress getAddress(int bookieIndex) {
         return ensemble.get(bookieIndex);
     }
-    
+
+    /**
+     * Returns the distinct addresses of the ensemble members at the bookie indexes
+     * this fragment tracks. A fresh set is built on every call.
+     */
+    public Set<BookieSocketAddress> getAddresses() {
+        Set<BookieSocketAddress> result = new HashSet<BookieSocketAddress>();
+        bookieIndexes.forEach(idx -> result.add(ensemble.get(idx)));
+        return result;
+    }
+
     /**
      * Gets the failedBookie index
      */
-    public int getBookiesIndex() {
-        return bookieIndex;
+    public Set<Integer> getBookiesIndexes() {
+        return bookieIndexes;
     }
 
     /**
-     * Gets the first stored entry id of the fragment in failed bookie.
-     * 
+     * Gets the first stored entry id of the fragment in failed bookies.
+     *
      * @return entryId
      */
     public long getFirstStoredEntryId() {
+        // A bookie that holds no entry of this fragment reports INVALID_ENTRY_ID. Such
+        // bookies must be skipped; otherwise Math.min lets that sentinel mask the real
+        // first entry stored on the other bookies (e.g. short fragments in a striped
+        // ensemble), and replication would start from an invalid entry id.
+        long firstEntry = LedgerHandle.INVALID_ENTRY_ID;
+        for (int bookieIndex : bookieIndexes) {
+            Long firstStoredEntryForBookie = getFirstStoredEntryId(bookieIndex);
+            if (null == firstStoredEntryForBookie
+                    || firstStoredEntryForBookie == LedgerHandle.INVALID_ENTRY_ID) {
+                continue;
+            }
+            firstEntry = firstEntry == LedgerHandle.INVALID_ENTRY_ID
+                    ? firstStoredEntryForBookie
+                    : Math.min(firstEntry, firstStoredEntryForBookie);
+        }
+        return firstEntry;
+    }
+
+    /**
+     * Get the first stored entry id of the fragment on the given failed bookie.
+     *
+     * @param bookieIndex
+     *          the bookie index in the ensemble.
+     * @return first stored entry id on the bookie.
+     */
+    public Long getFirstStoredEntryId(int bookieIndex) {
         long firstEntry = firstEntryId;
 
         for (int i = 0; i < ensemble.size() && firstEntry <= lastKnownEntryId; i++) {
@@ -114,10 +166,30 @@ public class LedgerFragment {
 
     /**
      * Gets the last stored entry id of the fragment in failed bookie.
-     * 
+     *
      * @return entryId
      */
     public long getLastStoredEntryId() {
+        // Keep the largest per-bookie last entry. Bookies that store nothing from this
+        // fragment report INVALID_ENTRY_ID, which is assumed to sort below every real
+        // entry id, so max() never prefers it over a real entry.
+        Long highest = null;
+        for (int idx : bookieIndexes) {
+            Long candidate = getLastStoredEntryId(idx);
+            if (candidate == null) {
+                continue;
+            }
+            highest = (highest == null) ? candidate : Long.valueOf(Math.max(highest, candidate));
+        }
+        return highest != null ? highest : LedgerHandle.INVALID_ENTRY_ID;
+    }
+
+    /**
+     * Get the last stored entry id of the fragment in the given failed bookie.
+     *
+     * @param bookieIndex
+     *          the bookie index in the ensemble.
+     * @return last stored entry id on the bookie.
+     */
+    public Long getLastStoredEntryId(int bookieIndex) {
         long lastEntry = lastKnownEntryId;
         for (int i = 0; i < ensemble.size() && lastEntry >= firstEntryId; i++) {
             if (schedule.hasEntry(lastEntry, bookieIndex)) {
@@ -131,7 +203,7 @@ public class LedgerFragment {
 
     /**
      * Gets the ensemble of fragment
-     * 
+     *
      * @return the ensemble for the segment which this fragment is a part of
      */
     public List<BookieSocketAddress> getEnsemble() {
@@ -143,6 +215,6 @@ public class LedgerFragment {
         return String.format("Fragment(LedgerID: %d, FirstEntryID: %d[%d], "
                 + "LastKnownEntryID: %d[%d], Host: %s, Closed: %s)", ledgerId, firstEntryId,
                 getFirstStoredEntryId(), lastKnownEntryId, getLastStoredEntryId(),
-                getAddress(), isLedgerClosed);
+                getAddresses(), isLedgerClosed);
     }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index 172f9ec..95c48d5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -19,17 +19,20 @@
  */
 package org.apache.bookkeeper.client;
 
+import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
-
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieProtocol;
@@ -79,7 +82,7 @@ public class LedgerFragmentReplicator {
     private void replicateFragmentInternal(final LedgerHandle lh,
             final LedgerFragment lf,
             final AsyncCallback.VoidCallback ledgerFragmentMcb,
-            final BookieSocketAddress newBookie) throws InterruptedException {
+            final Set<BookieSocketAddress> newBookies) throws InterruptedException {
         if (!lf.isClosed()) {
             LOG.error("Trying to replicate an unclosed fragment;"
                       + " This is not safe {}", lf);
@@ -94,13 +97,13 @@ public class LedgerFragmentReplicator {
              * Ideally this should never happen if bookie failure is taken care
              * of properly. Nothing we can do though in this case.
              */
-            LOG.warn("Dead bookie (" + lf.getAddress()
+            LOG.warn("Dead bookie (" + lf.getAddresses()
                     + ") is still part of the current"
                     + " active ensemble for ledgerId: " + lh.getId());
             ledgerFragmentMcb.processResult(BKException.Code.OK, null, null);
             return;
         }
-        if (startEntryId > endEntryId) {
+        if (startEntryId > endEntryId || endEntryId <= INVALID_ENTRY_ID) {
             // for open ledger which there is no entry, the start entry id is 0,
             // the end entry id is -1.
             // we can return immediately to trigger forward read
@@ -126,7 +129,7 @@ public class LedgerFragmentReplicator {
                 BKException.Code.LedgerRecoveryException);
         for (final Long entryId : entriesToReplicate) {
             recoverLedgerFragmentEntry(entryId, lh, ledgerFragmentEntryMcb,
-                    newBookie);
+                    newBookies);
         }
     }
 
@@ -146,27 +149,27 @@ public class LedgerFragmentReplicator {
      * @param ledgerFragmentMcb
      *            MultiCallback to invoke once we've recovered the current
      *            ledger fragment.
-     * @param targetBookieAddress
-     *            New bookie we want to use to recover and replicate the ledger
+     * @param targetBookieAddresses
+     *            New bookies we want to use to recover and replicate the ledger
      *            entries that were stored on the failed bookie.
      */
     void replicate(final LedgerHandle lh, final LedgerFragment lf,
             final AsyncCallback.VoidCallback ledgerFragmentMcb,
-            final BookieSocketAddress targetBookieAddress)
+            final Set<BookieSocketAddress> targetBookieAddresses)
             throws InterruptedException {
         Set<LedgerFragment> partionedFragments = splitIntoSubFragments(lh, lf,
                 bkc.getConf().getRereplicationEntryBatchSize());
-        LOG.info("Fragment :" + lf + " is split into sub fragments :"
-                + partionedFragments);
+        LOG.info("Replicating fragment {} in {} sub fragments.",
+                lf, partionedFragments.size());
         replicateNextBatch(lh, partionedFragments.iterator(),
-                ledgerFragmentMcb, targetBookieAddress);
+                ledgerFragmentMcb, targetBookieAddresses);
     }
 
     /** Replicate the batched entry fragments one after other */
     private void replicateNextBatch(final LedgerHandle lh,
             final Iterator<LedgerFragment> fragments,
             final AsyncCallback.VoidCallback ledgerFragmentMcb,
-            final BookieSocketAddress targetBookieAddress) {
+            final Set<BookieSocketAddress> targetBookieAddresses) {
         if (fragments.hasNext()) {
             try {
                 replicateFragmentInternal(lh, fragments.next(),
@@ -179,11 +182,11 @@ public class LedgerFragmentReplicator {
                                 } else {
                                     replicateNextBatch(lh, fragments,
                                             ledgerFragmentMcb,
-                                            targetBookieAddress);
+                                            targetBookieAddresses);
                                 }
                             }
 
-                        }, targetBookieAddress);
+                        }, targetBookieAddresses);
             } catch (InterruptedException e) {
                 ledgerFragmentMcb.processResult(
                         BKException.Code.InterruptedException, null, null);
@@ -224,7 +227,7 @@ public class LedgerFragmentReplicator {
         for (int i = 0; i < splitsWithFullEntries; i++) {
             fragmentSplitLastEntry = (firstEntryId + rereplicationEntryBatchSize) - 1;
             fragments.add(new LedgerFragment(lh, firstEntryId,
-                    fragmentSplitLastEntry, ledgerFragment.getBookiesIndex()));
+                    fragmentSplitLastEntry, ledgerFragment.getBookiesIndexes()));
             firstEntryId = fragmentSplitLastEntry + 1;
         }
 
@@ -233,7 +236,7 @@ public class LedgerFragmentReplicator {
         if (lastSplitWithPartialEntries > 0) {
             fragments.add(new LedgerFragment(lh, firstEntryId, firstEntryId
                     + lastSplitWithPartialEntries - 1, ledgerFragment
-                    .getBookiesIndex()));
+                    .getBookiesIndexes()));
         }
         return fragments;
     }
@@ -242,7 +245,7 @@ public class LedgerFragmentReplicator {
      * This method asynchronously recovers a specific ledger entry by reading
      * the values via the BookKeeper Client (which would read it from the other
      * replicas) and then writing it to the chosen new bookie.
-     * 
+     *
      * @param entryId
      *            Ledger Entry ID to recover.
      * @param lh
@@ -250,14 +253,41 @@ public class LedgerFragmentReplicator {
      * @param ledgerFragmentEntryMcb
      *            MultiCallback to invoke once we've recovered the current
      *            ledger entry.
-     * @param newBookie
-     *            New bookie we want to use to recover and replicate the ledger
+     * @param newBookies
+     *            New bookies we want to use to recover and replicate the ledger
      *            entries that were stored on the failed bookie.
      */
     private void recoverLedgerFragmentEntry(final Long entryId,
             final LedgerHandle lh,
             final AsyncCallback.VoidCallback ledgerFragmentEntryMcb,
-            final BookieSocketAddress newBookie) throws InterruptedException {
+            final Set<BookieSocketAddress> newBookies) throws InterruptedException {
+        final AtomicInteger numCompleted = new AtomicInteger(0);
+        final AtomicBoolean completed = new AtomicBoolean(false);
+        final WriteCallback multiWriteCallback = new WriteCallback() {
+            @Override
+            public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) {
+                if (rc != BKException.Code.OK) {
+                    LOG.error("BK error writing entry for ledgerId: {}, entryId: {}, bookie: {}",
+                              new Object[] { ledgerId, entryId, addr, BKException.create(rc) });
+                    if (completed.compareAndSet(false, true)) {
+                        ledgerFragmentEntryMcb.processResult(rc, null, null);
+                    }
+                } else {
+                    numEntriesWritten.inc();
+                    if (ctx instanceof Long) {
+                        numBytesWritten.registerSuccessfulValue((Long) ctx);
+                    }
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Success writing ledger id {}, entry id {} to a new bookie {}!",
+                                  new Object[] { ledgerId, entryId, addr });
+                    }
+                    if (numCompleted.incrementAndGet() == newBookies.size() &&
+                        completed.compareAndSet(false, true)) {
+                        ledgerFragmentEntryMcb.processResult(rc, null, null);
+                    }
+                }
+            }
+        };
         /*
          * Read the ledger entry using the LedgerHandle. This will allow us to
          * read the entry from one of the other replicated bookies other than
@@ -286,42 +316,16 @@ public class LedgerFragmentReplicator {
                         .computeDigestAndPackageForSending(entryId,
                                 lh.getLastAddConfirmed(), entry.getLength(),
                                 Unpooled.wrappedBuffer(data, 0, data.length));
-                bkc.getBookieClient().addEntry(newBookie, lh.getId(),
-                        lh.getLedgerKey(), entryId, toSend,
-                        new WriteCallback() {
-                            @Override
-                            public void writeComplete(int rc, long ledgerId,
-                                    long entryId, BookieSocketAddress addr,
-                                    Object ctx) {
-                                if (rc != BKException.Code.OK) {
-                                    LOG.error(
-                                            "BK error writing entry for ledgerId: "
-                                                    + ledgerId + ", entryId: "
-                                                    + entryId + ", bookie: "
-                                                    + addr, BKException
-                                                    .create(rc));
-                                } else {
-                                    numEntriesWritten.inc();
-                                    numBytesWritten.registerSuccessfulValue(dataLength);
-                                    if (LOG.isDebugEnabled()) {
-                                        LOG.debug("Success writing ledger id "
-                                                + ledgerId + ", entry id "
-                                                + entryId + " to a new bookie "
-                                                + addr + "!");
-                                    }
-                                }
-                                /*
-                                 * Pass the return code result up the chain with
-                                 * the parent callback.
-                                 */
-                                ledgerFragmentEntryMcb.processResult(rc, null,
-                                        null);
-                            }
-                        }, null, BookieProtocol.FLAG_RECOVERY_ADD);
+                for (BookieSocketAddress newBookie : newBookies) {
+                    bkc.getBookieClient().addEntry(newBookie, lh.getId(),
+                            lh.getLedgerKey(), entryId, toSend.retainedSlice(),
+                            multiWriteCallback, dataLength, BookieProtocol.FLAG_RECOVERY_ADD);
+                }
+                toSend.release();
             }
         }, null);
     }
-    
+
     /**
      * Callback for recovery of a single ledger fragment. Once the fragment has
      * had all entries replicated, update the ensemble in zookeeper. Once
@@ -332,17 +336,15 @@ public class LedgerFragmentReplicator {
         final AsyncCallback.VoidCallback ledgerFragmentsMcb;
         final LedgerHandle lh;
         final long fragmentStartId;
-        final BookieSocketAddress oldBookie;
-        final BookieSocketAddress newBookie;
+        final Map<BookieSocketAddress, BookieSocketAddress> oldBookie2NewBookie;
 
         SingleFragmentCallback(AsyncCallback.VoidCallback ledgerFragmentsMcb,
                 LedgerHandle lh, long fragmentStartId,
-                BookieSocketAddress oldBookie, BookieSocketAddress newBookie) {
+                Map<BookieSocketAddress, BookieSocketAddress> oldBookie2NewBookie) {
             this.ledgerFragmentsMcb = ledgerFragmentsMcb;
             this.lh = lh;
             this.fragmentStartId = fragmentStartId;
-            this.newBookie = newBookie;
-            this.oldBookie = oldBookie;
+            this.oldBookie2NewBookie = oldBookie2NewBookie;
         }
 
         @Override
@@ -353,39 +355,32 @@ public class LedgerFragmentReplicator {
                 ledgerFragmentsMcb.processResult(rc, null, null);
                 return;
             }
-            updateEnsembleInfo(ledgerFragmentsMcb, fragmentStartId, lh,
-                                        oldBookie, newBookie);
+            updateEnsembleInfo(ledgerFragmentsMcb, fragmentStartId, lh, oldBookie2NewBookie);
         }
     }
 
     /** Updates the ensemble with newBookie and notify the ensembleUpdatedCb */
     private static void updateEnsembleInfo(
             AsyncCallback.VoidCallback ensembleUpdatedCb, long fragmentStartId,
-            LedgerHandle lh, BookieSocketAddress oldBookie,
-            BookieSocketAddress newBookie) {
+            LedgerHandle lh, Map<BookieSocketAddress, BookieSocketAddress> oldBookie2NewBookie) {
         /*
          * Update the ledger metadata's ensemble info to point to the new
          * bookie.
          */
         ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata()
                 .getEnsembles().get(fragmentStartId);
-        int deadBookieIndex = ensemble.indexOf(oldBookie);
-
-        /*
-         * An update to the ensemble info might happen after re-reading ledger metadata.
-         * Such an update might reflect a change to the ensemble membership such that 
-         * it might not be necessary to replace the bookie.
-         */
-        if (deadBookieIndex >= 0) {
-            ensemble.remove(deadBookieIndex);
-            ensemble.add(deadBookieIndex, newBookie);
-            lh.writeLedgerConfig(new UpdateEnsembleCb(ensembleUpdatedCb,
-                        fragmentStartId, lh, oldBookie, newBookie));
-        }
-        else {
-            LOG.warn("Bookie {} doesn't exist in ensemble {} anymore.", oldBookie, ensemble);
-            ensembleUpdatedCb.processResult(BKException.Code.UnexpectedConditionException, null, null);
+        for (Map.Entry<BookieSocketAddress, BookieSocketAddress> entry : oldBookie2NewBookie.entrySet()) {
+            int deadBookieIndex = ensemble.indexOf(entry.getKey());
+            // update ensemble info might happen after re-read ledger metadata, so the ensemble might already
+            // change. if ensemble is already changed, skip replacing the bookie doesn't exist.
+            if (deadBookieIndex >= 0) {
+                ensemble.set(deadBookieIndex, entry.getValue());
+            } else {
+                LOG.info("Bookie {} doesn't exist in ensemble {} anymore.", entry.getKey(), ensemble);
+            }
         }
+        lh.writeLedgerConfig(new UpdateEnsembleCb(ensembleUpdatedCb,
+                fragmentStartId, lh, oldBookie2NewBookie));
     }
 
     /**
@@ -397,17 +392,15 @@ public class LedgerFragmentReplicator {
         final AsyncCallback.VoidCallback ensembleUpdatedCb;
         final LedgerHandle lh;
         final long fragmentStartId;
-        final BookieSocketAddress oldBookie;
-        final BookieSocketAddress newBookie;
+        final Map<BookieSocketAddress, BookieSocketAddress> oldBookie2NewBookie;
 
         public UpdateEnsembleCb(AsyncCallback.VoidCallback ledgerFragmentsMcb,
                 long fragmentStartId, LedgerHandle lh,
-                BookieSocketAddress oldBookie, BookieSocketAddress newBookie) {
+                Map<BookieSocketAddress, BookieSocketAddress> oldBookie2NewBookie) {
             this.ensembleUpdatedCb = ledgerFragmentsMcb;
             this.lh = lh;
             this.fragmentStartId = fragmentStartId;
-            this.newBookie = newBookie;
-            this.oldBookie = oldBookie;
+            this.oldBookie2NewBookie = oldBookie2NewBookie;
         }
 
         @Override
@@ -418,9 +411,8 @@ public class LedgerFragmentReplicator {
                 // try again, the previous success (with which this has
                 // conflicted) will have updated the stat other operations
                 // such as (addEnsemble) would update it too.
-                lh
-                        .rereadMetadata(new OrderedSafeGenericCallback<LedgerMetadata>(
-                                lh.bk.getMainWorkerPool(), lh.getId()) {
+                lh.rereadMetadata(new OrderedSafeGenericCallback<LedgerMetadata>(
+                                lh.bk.mainWorkerPool, lh.getId()) {
                             @Override
                             public void safeOperationComplete(int rc,
                                     LedgerMetadata newMeta) {
@@ -433,8 +425,7 @@ public class LedgerFragmentReplicator {
                                 } else {
                                     lh.metadata = newMeta;
                                     updateEnsembleInfo(ensembleUpdatedCb,
-                                            fragmentStartId, lh, oldBookie,
-                                            newBookie);
+                                            fragmentStartId, lh, oldBookie2NewBookie);
                                 }
                             }
                             @Override
@@ -449,8 +440,8 @@ public class LedgerFragmentReplicator {
             } else {
                 LOG.info("Updated ZK for ledgerId: (" + lh.getId() + " : "
                         + fragmentStartId
-                        + ") to point ledger fragments from old dead bookie: ("
-                        + oldBookie + ") to new bookie: (" + newBookie + ")");
+                        + ") to point ledger fragments from old bookies to new bookies: "
+                        + oldBookie2NewBookie);
             }
             /*
              * Pass the return code result up the chain with the parent
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 10fb329..787540e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -115,10 +115,13 @@ public class ServerConfiguration extends AbstractConfiguration {
     protected final static String DISK_USAGE_WARN_THRESHOLD = "diskUsageWarnThreshold";
     protected final static String DISK_USAGE_LWM_THRESHOLD = "diskUsageLwmThreshold";
     protected final static String DISK_CHECK_INTERVAL = "diskCheckInterval";
+
+    // Replication parameters
     protected final static String AUDITOR_PERIODIC_CHECK_INTERVAL = "auditorPeriodicCheckInterval";
     protected final static String AUDITOR_PERIODIC_BOOKIE_CHECK_INTERVAL = "auditorPeriodicBookieCheckInterval";
     protected final static String AUTO_RECOVERY_DAEMON_ENABLED = "autoRecoveryDaemonEnabled";
     protected final static String LOST_BOOKIE_RECOVERY_DELAY = "lostBookieRecoveryDelay";
+    protected final static String RW_REREPLICATE_BACKOFF_MS = "rwRereplicateBackoffMs";
 
     // Worker Thread parameters.
     protected final static String NUM_ADD_WORKER_THREADS = "numAddWorkerThreads";
@@ -1760,6 +1763,24 @@ public class ServerConfiguration extends AbstractConfiguration {
     }
 
     /**
+     * Get how long to backoff when encountering exception on rereplicating a ledger.
+     *
+     * @return backoff time in milliseconds
+     */
+    public int getRwRereplicateBackoffMs() {
+        return getInt(RW_REREPLICATE_BACKOFF_MS, 5000);
+    }
+
+    /**
+     * Set how long to backoff when encountering exception on rereplicating a ledger.
+     *
+     * @param backoffMs backoff time in milliseconds
+     */
+    public void setRwRereplicateBackoffMs(int backoffMs) {
+        setProperty(RW_REREPLICATE_BACKOFF_MS, backoffMs);
+    }
+
+    /**
      * Sets that whether force start a bookie in readonly mode
      *
      * @param enabled
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index ab35b41..4f84d03 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -20,6 +20,10 @@
  */
 package org.apache.bookkeeper.replication;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.SettableFuture;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -33,7 +37,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
@@ -63,11 +66,6 @@ import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.SettableFuture;
-
 /**
  * Auditor is a single entity in the entire Bookie cluster and will be watching
  * all the bookies under 'ledgerrootpath/available' zkpath. When any of the
@@ -570,7 +568,7 @@ public class Auditor implements BookiesListener {
                 if (rc == BKException.Code.OK) {
                     Set<BookieSocketAddress> bookies = Sets.newHashSet();
                     for (LedgerFragment f : fragments) {
-                        bookies.add(f.getAddress());
+                        bookies.addAll(f.getAddresses());
                     }
                     for (BookieSocketAddress bookie : bookies) {
                         publishSuspectedLedgers(bookie.toString(), Sets.newHashSet(lh.getId()));
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
index 9c3c019..5caac6f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
@@ -20,14 +20,12 @@
  */
 package org.apache.bookkeeper.replication;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.util.HashSet;
 import java.util.Set;
-
-import com.google.common.annotations.VisibleForTesting;
-
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieCriticalThread;
 import org.apache.bookkeeper.bookie.ExitCode;
@@ -105,8 +103,7 @@ public class AutoRecoveryMain {
                 .build();
         auditorElector = new AuditorElector(Bookie.getBookieAddress(conf).toString(), conf,
                 zk, statsLogger.scope(AUDITOR_SCOPE));
-        replicationWorker = new ReplicationWorker(zk, conf,
-                Bookie.getBookieAddress(conf), statsLogger.scope(REPLICATION_WORKER_SCOPE));
+        replicationWorker = new ReplicationWorker(zk, conf, statsLogger.scope(REPLICATION_WORKER_SCOPE));
         deathWatcher = new AutoRecoveryDeathWatcher(this);
     }
 
@@ -115,7 +112,7 @@ public class AutoRecoveryMain {
         this.conf = conf;
         this.zk = zk;
         auditorElector = new AuditorElector(Bookie.getBookieAddress(conf).toString(), conf, zk);
-        replicationWorker = new ReplicationWorker(zk, conf, Bookie.getBookieAddress(conf));
+        replicationWorker = new ReplicationWorker(zk, conf);
         deathWatcher = new AutoRecoveryDeathWatcher(this);
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
index fb7de20..0b48e33 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
@@ -43,6 +43,7 @@ public interface ReplicationStats {
     public final static String NUM_BYTES_READ = "NUM_BYTES_READ";
     public final static String NUM_ENTRIES_WRITTEN = "NUM_ENTRIES_WRITTEN";
     public final static String NUM_BYTES_WRITTEN = "NUM_BYTES_WRITTEN";
+    public final static String REPLICATE_EXCEPTION = "exceptions";
 
     public final static String BK_CLIENT_SCOPE = "bk_client";
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
index 98960b0..61205f7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
@@ -19,8 +19,10 @@
  */
 package org.apache.bookkeeper.replication;
 
+import com.google.common.base.Stopwatch;
 import java.io.IOException;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -29,13 +31,12 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Stopwatch;
-
 import org.apache.bookkeeper.bookie.BookieThread;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BKException.BKBookieHandleNotAvailableException;
+import org.apache.bookkeeper.client.BKException.BKLedgerRecoveryException;
 import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException;
+import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.BKException.BKReadException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
@@ -61,6 +62,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.bookkeeper.replication.ReplicationStats.BK_CLIENT_SCOPE;
+import static org.apache.bookkeeper.replication.ReplicationStats.NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED;
+import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATE_EXCEPTION;
 import static org.apache.bookkeeper.replication.ReplicationStats.REREPLICATE_OP;
 
 /**
@@ -68,24 +71,25 @@ import static org.apache.bookkeeper.replication.ReplicationStats.REREPLICATE_OP;
  * ZKLedgerUnderreplicationManager and replicates to it.
  */
 public class ReplicationWorker implements Runnable {
-    private final static Logger LOG = LoggerFactory
+    private static final Logger LOG = LoggerFactory
             .getLogger(ReplicationWorker.class);
-    final private LedgerUnderreplicationManager underreplicationManager;
+    private final LedgerUnderreplicationManager underreplicationManager;
     private final ServerConfiguration conf;
     private final ZooKeeper zkc;
     private volatile boolean workerRunning = false;
-    private volatile boolean isInReadOnlyMode = false;
-    final private BookKeeperAdmin admin;
+    private final BookKeeperAdmin admin;
     private final LedgerChecker ledgerChecker;
-    private final BookieSocketAddress targetBookie;
     private final BookKeeper bkc;
     private final Thread workerThread;
+    private final long rwRereplicateBackoffMs;
     private final long openLedgerRereplicationGracePeriod;
     private final Timer pendingReplicationTimer;
 
     // Expose Stats
+    private final StatsLogger statsLogger;
     private final OpStatsLogger rereplicateOpStats;
     private final Counter numLedgersReplicated;
+    private final Map<String,Counter> exceptionCounters;
 
     /**
      * Replication worker for replicating the ledger fragments from
@@ -96,15 +100,12 @@ public class ReplicationWorker implements Runnable {
      *            - ZK instance
      * @param conf
      *            - configurations
-     * @param targetBKAddr
-     *            - to where replication should happen. Ideally this will be
-     *            local Bookie address.
      */
     public ReplicationWorker(final ZooKeeper zkc,
-                             final ServerConfiguration conf, BookieSocketAddress targetBKAddr)
+                             final ServerConfiguration conf)
             throws CompatibilityException, KeeperException,
             InterruptedException, IOException {
-        this(zkc, conf, targetBKAddr, NullStatsLogger.INSTANCE);
+        this(zkc, conf, NullStatsLogger.INSTANCE);
     }
 
     /**
@@ -116,18 +117,16 @@ public class ReplicationWorker implements Runnable {
      *            - ZK instance
      * @param conf
      *            - configurations
-     * @param targetBKAddr
-     *            - to where replication should happen. Ideally this will be
-     *            local Bookie address.
+     * @param statsLogger
+     *            - stats logger
      */
     public ReplicationWorker(final ZooKeeper zkc,
-                             final ServerConfiguration conf, BookieSocketAddress targetBKAddr,
+                             final ServerConfiguration conf,
                              StatsLogger statsLogger)
             throws CompatibilityException, KeeperException,
             InterruptedException, IOException {
         this.zkc = zkc;
         this.conf = conf;
-        this.targetBookie = targetBKAddr;
         LedgerManagerFactory mFactory = LedgerManagerFactory
                 .newLedgerManagerFactory(this.conf, this.zkc);
         this.underreplicationManager = mFactory
@@ -141,11 +140,14 @@ public class ReplicationWorker implements Runnable {
         this.workerThread = new BookieThread(this, "ReplicationWorker");
         this.openLedgerRereplicationGracePeriod = conf
                 .getOpenLedgerRereplicationGracePeriod();
+        this.rwRereplicateBackoffMs = conf.getRwRereplicateBackoffMs();
         this.pendingReplicationTimer = new Timer("PendingReplicationTimer");
 
         // Expose Stats
-        this.rereplicateOpStats = statsLogger.getOpStatsLogger(REREPLICATE_OP);
-        this.numLedgersReplicated = statsLogger.getCounter(ReplicationStats.NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED);
+        this.statsLogger = statsLogger;
+        this.rereplicateOpStats = this.statsLogger.getOpStatsLogger(REREPLICATE_OP);
+        this.numLedgersReplicated = this.statsLogger.getCounter(NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED);
+        this.exceptionCounters = new HashMap<String, Counter>();
     }
 
     /** Start the replication worker */
@@ -167,38 +169,23 @@ public class ReplicationWorker implements Runnable {
                 return;
             } catch (BKException e) {
                 LOG.error("BKException while replicating fragments", e);
-                if (e instanceof BKException.BKWriteOnReadOnlyBookieException) {
-                    waitTillTargetBookieIsWritable();
-                } else {
-                    waitBackOffTime();
-                }
+                waitBackOffTime(rwRereplicateBackoffMs);
             } catch (UnavailableException e) {
                 LOG.error("UnavailableException "
                         + "while replicating fragments", e);
-                waitBackOffTime();
+                waitBackOffTime(rwRereplicateBackoffMs);
             }
         }
         LOG.info("ReplicationWorker exited loop!");
     }
 
-    private static void waitBackOffTime() {
+    private static void waitBackOffTime(long backoffMs) {
         try {
-            Thread.sleep(5000);
+            Thread.sleep(backoffMs);
         } catch (InterruptedException e) {
         }
     }
 
-    private void waitTillTargetBookieIsWritable() {
-        LOG.info("Waiting for target bookie {} to be back in read/write mode", targetBookie);
-        while (workerRunning && admin.getReadOnlyBookiesAsync().contains(targetBookie)) {
-            isInReadOnlyMode = true;
-            waitBackOffTime();
-        }
-
-        isInReadOnlyMode = false;
-        LOG.info("Target bookie {} is back in read/write mode", targetBookie);
-    }
-
     /**
      * Replicates the under replicated fragments from failed bookie ledger to
      * targetBookie
@@ -229,39 +216,22 @@ public class ReplicationWorker implements Runnable {
             LOG.debug("Going to replicate the fragments of the ledger: {}", ledgerIdToReplicate);
         }
         try (LedgerHandle lh = admin.openLedgerNoRecovery(ledgerIdToReplicate)) {
-            Set<LedgerFragment> fragments = getUnderreplicatedFragments(lh);
+            Set<LedgerFragment> fragmentsBeforeReplicate = getUnderreplicatedFragments(lh);
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Founds fragments {} for replication from ledger: {}", fragments, ledgerIdToReplicate);
+                LOG.debug("Founds fragments {} for replication from ledger: {}",
+                    fragmentsBeforeReplicate, ledgerIdToReplicate);
             }
 
             boolean foundOpenFragments = false;
             long numFragsReplicated = 0;
-            for (LedgerFragment ledgerFragment : fragments) {
+            for (LedgerFragment ledgerFragment : fragmentsBeforeReplicate) {
                 if (!ledgerFragment.isClosed()) {
                     foundOpenFragments = true;
                     continue;
-                } else if (isTargetBookieExistsInFragmentEnsemble(lh,
-                        ledgerFragment)) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Target Bookie[{}] found in the fragment ensemble: {}", targetBookie,
-                                ledgerFragment.getEnsemble());
-                    }
-                    continue;
-                }
-                try {
-                    admin.replicateLedgerFragment(lh, ledgerFragment, targetBookie);
-                    numFragsReplicated++;
-                } catch (BKException.BKBookieHandleNotAvailableException e) {
-                    LOG.warn("BKBookieHandleNotAvailableException "
-                            + "while replicating the fragment", e);
-                } catch (BKException.BKLedgerRecoveryException e) {
-                    LOG.warn("BKLedgerRecoveryException "
-                            + "while replicating the fragment", e);
-                    if (admin.getReadOnlyBookiesAsync().contains(targetBookie)) {
-                        underreplicationManager.releaseUnderreplicatedLedger(ledgerIdToReplicate);
-                        throw new BKException.BKWriteOnReadOnlyBookieException();
-                    }
                 }
+                LOG.info("Going to replicate the fragments of the ledger: {}", ledgerIdToReplicate);
+                admin.replicateLedgerFragment(lh, ledgerFragment);
+                numFragsReplicated++;
             }
 
             if (numFragsReplicated > 0) {
@@ -273,13 +243,13 @@ public class ReplicationWorker implements Runnable {
                 return false;
             }
 
-            fragments = getUnderreplicatedFragments(lh);
-            if (fragments.size() == 0) {
-                LOG.info("Ledger replicated successfully. ledger id is: "
-                        + ledgerIdToReplicate);
+            Set<LedgerFragment> fragmentsAfterReplicate = getUnderreplicatedFragments(lh);
+            if (fragmentsAfterReplicate.size() == 0) {
+                LOG.info("Ledger {} is replicated successfully.", ledgerIdToReplicate);
                 underreplicationManager.markLedgerReplicated(ledgerIdToReplicate);
                 return true;
             } else {
+                LOG.info("Fail to replicate ledger {}.", ledgerIdToReplicate);
                 // Releasing the underReplication ledger lock and compete
                 // for the replication again for the pending fragments
                 underreplicationManager
@@ -289,30 +259,34 @@ public class ReplicationWorker implements Runnable {
         } catch (BKNoSuchLedgerExistsException e) {
             // Ledger might have been deleted by user
             LOG.info("BKNoSuchLedgerExistsException while opening "
-                    + "ledger for replication. Other clients "
+                    + "ledger {} for replication. Other clients "
                     + "might have deleted the ledger. "
-                    + "So, no harm to continue");
+                    + "So, no harm to continue", ledgerIdToReplicate);
             underreplicationManager.markLedgerReplicated(ledgerIdToReplicate);
+            getExceptionCounter("BKNoSuchLedgerExistsException").inc();
             return false;
-        } catch (BKReadException e) {
-            LOG.info("BKReadException while"
-                    + " opening ledger for replication."
-                    + " Enough Bookies might not have available"
-                    + "So, no harm to continue");
-            underreplicationManager
-                    .releaseUnderreplicatedLedger(ledgerIdToReplicate);
-            return false;
-        } catch (BKBookieHandleNotAvailableException e) {
-            LOG.info("BKBookieHandleNotAvailableException while"
-                    + " opening ledger for replication."
-                    + " Enough Bookies might not have available"
-                    + "So, no harm to continue");
-            underreplicationManager
-                    .releaseUnderreplicatedLedger(ledgerIdToReplicate);
+        } catch (BKNotEnoughBookiesException e) {
+            logBKExceptionAndReleaseLedger(e, ledgerIdToReplicate);
+            throw e;
+        } catch (BKException e) {
+            logBKExceptionAndReleaseLedger(e, ledgerIdToReplicate);
             return false;
         }
     }
 
+    private void logBKExceptionAndReleaseLedger(BKException e, long ledgerIdToReplicate)
+            throws UnavailableException {
+        LOG.info("{} while"
+                + " rereplicating ledger {}."
+                + " Enough Bookies might not have available"
+                + " So, no harm to continue",
+            e.getClass().getSimpleName(),
+            ledgerIdToReplicate);
+        underreplicationManager
+                .releaseUnderreplicatedLedger(ledgerIdToReplicate);
+        getExceptionCounter(e.getClass().getSimpleName()).inc();
+    }
+
     /**
      * When checking the fragments of a ledger, there is a corner case
      * where if the last segment/ensemble is open, but nothing has been written to
@@ -471,21 +445,6 @@ public class ReplicationWorker implements Runnable {
         return workerRunning && workerThread.isAlive();
     }
 
-    boolean isInReadOnlyMode() {
-        return isInReadOnlyMode;
-    }
-
-    private boolean isTargetBookieExistsInFragmentEnsemble(LedgerHandle lh,
-            LedgerFragment ledgerFragment) {
-        List<BookieSocketAddress> ensemble = ledgerFragment.getEnsemble();
-        for (BookieSocketAddress bkAddr : ensemble) {
-            if (targetBookie.equals(bkAddr)) {
-                return true;
-            }
-        }
-        return false;
-    }
-
     /** Ledger checker call back */
     private static class CheckerCallback implements
             GenericCallback<Set<LedgerFragment>> {
@@ -508,4 +467,13 @@ public class ReplicationWorker implements Runnable {
         }
     }
 
+    private Counter getExceptionCounter(String name) {
+        Counter counter = this.exceptionCounters.get(name);
+        if (counter == null) {
+            counter = this.statsLogger.scope(REPLICATE_EXCEPTION).getCounter(name);
+            this.exceptionCounters.put(name, counter);
+        }
+        return counter;
+    }
+
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
index 183af5b..f415152 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
@@ -20,6 +20,14 @@
  */
 package org.apache.bookkeeper.client;
 
+import com.google.common.util.concurrent.SettableFuture;
+import io.netty.buffer.ByteBuf;
+import java.io.IOException;
+import java.util.Enumeration;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
@@ -37,17 +45,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.util.concurrent.SettableFuture;
-
-import io.netty.buffer.ByteBuf;
-
-import java.io.IOException;
-import java.util.Enumeration;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import static org.junit.Assert.*;
 
 /**
@@ -518,7 +515,7 @@ public class BookKeeperCloseTest extends BookKeeperClusterTestCase {
 
             try {
                 bkadmin.replicateLedgerFragment(lh3,
-                        checkercb.getResult(10, TimeUnit.SECONDS).iterator().next(), newBookie);
+                        checkercb.getResult(10, TimeUnit.SECONDS).iterator().next());
                 fail("Shouldn't be able to replicate with a closed client");
             } catch (BKException.BKClientClosedException cce) {
                 // correct behaviour
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
index fc78d8f..a9ccf89 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
@@ -322,13 +322,9 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
         // Call the async recover bookie method.
         BookieSocketAddress bookieSrc = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(),
           initialPort);
-        BookieSocketAddress bookieDest = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(),
-          newBookiePort);
-        LOG.info("Now recover the data on the killed bookie (" + bookieSrc + ") and replicate it to the new one ("
-          + bookieDest + ")");
         // Initiate the sync object
         sync.value = false;
-        bkAdmin.asyncRecoverBookieData(bookieSrc, bookieDest, bookieRecoverCb, sync);
+        bkAdmin.asyncRecoverBookieData(bookieSrc, bookieRecoverCb, sync);
 
         // Wait for the async method to complete.
         synchronized (sync) {
@@ -380,12 +376,11 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
         // Call the async recover bookie method.
         BookieSocketAddress bookieSrc = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(),
           initialPort);
-        BookieSocketAddress bookieDest = null;
         LOG.info("Now recover the data on the killed bookie (" + bookieSrc
           + ") and replicate it to a random available one");
         // Initiate the sync object
         sync.value = false;
-        bkAdmin.asyncRecoverBookieData(bookieSrc, bookieDest, bookieRecoverCb, sync);
+        bkAdmin.asyncRecoverBookieData(bookieSrc, bookieRecoverCb, sync);
 
         // Wait for the async method to complete.
         synchronized (sync) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java
index 83ec593..3bcac7d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java
@@ -91,8 +91,13 @@ public class TestLedgerChecker extends BookKeeperClusterTestCase {
             LOG.info("unreplicated fragment: {}", r);
         }
         assertEquals("Should have one missing fragment", 1, result.size());
-        assertEquals("Fragment should be missing from first replica", result
-                .iterator().next().getAddress(), replicaToKill);
+        assertEquals("There should be 1 fragments. But returned fragments are "
+            + result, 1, result.size());
+        LedgerFragment lf = result.iterator().next();
+        assertEquals("There should be 1 failed bookies in first fragment " + lf,
+            1, lf.getBookiesIndexes().size());
+        assertEquals("Fragment should be missing from first replica",
+            lf.getAddress(0), replicaToKill);
 
         BookieSocketAddress replicaToKill2 = lh.getLedgerMetadata()
                 .getEnsembles().get(0L).get(1);
@@ -104,7 +109,16 @@ public class TestLedgerChecker extends BookKeeperClusterTestCase {
         for (LedgerFragment r : result) {
             LOG.info("unreplicated fragment: {}", r);
         }
-        assertEquals("Should have three missing fragments", 3, result.size());
+        assertEquals("Should have two missing fragments", 2, result.size());
+        for (LedgerFragment fragment : result) {
+            if (fragment.getFirstEntryId() == 0L) {
+                assertEquals("There should be 2 failed bookies in first fragment",
+                    2, fragment.getBookiesIndexes().size());
+            } else {
+                assertEquals("There should be 1 failed bookies in second fragment",
+                    1, fragment.getBookiesIndexes().size());
+            }
+        }
     }
 
     /**
@@ -191,7 +205,9 @@ public class TestLedgerChecker extends BookKeeperClusterTestCase {
             LOG.info("unreplicated fragment: {}", r);
         }
 
-        assertEquals("There should be 2 fragments", 2, result.size());
+        assertEquals("There should be 1 fragments", 1, result.size());
+        assertEquals("There should be 2 failed bookies in the fragment",
+                2, result.iterator().next().getBookiesIndexes().size());
     }
 
     /**
@@ -264,7 +280,9 @@ public class TestLedgerChecker extends BookKeeperClusterTestCase {
             LOG.info("unreplicated fragment: {}", r);
         }
 
-        assertEquals("There should be 3 fragments", 3, result.size());
+        assertEquals("There should be 1 fragments", 1, result.size());
+        assertEquals("There should be 3 failed bookies in the fragment",
+                3, result.iterator().next().getBookiesIndexes().size());
     }
 
     /**
@@ -297,7 +315,9 @@ public class TestLedgerChecker extends BookKeeperClusterTestCase {
         }
         Set<LedgerFragment> result = getUnderReplicatedFragments(lh);
         assertNotNull("Result shouldn't be null", result);
-        assertEquals("There should be 2 fragments.", 2, result.size());
+        assertEquals("There should be 1 fragments.", 1, result.size());
+        assertEquals("There should be 2 failed bookies in the fragment",
+                2, result.iterator().next().getBookiesIndexes().size());
     }
 
     /**
@@ -326,6 +346,8 @@ public class TestLedgerChecker extends BookKeeperClusterTestCase {
         assertNotNull("Result shouldn't be null", result);
         assertEquals("There should be 1 fragment. But returned fragments are "
                 + result, 1, result.size());
+        assertEquals("There should be 1 failed bookies in the fragment",
+                1, result.iterator().next().getBookiesIndexes().size());
     }
 
     /**
@@ -364,8 +386,17 @@ public class TestLedgerChecker extends BookKeeperClusterTestCase {
 
         Set<LedgerFragment> result = getUnderReplicatedFragments(lh1);
         assertNotNull("Result shouldn't be null", result);
-        assertEquals("There should be 3 fragment. But returned fragments are "
-                + result, 3, result.size());
+        assertEquals("There should be 2 fragments. But returned fragments are "
+                + result, 2, result.size());
+        for (LedgerFragment lf : result) {
+            if (lf.getFirstEntryId() == 0L) {
+                assertEquals("There should be 2 failed bookies in first fragment",
+                        2, lf.getBookiesIndexes().size());
+            } else {
+                assertEquals("There should be 1 failed bookie in second fragment",
+                        1, lf.getBookiesIndexes().size());
+            }
+        }
     }
 
     /**
@@ -439,6 +470,8 @@ public class TestLedgerChecker extends BookKeeperClusterTestCase {
         assertNotNull("Result shouldn't be null", result);
         assertEquals("There should be 1 fragment. But returned fragments are "
                 + result, 1, result.size());
+        assertEquals("There should be 1 failed bookies in the fragment",
+                1, result.iterator().next().getBookiesIndexes().size());
         lh1.close();
 
         // kill bookie 0
@@ -454,8 +487,10 @@ public class TestLedgerChecker extends BookKeeperClusterTestCase {
 
         result = getUnderReplicatedFragments(lh1);
         assertNotNull("Result shouldn't be null", result);
-        assertEquals("There should be 2 fragment. But returned fragments are "
-                + result, 2, result.size());
+        assertEquals("There should be 1 fragment. But returned fragments are "
+                + result, 1, result.size());
+        assertEquals("There should be 2 failed bookies in the fragment",
+                2, result.iterator().next().getBookiesIndexes().size());
         lh1.close();
     }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
index c0ff8d7..e2d85e0 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
@@ -19,6 +19,7 @@
  */
 package org.apache.bookkeeper.client;
 
+import com.google.common.collect.Sets;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Enumeration;
@@ -26,7 +27,6 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.concurrent.CountDownLatch;
-
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -104,7 +104,7 @@ public class TestLedgerFragmentReplication extends BookKeeperClusterTestCase {
         // 0-9 entries should be copy to new bookie
 
         for (LedgerFragment lf : result) {
-            admin.replicateLedgerFragment(lh, lf, newBkAddr);
+            admin.replicateLedgerFragment(lh, lf);
         }
 
         // Killing all bookies except newly replicated bookie
@@ -170,11 +170,11 @@ public class TestLedgerFragmentReplication extends BookKeeperClusterTestCase {
         int unclosedCount = 0;
         for (LedgerFragment lf : result) {
             if (lf.isClosed()) {
-                admin.replicateLedgerFragment(lh, lf, newBkAddr);
+                admin.replicateLedgerFragment(lh, lf);
             } else {
                 unclosedCount++;
                 try {
-                    admin.replicateLedgerFragment(lh, lf, newBkAddr);
+                    admin.replicateLedgerFragment(lh, lf);
                     fail("Shouldn't be able to rereplicate unclosed ledger");
                 } catch (BKException bke) {
                     // correct behaviour
@@ -216,12 +216,9 @@ public class TestLedgerFragmentReplication extends BookKeeperClusterTestCase {
 
         Set<LedgerFragment> fragments = getFragmentsToReplicate(lh);
         BookKeeperAdmin admin = new BookKeeperAdmin(baseClientConf);
-        int startNewBookie = startNewBookie();
-        BookieSocketAddress additionalBK = new BookieSocketAddress(InetAddress
-                .getLocalHost().getHostAddress(), startNewBookie);
         for (LedgerFragment lf : fragments) {
             try {
-                admin.replicateLedgerFragment(lh, lf, additionalBK);
+                admin.replicateLedgerFragment(lh, lf);
             } catch (BKException.BKLedgerRecoveryException e) {
                 // expected
             }
@@ -265,7 +262,7 @@ public class TestLedgerFragmentReplication extends BookKeeperClusterTestCase {
             final long oriFragmentLastEntry, long entriesPerSubFragment,
             long expectedSubFragments, LedgerHandle lh) {
         LedgerFragment fr = new LedgerFragment(lh, oriFragmentFirstEntry,
-                oriFragmentLastEntry, 0) {
+                oriFragmentLastEntry, Sets.newHashSet(0)) {
             @Override
             public long getLastStoredEntryId() {
                 return oriFragmentLastEntry;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
index d273dab..2ce591b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
@@ -79,6 +79,7 @@ public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase {
         baseConf.setLedgerManagerFactoryClassName(
                 "org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
         baseConf.setOpenLedgerRereplicationGracePeriod(openLedgerRereplicationGracePeriod);
+        baseConf.setRwRereplicateBackoffMs(500);
         baseClientConf.setLedgerManagerFactoryClassName(
                 "org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
         this.digestType = DigestType.MAC;
@@ -550,8 +551,7 @@ public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase {
 
     }
 
-    private int getReplicaIndexInLedger(LedgerHandle lh,
- BookieSocketAddress replicaToKill) {
+    private int getReplicaIndexInLedger(LedgerHandle lh, BookieSocketAddress replicaToKill) {
         SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles = LedgerHandleAdapter
                 .getLedgerMetadata(lh).getEnsembles();
         int ledgerReplicaIndex = -1;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index 0ec3bb0..e873f15 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -22,7 +22,6 @@ package org.apache.bookkeeper.replication;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertNull;
 
 import java.net.InetAddress;
 import java.util.ArrayList;
@@ -39,7 +38,6 @@ import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
@@ -132,7 +130,7 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
                 .getLocalHost().getHostAddress(), startNewBookie);
         LOG.info("New Bookie addr :" + newBkAddr);
 
-        ReplicationWorker rw = new ReplicationWorker(zkc, baseConf, newBkAddr);
+        ReplicationWorker rw = new ReplicationWorker(zkc, baseConf);
 
         rw.start();
         try {
@@ -179,7 +177,7 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
         LOG.info("New Bookie addr :" + newBkAddr);
 
         killAllBookies(lh, newBkAddr);
-        ReplicationWorker rw = new ReplicationWorker(zkc, baseConf, newBkAddr);
+        ReplicationWorker rw = new ReplicationWorker(zkc, baseConf);
 
         rw.start();
         try {
@@ -231,7 +229,7 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
         BookieSocketAddress newBkAddr1 = new BookieSocketAddress(InetAddress
                 .getLocalHost().getHostAddress(), startNewBookie1);
         LOG.info("New Bookie addr :" + newBkAddr1);
-        ReplicationWorker rw1 = new ReplicationWorker(zkc, baseConf, newBkAddr1);
+        ReplicationWorker rw1 = new ReplicationWorker(zkc, baseConf);
 
         // Starte RW2
         int startNewBookie2 = startNewBookie();
@@ -242,8 +240,7 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
                 .connectString(zkUtil.getZooKeeperConnectString())
                 .sessionTimeoutMs(10000)
                 .build();
-        ReplicationWorker rw2 = new ReplicationWorker(zkc1, baseConf,
-                newBkAddr2);
+        ReplicationWorker rw2 = new ReplicationWorker(zkc1, baseConf);
         rw1.start();
         rw2.start();
 
@@ -296,7 +293,7 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
         BookieSocketAddress newBkAddr = new BookieSocketAddress(InetAddress
                 .getLocalHost().getHostAddress(), startNewBookie);
         LOG.info("New Bookie addr :" + newBkAddr);
-        ReplicationWorker rw = new ReplicationWorker(zkc, baseConf, newBkAddr);
+        ReplicationWorker rw = new ReplicationWorker(zkc, baseConf);
         rw.start();
 
         try {
@@ -354,7 +351,7 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
                 .getLocalHost().getHostAddress(), startNewBookie);
         LOG.info("New Bookie addr :" + newBkAddr);
 
-        ReplicationWorker rw = new ReplicationWorker(zkc, baseConf, newBkAddr);
+        ReplicationWorker rw = new ReplicationWorker(zkc, baseConf);
 
         rw.start();
         try {
@@ -413,7 +410,7 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
 
         // set to 3s instead of default 30s
         baseConf.setOpenLedgerRereplicationGracePeriod("3000");
-        ReplicationWorker rw = new ReplicationWorker(zkc, baseConf, newBkAddr);
+        ReplicationWorker rw = new ReplicationWorker(zkc, baseConf);
 
         LedgerManagerFactory mFactory = LedgerManagerFactory
                 .newLedgerManagerFactory(baseClientConf, zkc);
@@ -474,7 +471,7 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
                 .getLocalHost().getHostAddress(), startNewBookie);
         LOG.info("New Bookie addr :" + newBkAddr);
 
-        ReplicationWorker rw = new ReplicationWorker(zkc, baseConf, newBkAddr);
+        ReplicationWorker rw = new ReplicationWorker(zkc, baseConf);
 
         LedgerManagerFactory mFactory = LedgerManagerFactory
                 .newLedgerManagerFactory(baseClientConf, zkc);
@@ -509,49 +506,6 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
     }
 
     /**
-     * Test that if the local bookie turns out to be read-only, then the replicator will pause but not shutdown.
-     */
-    @Test
-    public void testRWOnLocalBookieReadonlyTransition() throws Exception {
-        LedgerHandle lh = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
-
-        for (int i = 0; i < 10; i++) {
-            lh.addEntry(data);
-        }
-        BookieSocketAddress replicaToKill =
-                LedgerHandleAdapter.getLedgerMetadata(lh).getEnsembles().get(0L).get(0);
-
-        LOG.info("Killing Bookie", replicaToKill);
-        killBookie(replicaToKill);
-
-        int newBkPort = startNewBookie();
-        for (int i = 0; i < 10; i++) {
-            lh.addEntry(data);
-        }
-
-        BookieSocketAddress newBkAddr = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(), newBkPort);
-        LOG.info("New Bookie addr :" + newBkAddr);
-
-        ReplicationWorker rw = new ReplicationWorker(zkc, baseConf, newBkAddr);
-
-        rw.start();
-        try {
-            BookieServer newBk = bs.get(bs.size() - 1);
-            bsConfs.get(bsConfs.size() - 1).setReadOnlyModeEnabled(true);
-            newBk.getBookie().doTransitionToReadOnlyMode();
-            underReplicationManager.markLedgerUnderreplicated(lh.getId(), replicaToKill.toString());
-            while (ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh.getId(), basePath) && rw.isRunning()
-                    && !rw.isInReadOnlyMode()) {
-                Thread.sleep(100);
-            }
-            assertNull(zkc.exists(String.format("%s/urL%010d", baseLockPath, lh.getId()), false));
-            assertTrue("RW should continue even if the bookie is readonly", rw.isRunning());
-        } finally {
-            rw.shutdown();
-        }
-    }
-
-    /**
      * Test that the replication worker will not shutdown on a simple ZK disconnection
      */
     @Test
@@ -562,7 +516,7 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
                 .build();
 
         try {
-            ReplicationWorker rw = new ReplicationWorker(zk, baseConf, getBookie(0));
+            ReplicationWorker rw = new ReplicationWorker(zk, baseConf);
             rw.start();
             for (int i = 0; i < 10; i++) {
                 if (rw.isRunning()) {

-- 
To stop receiving notification emails like this one, please contact
['"commits@bookkeeper.apache.org" <co...@bookkeeper.apache.org>'].