Posted to commits@bookkeeper.apache.org by re...@apache.org on 2019/10/02 18:10:22 UTC

[bookkeeper] branch master updated: Enhance deferLedgerLockReleaseOfFailedLedger in ReplicationWorker

This is an automated email from the ASF dual-hosted git repository.

reddycharan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f996dc  Enhance deferLedgerLockReleaseOfFailedLedger in ReplicationWorker
2f996dc is described below

commit 2f996dcf0159f945f7ec97ce7402e5d293009444
Author: Charan Reddy Guttapalem <re...@gmail.com>
AuthorDate: Wed Oct 2 11:10:16 2019 -0700

    Enhance deferLedgerLockReleaseOfFailedLedger in ReplicationWorker
    
    Descriptions of the changes in this PR:
    
    **Issue:** Previously, the ReplicationWorker (RW) retry logic was enhanced to back off
    replication after a threshold number of replication failures for a ledger. This
    helps in the pathological situation where data (a ledger/entry) is unavailable.
    But it is a sub-optimal solution, since each RW may try to recover a ledger the
    threshold number of times before it defers the ledger lock release. Also, on each
    attempt a RW reads entries/fragments sequentially and writes them to new bookies
    until it hits a missing (completely unavailable) entry and fails replication of
    that ledger. This happens on every retry, so it bloats storage, and the
    over-replication detection that deletes the duplicates runs only once a day by
    default. Because of this the cluster can run out of storage space and become a
    read-only (RO) cluster, and it also puts a significant amount of load on the
    cluster in vain.
    
    **So the new proposal is to:**
    - On each RW, remember state in addition to the counter. The state must include the entries the RW failed to read.
    - The counter and state must be kept in each RW node, and exponential backoff should be used for deferLedgerLockReleaseOfFailedLedger (a short sketch of the backoff computation follows this list).
    - On the next attempt, the RW first tries to read the failed entries noted in the state. The reads must succeed before it proceeds with replication.
    - With this model we avoid duplicate copies on each attempt; at most, each RW will create only one copy.
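    
    A minimal sketch of the proposed mechanism, with illustrative names only
    (the actual implementation is in the diff below): a per-ledger failure
    counter drives an exponentially backed-off deferral of the ledger lock
    release, bounded by lockReleaseOfFailedLedgerGracePeriod, and a per-ledger
    set remembers the entries the RW could not read.
    
        import java.util.concurrent.ConcurrentHashMap;
        import java.util.concurrent.ConcurrentSkipListSet;
        import java.util.concurrent.atomic.AtomicInteger;
        import java.util.function.BiConsumer;
    
        class FailedLedgerBackoffSketch {
            static final int NUM_OF_EXPONENTIAL_BACKOFF_RETRIALS = 5;
    
            final long gracePeriodMs;   // lockReleaseOfFailedLedgerGracePeriod
            final long baseBackoffMs;   // gracePeriodMs / 2^NUM_OF_EXPONENTIAL_BACKOFF_RETRIALS
            final ConcurrentHashMap<Long, AtomicInteger> failures = new ConcurrentHashMap<>();
            final ConcurrentHashMap<Long, ConcurrentSkipListSet<Long>> unreadableEntries =
                    new ConcurrentHashMap<>();
    
            FailedLedgerBackoffSketch(long gracePeriodMs) {
                this.gracePeriodMs = gracePeriodMs;
                this.baseBackoffMs = gracePeriodMs / (long) Math.pow(2, NUM_OF_EXPONENTIAL_BACKOFF_RETRIALS);
            }
    
            /** Callback invoked when reading (ledgerId, entryId) fails during replication. */
            final BiConsumer<Long, Long> onReadEntryFailure = (ledgerId, entryId) ->
                    unreadableEntries.computeIfAbsent(ledgerId, id -> new ConcurrentSkipListSet<>())
                            .add(entryId);
    
            /** Delay before releasing the under-replicated ledger lock after one more failure. */
            long nextLockReleaseDelayMs(long ledgerId) {
                int failedSoFar = failures.computeIfAbsent(ledgerId, id -> new AtomicInteger())
                        .getAndIncrement();
                return (failedSoFar >= NUM_OF_EXPONENTIAL_BACKOFF_RETRIALS)
                        ? gracePeriodMs                            // bounded at the grace period
                        : baseBackoffMs * (1L << failedSoFar);     // 1x, 2x, 4x, ... of the base backoff
            }
    
            public static void main(String[] args) {
                FailedLedgerBackoffSketch s = new FailedLedgerBackoffSketch(300000L); // new default
                for (int i = 0; i < 7; i++) {
                    System.out.println("attempt " + i + " -> defer lock release by "
                            + s.nextLockReleaseDelayMs(42L) + " ms");
                }
            }
        }
    
    For example, with the default grace period of 300000 ms the base backoff is
    300000 / 32 = 9375 ms, so successive failures defer the lock release by
    9375, 18750, 37500, 75000, 150000 and then 300000 ms.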
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Venkateswararao Jujjuri (JV) <None>
    
    This closes #2166 from reddycharan/enhancereplication
---
 .../apache/bookkeeper/client/BookKeeperAdmin.java  |  21 +-
 .../apache/bookkeeper/client/LedgerFragment.java   |   4 +-
 .../client/LedgerFragmentReplicator.java           |  24 ++-
 .../bookkeeper/conf/ServerConfiguration.java       |  27 +--
 .../bookkeeper/replication/ReplicationStats.java   |   2 +-
 .../bookkeeper/replication/ReplicationWorker.java  | 130 ++++++++++--
 .../bookkeeper/client/BookKeeperCloseTest.java     |   4 +-
 .../client/TestLedgerFragmentReplication.java      |  10 +-
 .../replication/TestReplicationWorker.java         | 225 ++++++++++++++++++---
 site/_data/config/bk_server.yaml                   |   8 +-
 10 files changed, 365 insertions(+), 90 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index bdf56b8..6bd95d8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -50,6 +50,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
 import java.util.function.Predicate;
 
 import org.apache.bookkeeper.bookie.Bookie;
@@ -95,6 +96,7 @@ public class BookKeeperAdmin implements AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(BookKeeperAdmin.class);
     private static final Logger VERBOSE = LoggerFactory.getLogger("verbose");
+    private static final BiConsumer<Long, Long> NOOP_BICONSUMER = (l, e) -> { };
 
     // BookKeeper client instance
     private BookKeeper bkc;
@@ -909,7 +911,7 @@ public class BookKeeperAdmin implements AutoCloseable {
                             LedgerFragment ledgerFragment = new LedgerFragment(lh,
                                 startEntryId, endEntryId, targetBookieAddresses.keySet());
                             asyncRecoverLedgerFragment(lh, ledgerFragment, cb,
-                                Sets.newHashSet(targetBookieAddresses.values()));
+                                Sets.newHashSet(targetBookieAddresses.values()), NOOP_BICONSUMER);
                         } catch (InterruptedException e) {
                             Thread.currentThread().interrupt();
                             return;
@@ -961,8 +963,9 @@ public class BookKeeperAdmin implements AutoCloseable {
     private void asyncRecoverLedgerFragment(final LedgerHandle lh,
             final LedgerFragment ledgerFragment,
             final AsyncCallback.VoidCallback ledgerFragmentMcb,
-            final Set<BookieSocketAddress> newBookies) throws InterruptedException {
-        lfr.replicate(lh, ledgerFragment, ledgerFragmentMcb, newBookies);
+            final Set<BookieSocketAddress> newBookies,
+            final BiConsumer<Long, Long> onReadEntryFailureCallback) throws InterruptedException {
+        lfr.replicate(lh, ledgerFragment, ledgerFragmentMcb, newBookies, onReadEntryFailureCallback);
     }
 
     private Map<Integer, BookieSocketAddress> getReplacementBookies(
@@ -1050,18 +1053,20 @@ public class BookKeeperAdmin implements AutoCloseable {
      *            - LedgerFragment to replicate
      */
     public void replicateLedgerFragment(LedgerHandle lh,
-            final LedgerFragment ledgerFragment)
+            final LedgerFragment ledgerFragment,
+            final BiConsumer<Long, Long> onReadEntryFailureCallback)
             throws InterruptedException, BKException {
         Optional<Set<BookieSocketAddress>> excludedBookies = Optional.empty();
         Map<Integer, BookieSocketAddress> targetBookieAddresses =
                 getReplacementBookiesByIndexes(lh, ledgerFragment.getEnsemble(),
                         ledgerFragment.getBookiesIndexes(), excludedBookies);
-        replicateLedgerFragment(lh, ledgerFragment, targetBookieAddresses);
+        replicateLedgerFragment(lh, ledgerFragment, targetBookieAddresses, onReadEntryFailureCallback);
     }
 
     private void replicateLedgerFragment(LedgerHandle lh,
             final LedgerFragment ledgerFragment,
-            final Map<Integer, BookieSocketAddress> targetBookieAddresses)
+            final Map<Integer, BookieSocketAddress> targetBookieAddresses,
+            final BiConsumer<Long, Long> onReadEntryFailureCallback)
             throws InterruptedException, BKException {
         CompletableFuture<Void> result = new CompletableFuture<>();
         ResultCallBack resultCallBack = new ResultCallBack(result);
@@ -1074,7 +1079,7 @@ public class BookKeeperAdmin implements AutoCloseable {
 
         Set<BookieSocketAddress> targetBookieSet = Sets.newHashSet();
         targetBookieSet.addAll(targetBookieAddresses.values());
-        asyncRecoverLedgerFragment(lh, ledgerFragment, cb, targetBookieSet);
+        asyncRecoverLedgerFragment(lh, ledgerFragment, cb, targetBookieSet, onReadEntryFailureCallback);
 
         try {
             SyncCallbackUtils.waitForResult(result);
@@ -1132,7 +1137,7 @@ public class BookKeeperAdmin implements AutoCloseable {
     /**
      * This is the class for getting the replication result.
      */
-    static class ResultCallBack implements AsyncCallback.VoidCallback {
+    public static class ResultCallBack implements AsyncCallback.VoidCallback {
         private final CompletableFuture<Void> sync;
 
         public ResultCallBack(CompletableFuture<Void> sync) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
index 382fe4e..94e7454 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
@@ -95,11 +95,11 @@ public class LedgerFragment {
         return ledgerId;
     }
 
-    long getFirstEntryId() {
+    public long getFirstEntryId() {
         return firstEntryId;
     }
 
-    long getLastKnownEntryId() {
+    public long getLastKnownEntryId() {
         return lastKnownEntryId;
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index 765dec0..1da881d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -37,6 +37,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
@@ -109,7 +110,8 @@ public class LedgerFragmentReplicator {
     private void replicateFragmentInternal(final LedgerHandle lh,
             final LedgerFragment lf,
             final AsyncCallback.VoidCallback ledgerFragmentMcb,
-            final Set<BookieSocketAddress> newBookies) throws InterruptedException {
+            final Set<BookieSocketAddress> newBookies,
+            final BiConsumer<Long, Long> onReadEntryFailureCallback) throws InterruptedException {
         if (!lf.isClosed()) {
             LOG.error("Trying to replicate an unclosed fragment;"
                       + " This is not safe {}", lf);
@@ -156,7 +158,7 @@ public class LedgerFragmentReplicator {
                 BKException.Code.LedgerRecoveryException);
         for (final Long entryId : entriesToReplicate) {
             recoverLedgerFragmentEntry(entryId, lh, ledgerFragmentEntryMcb,
-                    newBookies);
+                    newBookies, onReadEntryFailureCallback);
         }
     }
 
@@ -182,14 +184,15 @@ public class LedgerFragmentReplicator {
      */
     void replicate(final LedgerHandle lh, final LedgerFragment lf,
             final AsyncCallback.VoidCallback ledgerFragmentMcb,
-            final Set<BookieSocketAddress> targetBookieAddresses)
+            final Set<BookieSocketAddress> targetBookieAddresses,
+            final BiConsumer<Long, Long> onReadEntryFailureCallback)
             throws InterruptedException {
         Set<LedgerFragment> partionedFragments = splitIntoSubFragments(lh, lf,
                 bkc.getConf().getRereplicationEntryBatchSize());
         LOG.info("Replicating fragment {} in {} sub fragments.",
                 lf, partionedFragments.size());
         replicateNextBatch(lh, partionedFragments.iterator(),
-                ledgerFragmentMcb, targetBookieAddresses);
+                ledgerFragmentMcb, targetBookieAddresses, onReadEntryFailureCallback);
     }
 
     /**
@@ -198,7 +201,8 @@ public class LedgerFragmentReplicator {
     private void replicateNextBatch(final LedgerHandle lh,
             final Iterator<LedgerFragment> fragments,
             final AsyncCallback.VoidCallback ledgerFragmentMcb,
-            final Set<BookieSocketAddress> targetBookieAddresses) {
+            final Set<BookieSocketAddress> targetBookieAddresses,
+            final BiConsumer<Long, Long> onReadEntryFailureCallback) {
         if (fragments.hasNext()) {
             try {
                 replicateFragmentInternal(lh, fragments.next(),
@@ -211,11 +215,12 @@ public class LedgerFragmentReplicator {
                                 } else {
                                     replicateNextBatch(lh, fragments,
                                             ledgerFragmentMcb,
-                                            targetBookieAddresses);
+                                            targetBookieAddresses,
+                                            onReadEntryFailureCallback);
                                 }
                             }
 
-                        }, targetBookieAddresses);
+                        }, targetBookieAddresses, onReadEntryFailureCallback);
             } catch (InterruptedException e) {
                 ledgerFragmentMcb.processResult(
                         BKException.Code.InterruptedException, null, null);
@@ -289,7 +294,9 @@ public class LedgerFragmentReplicator {
     private void recoverLedgerFragmentEntry(final Long entryId,
             final LedgerHandle lh,
             final AsyncCallback.VoidCallback ledgerFragmentEntryMcb,
-            final Set<BookieSocketAddress> newBookies) throws InterruptedException {
+            final Set<BookieSocketAddress> newBookies,
+            final BiConsumer<Long, Long> onReadEntryFailureCallback) throws InterruptedException {
+        final long ledgerId = lh.getId();
         final AtomicInteger numCompleted = new AtomicInteger(0);
         final AtomicBoolean completed = new AtomicBoolean(false);
         final WriteCallback multiWriteCallback = new WriteCallback() {
@@ -328,6 +335,7 @@ public class LedgerFragmentReplicator {
                 if (rc != BKException.Code.OK) {
                     LOG.error("BK error reading ledger entry: " + entryId,
                             BKException.create(rc));
+                    onReadEntryFailureCallback.accept(ledgerId, entryId);
                     ledgerFragmentEntryMcb.processResult(rc, null, null);
                     return;
                 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 9a2c42d..cba673b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -1552,12 +1552,15 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
     }
 
     /**
-     * Set the grace period so that if the replication worker fails to replicate
-     * a underreplicatedledger for more than
-     * ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING
-     * number of times, then instead of releasing the lock immediately after
-     * failed attempt, it will hold under replicated ledger lock for this grace
-     * period and then it will release the lock.
+     * Set the grace period, in milliseconds, for which the replication worker
+     * waits before releasing the lock after it fails to replicate a ledger.
+     * For the first ReplicationWorker.NUM_OF_EXPONENTIAL_BACKOFF_RETRIALS
+     * failures the delay backs off exponentially and is then capped at
+     * LOCK_RELEASE_OF_FAILED_LEDGER_GRACE_PERIOD.
+     *
+     * <p>On replication failure, instead of releasing the lock immediately
+     * after failed attempt, it will hold under replicated ledger lock for the
+     * grace period and then it will release the lock.
      *
      * @param waitTime
      */
@@ -1566,16 +1569,16 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
     }
 
     /**
-     * Get the grace period which the replication worker to wait before
-     * releasing the lock after replication worker failing to replicate for more
-     * than
-     * ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING
-     * number of times.
+     * Get the grace period, in milliseconds, for which the replication worker
+     * waits before releasing the lock after it fails to replicate a ledger.
+     * For the first ReplicationWorker.NUM_OF_EXPONENTIAL_BACKOFF_RETRIALS
+     * failures the delay backs off exponentially and is then capped at
+     * LOCK_RELEASE_OF_FAILED_LEDGER_GRACE_PERIOD.
      *
      * @return
      */
     public long getLockReleaseOfFailedLedgerGracePeriod() {
-        return getLong(LOCK_RELEASE_OF_FAILED_LEDGER_GRACE_PERIOD, 60000);
+        return getLong(LOCK_RELEASE_OF_FAILED_LEDGER_GRACE_PERIOD, 300000);
     }
 
     /**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
index f553f91..6ec9f49 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
@@ -60,7 +60,7 @@ public interface ReplicationStats {
     String NUM_BYTES_WRITTEN = "NUM_BYTES_WRITTEN";
     String REPLICATE_EXCEPTION = "exceptions";
     String NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER = "NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER";
-
+    String NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION = "NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION";
     String BK_CLIENT_SCOPE = "bk_client";
 
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
index 1bba2d5..80cfed3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
@@ -20,6 +20,7 @@
 package org.apache.bookkeeper.replication;
 
 import static org.apache.bookkeeper.replication.ReplicationStats.NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER;
+import static org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION;
 import static org.apache.bookkeeper.replication.ReplicationStats.NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED;
 import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATE_EXCEPTION;
 import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_WORKER_SCOPE;
@@ -35,13 +36,16 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
 
 import org.apache.bookkeeper.bookie.BookieThread;
 import org.apache.bookkeeper.client.BKException;
@@ -81,8 +85,8 @@ import org.slf4j.LoggerFactory;
 public class ReplicationWorker implements Runnable {
     private static final Logger LOG = LoggerFactory
             .getLogger(ReplicationWorker.class);
-    private static final int REPLICATED_FAILED_LEDGERS_MAXSIZE = 100;
-    static final int MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING = 10;
+    private static final int REPLICATED_FAILED_LEDGERS_MAXSIZE = 2000;
+    public static final int NUM_OF_EXPONENTIAL_BACKOFF_RETRIALS = 5;
 
     private final LedgerUnderreplicationManager underreplicationManager;
     private final ServerConfiguration conf;
@@ -96,6 +100,8 @@ public class ReplicationWorker implements Runnable {
     private final long openLedgerRereplicationGracePeriod;
     private final Timer pendingReplicationTimer;
     private final long lockReleaseOfFailedLedgerGracePeriod;
+    private final long baseBackoffForLockReleaseOfFailedLedger;
+    private final BiConsumer<Long, Long> onReadEntryFailureCallback;
 
     // Expose Stats
     private final StatsLogger statsLogger;
@@ -119,8 +125,14 @@ public class ReplicationWorker implements Runnable {
         help = "the number of defer-ledger-lock-releases of failed ledgers"
     )
     private final Counter numDeferLedgerLockReleaseOfFailedLedger;
+    @StatsDoc(
+            name = NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION,
+            help = "the number of entries ReplicationWorker was unable to read"
+        )
+    private final Counter numEntriesUnableToReadForReplication;
     private final Map<String, Counter> exceptionCounters;
     final LoadingCache<Long, AtomicInteger> replicationFailedLedgers;
+    final LoadingCache<Long, ConcurrentSkipListSet<Long>> unableToReadEntriesForReplication;
 
     /**
      * Replication worker for replicating the ledger fragments from
@@ -175,6 +187,8 @@ public class ReplicationWorker implements Runnable {
         this.openLedgerRereplicationGracePeriod = conf
                 .getOpenLedgerRereplicationGracePeriod();
         this.lockReleaseOfFailedLedgerGracePeriod = conf.getLockReleaseOfFailedLedgerGracePeriod();
+        this.baseBackoffForLockReleaseOfFailedLedger = this.lockReleaseOfFailedLedgerGracePeriod
+                / (long) (Math.pow(2, NUM_OF_EXPONENTIAL_BACKOFF_RETRIALS));
         this.rwRereplicateBackoffMs = conf.getRwRereplicateBackoffMs();
         this.pendingReplicationTimer = new Timer("PendingReplicationTimer");
         this.replicationFailedLedgers = CacheBuilder.newBuilder().maximumSize(REPLICATED_FAILED_LEDGERS_MAXSIZE)
@@ -184,6 +198,14 @@ public class ReplicationWorker implements Runnable {
                         return new AtomicInteger();
                     }
                 });
+        this.unableToReadEntriesForReplication = CacheBuilder.newBuilder()
+                .maximumSize(REPLICATED_FAILED_LEDGERS_MAXSIZE)
+                .build(new CacheLoader<Long, ConcurrentSkipListSet<Long>>() {
+                    @Override
+                    public ConcurrentSkipListSet<Long> load(Long key) throws Exception {
+                        return new ConcurrentSkipListSet<Long>();
+                    }
+                });
 
         // Expose Stats
         this.statsLogger = statsLogger;
@@ -192,7 +214,13 @@ public class ReplicationWorker implements Runnable {
         this.numLedgersReplicated = this.statsLogger.getCounter(NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED);
         this.numDeferLedgerLockReleaseOfFailedLedger = this.statsLogger
                 .getCounter(NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER);
+        this.numEntriesUnableToReadForReplication = this.statsLogger
+                .getCounter(NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION);
         this.exceptionCounters = new HashMap<String, Counter>();
+        this.onReadEntryFailureCallback = (ledgerid, entryid) -> {
+            numEntriesUnableToReadForReplication.inc();
+            unableToReadEntriesForReplication.getUnchecked(ledgerid).add(entryid);
+        };
     }
 
     /**
@@ -270,6 +298,61 @@ public class ReplicationWorker implements Runnable {
         getExceptionCounter(e.getClass().getSimpleName()).inc();
     }
 
+    private boolean tryReadingFaultyEntries(LedgerHandle lh, LedgerFragment ledgerFragment) {
+        long ledgerId = lh.getId();
+        ConcurrentSkipListSet<Long> entriesUnableToReadForThisLedger = unableToReadEntriesForReplication
+                .getIfPresent(ledgerId);
+        if (entriesUnableToReadForThisLedger == null) {
+            return true;
+        }
+        long firstEntryIdOfFragment = ledgerFragment.getFirstEntryId();
+        long lastEntryIdOfFragment = ledgerFragment.getLastKnownEntryId();
+        NavigableSet<Long> entriesOfThisFragmentUnableToRead = entriesUnableToReadForThisLedger
+                .subSet(firstEntryIdOfFragment, true, lastEntryIdOfFragment, true);
+        if (entriesOfThisFragmentUnableToRead.isEmpty()) {
+            return true;
+        }
+        final CountDownLatch multiReadComplete = new CountDownLatch(1);
+        final AtomicInteger numOfResponsesToWaitFor = new AtomicInteger(entriesOfThisFragmentUnableToRead.size());
+        final AtomicInteger returnRCValue = new AtomicInteger(BKException.Code.OK);
+        for (long entryIdToRead : entriesOfThisFragmentUnableToRead) {
+            if (multiReadComplete.getCount() == 0) {
+                /*
+                 * if an asyncRead request had already failed then break the
+                 * loop.
+                 */
+                break;
+            }
+            lh.asyncReadEntries(entryIdToRead, entryIdToRead, (rc, ledHan, seq, ctx) -> {
+                long thisEntryId = (Long) ctx;
+                if (rc == BKException.Code.OK) {
+                    entriesUnableToReadForThisLedger.remove(thisEntryId);
+                    if (numOfResponsesToWaitFor.decrementAndGet() == 0) {
+                        multiReadComplete.countDown();
+                    }
+                } else {
+                    LOG.error("Received error: {} while trying to read entry: {} of ledger: {} in ReplicationWorker",
+                            rc, entryIdToRead, ledgerId);
+                    returnRCValue.compareAndSet(BKException.Code.OK, rc);
+                    /*
+                     * on receiving a failure error response, multiRead can be
+                     * marked completed, since there is no need to wait for
+                     * other responses.
+                     */
+                    multiReadComplete.countDown();
+                }
+            }, entryIdToRead);
+        }
+        try {
+            multiReadComplete.await();
+        } catch (InterruptedException e) {
+            LOG.error("Got interrupted exception while trying to read entries", e);
+            Thread.currentThread().interrupt();  // set interrupt flag
+            return false;
+        }
+        return (returnRCValue.get() == BKException.Code.OK);
+    }
+
     private boolean rereplicate(long ledgerIdToReplicate) throws InterruptedException, BKException,
             UnavailableException {
         if (LOG.isDebugEnabled()) {
@@ -292,8 +375,13 @@ public class ReplicationWorker implements Runnable {
                     foundOpenFragments = true;
                     continue;
                 }
+                if (!tryReadingFaultyEntries(lh, ledgerFragment)) {
+                    LOG.error("Failed to read faulty entries, so giving up replicating ledgerFragment {}",
+                            ledgerFragment);
+                    continue;
+                }
                 try {
-                    admin.replicateLedgerFragment(lh, ledgerFragment);
+                    admin.replicateLedgerFragment(lh, ledgerFragment, onReadEntryFailureCallback);
                 } catch (BKException.BKBookieHandleNotAvailableException e) {
                     LOG.warn("BKBookieHandleNotAvailableException while replicating the fragment", e);
                 } catch (BKException.BKLedgerRecoveryException e) {
@@ -302,7 +390,6 @@ public class ReplicationWorker implements Runnable {
                     LOG.warn("BKNotEnoughBookiesException while replicating the fragment", e);
                 }
             }
-
             if (foundOpenFragments || isLastSegmentOpenAndMissingBookies(lh)) {
                 deferLedgerLockRelease = true;
                 deferLedgerLockRelease(ledgerIdToReplicate);
@@ -315,16 +402,9 @@ public class ReplicationWorker implements Runnable {
                 underreplicationManager.markLedgerReplicated(ledgerIdToReplicate);
                 return true;
             } else {
-                if (replicationFailedLedgers.getUnchecked(ledgerIdToReplicate)
-                        .incrementAndGet() == MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING) {
-                    deferLedgerLockRelease = true;
-                    LOG.error(
-                            "ReplicationWorker failed to replicate Ledger : {} for {} number of times, "
-                            + "so deferring the ledger lock release",
-                            ledgerIdToReplicate, MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING);
-                    deferLedgerLockReleaseOfFailedLedger(ledgerIdToReplicate);
-                    numDeferLedgerLockReleaseOfFailedLedger.inc();
-                }
+                deferLedgerLockRelease = true;
+                deferLedgerLockReleaseOfFailedLedger(ledgerIdToReplicate);
+                numDeferLedgerLockReleaseOfFailedLedger.inc();
                 // Releasing the underReplication ledger lock and compete
                 // for the replication again for the pending fragments
                 return false;
@@ -414,6 +494,10 @@ public class ReplicationWorker implements Runnable {
         return fragments;
     }
 
+    void scheduleTaskWithDelay(TimerTask timerTask, long delayPeriod) {
+        pendingReplicationTimer.schedule(timerTask, delayPeriod);
+    }
+
     /**
      * Schedules a timer task for releasing the lock which will be scheduled
      * after open ledger fragment replication time. Ledger will be fenced if it
@@ -489,18 +573,30 @@ public class ReplicationWorker implements Runnable {
                 }
             }
         };
-        pendingReplicationTimer.schedule(timerTask, gracePeriod);
+        scheduleTaskWithDelay(timerTask, gracePeriod);
     }
 
     /**
      * Schedules a timer task for releasing the lock.
      */
     private void deferLedgerLockReleaseOfFailedLedger(final long ledgerId) {
+        int numOfTimesFailedSoFar = replicationFailedLedgers.getUnchecked(ledgerId).getAndIncrement();
+        /*
+         * for the first NUM_OF_EXPONENTIAL_BACKOFF_RETRIALS retrials do
+         * exponential backoff, starting from
+         * baseBackoffForLockReleaseOfFailedLedger
+         */
+        long delayOfLedgerLockReleaseInMSecs = (numOfTimesFailedSoFar >= NUM_OF_EXPONENTIAL_BACKOFF_RETRIALS)
+                ? this.lockReleaseOfFailedLedgerGracePeriod
+                : this.baseBackoffForLockReleaseOfFailedLedger * (int) Math.pow(2, numOfTimesFailedSoFar);
+        LOG.error(
+                "ReplicationWorker failed to replicate Ledger : {} for {} number of times, "
+                + "so deferring the ledger lock release by {} msecs",
+                ledgerId, numOfTimesFailedSoFar, delayOfLedgerLockReleaseInMSecs);
         TimerTask timerTask = new TimerTask() {
             @Override
             public void run() {
                 try {
-                    replicationFailedLedgers.invalidate(ledgerId);
                     underreplicationManager.releaseUnderreplicatedLedger(ledgerId);
                 } catch (UnavailableException e) {
                     LOG.error("UnavailableException while replicating fragments of ledger {}", ledgerId, e);
@@ -508,7 +604,7 @@ public class ReplicationWorker implements Runnable {
                 }
             }
         };
-        pendingReplicationTimer.schedule(timerTask, lockReleaseOfFailedLedgerGracePeriod);
+        scheduleTaskWithDelay(timerTask, delayOfLedgerLockReleaseInMSecs);
     }
 
     /**
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
index eda68c8..3bebc21 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
@@ -34,6 +34,7 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
 
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
@@ -65,6 +66,7 @@ public class BookKeeperCloseTest extends BookKeeperClusterTestCase {
             .getLogger(BookKeeperCloseTest.class);
     private DigestType digestType = DigestType.CRC32;
     private static final String PASSWORD = "testPasswd";
+    private static final BiConsumer<Long, Long> NOOP_BICONSUMER = (l, e) -> { };
 
     public BookKeeperCloseTest() {
         super(3);
@@ -519,7 +521,7 @@ public class BookKeeperCloseTest extends BookKeeperClusterTestCase {
 
             try {
                 bkadmin.replicateLedgerFragment(lh3,
-                        checkercb.getResult(10, TimeUnit.SECONDS).iterator().next());
+                        checkercb.getResult(10, TimeUnit.SECONDS).iterator().next(), NOOP_BICONSUMER);
                 fail("Shouldn't be able to replicate with a closed client");
             } catch (BKException.BKClientClosedException cce) {
                 // correct behaviour
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
index 33b966b..3105d18 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
@@ -32,6 +32,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.function.BiConsumer;
 
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
@@ -53,6 +54,7 @@ public class TestLedgerFragmentReplication extends BookKeeperClusterTestCase {
 
     private static final byte[] TEST_PSSWD = "testpasswd".getBytes();
     private static final DigestType TEST_DIGEST_TYPE = BookKeeper.DigestType.CRC32;
+    private static final BiConsumer<Long, Long> NOOP_BICONSUMER = (l, e) -> { };
     private static final Logger LOG = LoggerFactory
             .getLogger(TestLedgerFragmentReplication.class);
 
@@ -111,7 +113,7 @@ public class TestLedgerFragmentReplication extends BookKeeperClusterTestCase {
         // 0-9 entries should be copy to new bookie
 
         for (LedgerFragment lf : result) {
-            admin.replicateLedgerFragment(lh, lf);
+            admin.replicateLedgerFragment(lh, lf, NOOP_BICONSUMER);
         }
 
         // Killing all bookies except newly replicated bookie
@@ -174,11 +176,11 @@ public class TestLedgerFragmentReplication extends BookKeeperClusterTestCase {
         int unclosedCount = 0;
         for (LedgerFragment lf : result) {
             if (lf.isClosed()) {
-                admin.replicateLedgerFragment(lh, lf);
+                admin.replicateLedgerFragment(lh, lf, NOOP_BICONSUMER);
             } else {
                 unclosedCount++;
                 try {
-                    admin.replicateLedgerFragment(lh, lf);
+                    admin.replicateLedgerFragment(lh, lf, NOOP_BICONSUMER);
                     fail("Shouldn't be able to rereplicate unclosed ledger");
                 } catch (BKException bke) {
                     // correct behaviour
@@ -222,7 +224,7 @@ public class TestLedgerFragmentReplication extends BookKeeperClusterTestCase {
         BookKeeperAdmin admin = new BookKeeperAdmin(baseClientConf);
         for (LedgerFragment lf : fragments) {
             try {
-                admin.replicateLedgerFragment(lh, lf);
+                admin.replicateLedgerFragment(lh, lf, NOOP_BICONSUMER);
             } catch (BKException.BKLedgerRecoveryException e) {
                 // expected
             }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index c155bb4..11531b5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -19,16 +19,20 @@
  */
 package org.apache.bookkeeper.replication;
 
+import static org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.net.URI;
 import java.util.Enumeration;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Optional;
+import java.util.TimerTask;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import lombok.Cleanup;
@@ -47,10 +51,16 @@ import org.apache.bookkeeper.meta.MetadataClientDriver;
 import org.apache.bookkeeper.meta.MetadataDrivers;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
+import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.test.TestStatsProvider;
+import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.apache.zookeeper.KeeperException;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -438,21 +448,13 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
 
     }
 
-    /**
-     * Tests that ReplicationWorker will not make more than
-     * ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING
-     * number of replication failure attempts and if it fails more these many
-     * number of times then it will defer lock release by
-     * lockReleaseOfFailedLedgerGracePeriod.
-     *
-     * @throws Exception
-     */
     @Test
     public void testBookiesNotAvailableScenarioForReplicationWorker() throws Exception {
         int ensembleSize = 3;
         LedgerHandle lh = bkc.createLedger(ensembleSize, ensembleSize, BookKeeper.DigestType.CRC32, TESTPASSWD);
 
-        for (int i = 0; i < 10; i++) {
+        int numOfEntries = 7;
+        for (int i = 0; i < numOfEntries; i++) {
             lh.addEntry(data);
         }
         lh.close();
@@ -474,9 +476,10 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
         }
 
         // create couple of replicationworkers
-        baseConf.setLockReleaseOfFailedLedgerGracePeriod("500");
-        ReplicationWorker rw1 = new ReplicationWorker(baseConf);
-        ReplicationWorker rw2 = new ReplicationWorker(baseConf);
+        ServerConfiguration newRWConf = new ServerConfiguration(baseConf);
+        newRWConf.setLockReleaseOfFailedLedgerGracePeriod("64");
+        ReplicationWorker rw1 = new ReplicationWorker(newRWConf);
+        ReplicationWorker rw2 = new ReplicationWorker(newRWConf);
 
         @Cleanup
         MetadataClientDriver clientDriver = MetadataDrivers
@@ -487,6 +490,7 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
 
         LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();
         try {
+            //mark ledger underreplicated
             for (int i = 0; i < bookiesKilled.length; i++) {
                 underReplicationManager.markLedgerUnderreplicated(lh.getId(), bookiesKilled[i].toString());
             }
@@ -502,10 +506,10 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
                 @Override
                 public void run() {
                     try {
-                        Thread.sleep(4000);
+                        Thread.sleep(3000);
                         isBookieRestarted.set(true);
                         /*
-                         * after sleeping for 4000 msecs, restart one of the
+                         * after sleeping for 3000 msecs, restart one of the
                          * bookie, so that replication can succeed.
                          */
                         startBookie(killedBookiesConfig[0]);
@@ -515,6 +519,8 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
                 }
             })).start();
 
+            int rw1PrevFailedAttemptsCount = 0;
+            int rw2PrevFailedAttemptsCount = 0;
             while (!isBookieRestarted.get()) {
                 /*
                  * since all the bookies containing the ledger entries are down
@@ -522,26 +528,21 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
                  */
                 assertTrue("Ledger: " + lh.getId() + " should be underreplicated",
                         ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh.getId(), basePath));
-                /*
-                 * check for both the replicationworkders number of failed
-                 * attempts should be less than ReplicationWorker.
-                 * MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING
-                 */
-                int failedAttempts = rw1.replicationFailedLedgers.get(lh.getId()).get();
+
+                // the number of failed attempts should have increased.
+                int rw1CurFailedAttemptsCount = rw1.replicationFailedLedgers.get(lh.getId()).get();
                 assertTrue(
-                        "The number of failed attempts should be less than "
-                                + "ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING, "
-                                + "but it is "
-                                + failedAttempts,
-                        failedAttempts <= ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING);
+                        "The current number of failed attempts: " + rw1CurFailedAttemptsCount
+                                + " should be greater than or equal to previous value: " + rw1PrevFailedAttemptsCount,
+                        rw1CurFailedAttemptsCount >= rw1PrevFailedAttemptsCount);
+                rw1PrevFailedAttemptsCount = rw1CurFailedAttemptsCount;
 
-                failedAttempts = rw2.replicationFailedLedgers.get(lh.getId()).get();
+                int rw2CurFailedAttemptsCount = rw2.replicationFailedLedgers.get(lh.getId()).get();
                 assertTrue(
-                        "The number of failed attempts should be less than "
-                                + "ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING, "
-                                + "but it is "
-                                + failedAttempts,
-                        failedAttempts <= ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING);
+                        "The current number of failed attempts: " + rw2CurFailedAttemptsCount
+                                + " should be greater than or equal to previous value: " + rw2PrevFailedAttemptsCount,
+                        rw2CurFailedAttemptsCount >= rw2PrevFailedAttemptsCount);
+                rw2PrevFailedAttemptsCount = rw2CurFailedAttemptsCount;
 
                 Thread.sleep(50);
             }
@@ -551,7 +552,7 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
              * should succeed in replicating this under replicated ledger and it
              * shouldn't be under replicated anymore.
              */
-            int timeToWaitForReplicationToComplete = 2000;
+            int timeToWaitForReplicationToComplete = 20000;
             int timeWaited = 0;
             while (ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh.getId(), basePath)) {
                 Thread.sleep(100);
@@ -560,6 +561,164 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
                     fail("Ledger should be replicated by now");
                 }
             }
+
+            rw1PrevFailedAttemptsCount = rw1.replicationFailedLedgers.get(lh.getId()).get();
+            rw2PrevFailedAttemptsCount = rw2.replicationFailedLedgers.get(lh.getId()).get();
+            Thread.sleep(2000);
+            // now since the ledger is replicated, number of failed attempts
+            // counter shouldn't be increased even after sleeping for sometime.
+            assertEquals("rw1 failedattempts", rw1PrevFailedAttemptsCount,
+                    rw1.replicationFailedLedgers.get(lh.getId()).get());
+            assertEquals("rw2 failed attempts ", rw2PrevFailedAttemptsCount,
+                    rw2.replicationFailedLedgers.get(lh.getId()).get());
+
+            /*
+             * Since these entries are eventually available and replication has
+             * eventually succeeded, in at least one of the RWs
+             * unableToReadEntriesForReplication should be 0.
+             */
+            int rw1UnableToReadEntriesForReplication = rw1.unableToReadEntriesForReplication.get(lh.getId()).size();
+            int rw2UnableToReadEntriesForReplication = rw2.unableToReadEntriesForReplication.get(lh.getId()).size();
+            assertTrue(
+                    "unableToReadEntriesForReplication in RW1: " + rw1UnableToReadEntriesForReplication + " in RW2: "
+                            + rw2UnableToReadEntriesForReplication,
+                    (rw1UnableToReadEntriesForReplication == 0) || (rw2UnableToReadEntriesForReplication == 0));
+        } finally {
+            rw1.shutdown();
+            rw2.shutdown();
+            underReplicationManager.close();
+        }
+    }
+
+    class InjectedReplicationWorker extends ReplicationWorker {
+        CopyOnWriteArrayList<Long> delayReplicationPeriods;
+
+        public InjectedReplicationWorker(ServerConfiguration conf, StatsLogger statsLogger,
+                CopyOnWriteArrayList<Long> delayReplicationPeriods)
+                throws CompatibilityException, KeeperException, InterruptedException, IOException {
+            super(conf, statsLogger);
+            this.delayReplicationPeriods = delayReplicationPeriods;
+        }
+
+        @Override
+        void scheduleTaskWithDelay(TimerTask timerTask, long delayPeriod) {
+            delayReplicationPeriods.add(delayPeriod);
+            super.scheduleTaskWithDelay(timerTask, delayPeriod);
+        }
+    }
+
+    @Test
+    public void testDeferLedgerLockReleaseForReplicationWorker() throws Exception {
+        int ensembleSize = 3;
+        LedgerHandle lh = bkc.createLedger(ensembleSize, ensembleSize, BookKeeper.DigestType.CRC32, TESTPASSWD);
+        int numOfEntries = 7;
+        for (int i = 0; i < numOfEntries; i++) {
+            lh.addEntry(data);
+        }
+        lh.close();
+
+        BookieSocketAddress[] bookiesKilled = new BookieSocketAddress[ensembleSize];
+        ServerConfiguration[] killedBookiesConfig = new ServerConfiguration[ensembleSize];
+
+        // kill all bookies
+        for (int i = 0; i < ensembleSize; i++) {
+            bookiesKilled[i] = lh.getLedgerMetadata().getAllEnsembles().get(0L).get(i);
+            killedBookiesConfig[i] = getBkConf(bookiesKilled[i]);
+            LOG.info("Killing Bookie : {}", bookiesKilled[i]);
+            killBookie(bookiesKilled[i]);
+        }
+
+        // start new bookiesToKill number of bookies
+        for (int i = 0; i < ensembleSize; i++) {
+            startNewBookieAndReturnAddress();
+        }
+
+        // create couple of replicationworkers
+        long lockReleaseOfFailedLedgerGracePeriod = 64L;
+        long baseBackoffForLockReleaseOfFailedLedger = lockReleaseOfFailedLedgerGracePeriod
+                / (int) Math.pow(2, ReplicationWorker.NUM_OF_EXPONENTIAL_BACKOFF_RETRIALS);
+        ServerConfiguration newRWConf = new ServerConfiguration(baseConf);
+        newRWConf.setLockReleaseOfFailedLedgerGracePeriod(Long.toString(lockReleaseOfFailedLedgerGracePeriod));
+        newRWConf.setRereplicationEntryBatchSize(1000);
+        CopyOnWriteArrayList<Long> rw1DelayReplicationPeriods = new CopyOnWriteArrayList<Long>();
+        CopyOnWriteArrayList<Long> rw2DelayReplicationPeriods = new CopyOnWriteArrayList<Long>();
+        TestStatsProvider statsProvider = new TestStatsProvider();
+        TestStatsLogger statsLogger1 = statsProvider.getStatsLogger("rw1");
+        TestStatsLogger statsLogger2 = statsProvider.getStatsLogger("rw2");
+        ReplicationWorker rw1 = new InjectedReplicationWorker(newRWConf, statsLogger1, rw1DelayReplicationPeriods);
+        ReplicationWorker rw2 = new InjectedReplicationWorker(newRWConf, statsLogger2, rw2DelayReplicationPeriods);
+
+        Counter numEntriesUnableToReadForReplication1 = statsLogger1
+                .getCounter(NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION);
+        Counter numEntriesUnableToReadForReplication2 = statsLogger2
+                .getCounter(NUM_ENTRIES_UNABLE_TO_READ_FOR_REPLICATION);
+        @Cleanup
+        MetadataClientDriver clientDriver = MetadataDrivers
+                .getClientDriver(URI.create(baseClientConf.getMetadataServiceUri()));
+        clientDriver.initialize(baseClientConf, scheduler, NullStatsLogger.INSTANCE, Optional.empty());
+
+        LedgerManagerFactory mFactory = clientDriver.getLedgerManagerFactory();
+
+        LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();
+        try {
+            // mark ledger underreplicated
+            for (int i = 0; i < bookiesKilled.length; i++) {
+                underReplicationManager.markLedgerUnderreplicated(lh.getId(), bookiesKilled[i].toString());
+            }
+            while (!ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh.getId(), basePath)) {
+                Thread.sleep(100);
+            }
+            rw1.start();
+            rw2.start();
+
+            // wait for RWs to complete 'numOfAttemptsToWaitFor' failed attempts
+            int numOfAttemptsToWaitFor = 10;
+            while ((rw1.replicationFailedLedgers.get(lh.getId()).get() < numOfAttemptsToWaitFor)
+                    || rw2.replicationFailedLedgers.get(lh.getId()).get() < numOfAttemptsToWaitFor) {
+                Thread.sleep(500);
+            }
+
+            /*
+             * since all the bookies containing the ledger entries are down
+             * replication wouldn't have succeeded.
+             */
+            assertTrue("Ledger: " + lh.getId() + " should be underreplicated",
+                    ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh.getId(), basePath));
+
+            /*
+             * since RW failed 'numOfAttemptsToWaitFor' times, we should have at
+             * least (numOfAttemptsToWaitFor - 1) delayReplicationPeriods, and
+             * their values should be (lockReleaseOfFailedLedgerGracePeriod / 32),
+             * then 2 * the previous value, and so on, capped at
+             * lockReleaseOfFailedLedgerGracePeriod
+             */
+            for (int i = 0; i < ((numOfAttemptsToWaitFor - 1)); i++) {
+                long expectedDelayValue = Math.min(lockReleaseOfFailedLedgerGracePeriod,
+                        baseBackoffForLockReleaseOfFailedLedger * (1 << i));
+                assertEquals("RW1 delayperiod", (Long) expectedDelayValue, rw1DelayReplicationPeriods.get(i));
+                assertEquals("RW2 delayperiod", (Long) expectedDelayValue, rw2DelayReplicationPeriods.get(i));
+            }
+
+            /*
+             * The RW won't proceed with replicating an under replicated fragment
+             * until it first succeeds in reading the previously failed entries,
+             * so numEntriesUnableToReadForReplication should be exactly
+             * 'numOfEntries', even though the RW failed to replicate multiple
+             * times.
+             */
+            assertEquals("numEntriesUnableToReadForReplication for RW1", Long.valueOf((long) numOfEntries),
+                    numEntriesUnableToReadForReplication1.get());
+            assertEquals("numEntriesUnableToReadForReplication for RW2", Long.valueOf((long) numOfEntries),
+                    numEntriesUnableToReadForReplication2.get());
+
+            /*
+             * Since these entries are unavailable,
+             * unableToReadEntriesForReplication should be of size numOfEntries.
+             */
+            assertEquals("RW1 unabletoreadentries", numOfEntries,
+                    rw1.unableToReadEntriesForReplication.get(lh.getId()).size());
+            assertEquals("RW2 unabletoreadentries", numOfEntries,
+                    rw2.unableToReadEntriesForReplication.get(lh.getId()).size());
         } finally {
             rw1.shutdown();
             rw2.shutdown();
diff --git a/site/_data/config/bk_server.yaml b/site/_data/config/bk_server.yaml
index d83308e..8e3da6e 100644
--- a/site/_data/config/bk_server.yaml
+++ b/site/_data/config/bk_server.yaml
@@ -697,11 +697,11 @@ groups:
     description: The number of entries that a replication will rereplicate in parallel.
     default: 10
   - param: openLedgerRereplicationGracePeriod
-    description: The grace period, in seconds, that the replication worker waits before fencing and replicating a ledger fragment that's still being written to upon bookie failure.
-    default: 30
+    description: The grace period, in milliseconds, that the replication worker waits before fencing and replicating a ledger fragment that's still being written to upon bookie failure.
+    default: 30000
   - param: lockReleaseOfFailedLedgerGracePeriod
-    description: the grace period so that if the replication worker fails to replicate a underreplicatedledger for more than ReplicationWorker.MAXNUMBER_REPLICATION_FAILURES_ALLOWED_BEFORE_DEFERRING number of times, then instead of releasing the lock immediately after failed attempt, it will hold under replicated ledger lock for this grace period and then it will release the lock.
-    default: 60
+    description: The grace period, in milliseconds, that the replication worker waits before releasing the ledger lock after failing to replicate a ledger. For the first ReplicationWorker.NUM_OF_EXPONENTIAL_BACKOFF_RETRIALS failures the delay backs off exponentially, and it is then capped at lockReleaseOfFailedLedgerGracePeriod.
+    default: 300000
   - param: rwRereplicateBackoffMs
     description: The time to backoff when replication worker encounters exceptions on replicating a ledger, in milliseconds.
     default: 5000
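
A brief usage note (not part of the patch): the grace period can also be set
programmatically on a ServerConfiguration; the setter takes its value as a
string, as the updated tests above do. The class and accessor names below are
taken from the diff; the snippet itself is only an illustrative sketch.

    import org.apache.bookkeeper.conf.ServerConfiguration;

    public class ReplicationWorkerConfigExample {
        public static void main(String[] args) {
            ServerConfiguration conf = new ServerConfiguration();
            // Cap the deferred lock release of failed ledgers at 5 minutes (the new default).
            conf.setLockReleaseOfFailedLedgerGracePeriod("300000");
            System.out.println("lockReleaseOfFailedLedgerGracePeriod (ms): "
                    + conf.getLockReleaseOfFailedLedgerGracePeriod());
            System.out.println("rwRereplicateBackoffMs (ms): " + conf.getRwRereplicateBackoffMs());
        }
    }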