You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by re...@apache.org on 2019/07/21 20:29:00 UTC

[bookkeeper] branch master updated: Implementation of new scrutiny - replicas check (Part-1)

This is an automated email from the ASF dual-hosted git repository.

reddycharan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 86e8a72  Implementation of new scrutiny - replicas check (Part-1)
86e8a72 is described below

commit 86e8a72c7fcfb38b76bcbf434df586f3dc00a2af
Author: Charan Reddy Guttapalem <re...@gmail.com>
AuthorDate: Sun Jul 21 13:28:55 2019 -0700

    Implementation of new scrutiny - replicas check (Part-1)
    
    Descriptions of the changes in this PR:
    
    - this is implementation of scrutiny mentioned in
    https://github.com/apache/bookkeeper/blob/master/site/bps/BP-34-cluster-metadata-checker.md
    - In this commit, urLedgersElapsedRecoveryGracePeriod check is added to placementPolicyCheck.
    - new scrutiny is added replicasCheck and it is disabled by default.
    - this is first part of replicas check, in this part currently metrics are not added and
    also delayed replicas check for potentially faulty ledgers (failure in trying to get
    info. from Bookie of the ensembles of the ledger) is not added.
    - have to add more failure test scenarios for replicas check logic.
    
    
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <None>
    
    This closes #2120 from reddycharan/replicascheck and squashes the following commits:
    
    d4dc81e15 [Sijie Guo] Merge remote-tracking branch 'apache/master' into replicascheck
    5a3fa848f [Matteo Minardi] Issue #2124: Test 'ListBookiesCommand ' is failing
    df24f672a [vzhikserg] Use bulk operation instead of iteration
    2a1cfe249 [vzhikserg] Fix log statements with incorrect placeholders
---
 bookkeeper-proto/src/main/proto/DataFormats.proto  |   6 +
 .../bookkeeper/client/DistributionSchedule.java    |  18 +
 .../client/RoundRobinDistributionSchedule.java     |  32 +-
 .../bookkeeper/conf/ServerConfiguration.java       |  50 ++
 .../meta/LedgerUnderreplicationManager.java        |  29 +
 .../meta/ZkLedgerUnderreplicationManager.java      | 115 ++-
 .../org/apache/bookkeeper/replication/Auditor.java | 869 ++++++++++++++++++++-
 .../bookkeeper/replication/ReplicationStats.java   |   8 +
 .../util/AvailabilityOfEntriesOfLedger.java        |  57 +-
 .../bookkeeper/util/BookKeeperConstants.java       |   1 +
 .../bookkeeper/client/BookKeeperAdminTest.java     |  40 +
 .../client/RoundRobinDistributionScheduleTest.java |  46 ++
 .../replication/AuditorLedgerCheckerTest.java      |   4 +-
 .../replication/AuditorPeriodicCheckTest.java      | 109 +++
 .../AuditorPlacementPolicyCheckTest.java           | 130 +++
 .../TestLedgerUnderreplicationManager.java         |  15 +
 .../util/AvailabilityOfEntriesOfLedgerTest.java    | 110 +++
 site/_data/config/bk_server.yaml                   |   6 +
 18 files changed, 1593 insertions(+), 52 deletions(-)

diff --git a/bookkeeper-proto/src/main/proto/DataFormats.proto b/bookkeeper-proto/src/main/proto/DataFormats.proto
index 2501aff..6591d65 100644
--- a/bookkeeper-proto/src/main/proto/DataFormats.proto
+++ b/bookkeeper-proto/src/main/proto/DataFormats.proto
@@ -112,3 +112,9 @@ message PlacementPolicyCheckFormat {
     optional int64 placementPolicyCheckCTime = 1;
 }
 
+/**
+ * information of ReplicasCheck execution
+ */
+message ReplicasCheckFormat {
+    optional int64 replicasCheckCTime = 1;
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
index d53129d..dc65346 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java
@@ -17,6 +17,7 @@
  */
 package org.apache.bookkeeper.client;
 
+import java.util.BitSet;
 import java.util.Map;
 
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -236,4 +237,21 @@ public interface DistributionSchedule {
      * @return true if it has entry otherwise false.
      */
     boolean hasEntry(long entryId, int bookieIndex);
+
+    /**
+     * Get the bitset representing the entries from entry 'startEntryId' to
+     * 'lastEntryId', that would be striped to the bookie with index -
+     * bookieIndex. Value of the bit with the 'bitIndex+n', indicate whether
+     * entry with entryid 'startEntryId+n' is striped to this bookie or not.
+     *
+     * @param bookieIndex
+     *            index of the bookie in the ensemble starting with 0
+     * @param startEntryId
+     *            starting entryid
+     * @param lastEntryId
+     *            last entryid
+     * @return the bitset representing the entries that would be striped to the
+     *         bookie
+     */
+    BitSet getEntriesStripedToTheBookie(int bookieIndex, long startEntryId, long lastEntryId);
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
index a6506af..05711fd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java
@@ -29,6 +29,8 @@ import java.util.BitSet;
 import java.util.Map;
 
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A specific {@link DistributionSchedule} that places entries in round-robin
@@ -37,7 +39,8 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
  * on.
  *
  */
-class RoundRobinDistributionSchedule implements DistributionSchedule {
+public class RoundRobinDistributionSchedule implements DistributionSchedule {
+    private static final Logger LOG = LoggerFactory.getLogger(RoundRobinDistributionSchedule.class);
     private final int writeQuorumSize;
     private final int ackQuorumSize;
     private final int ensembleSize;
@@ -409,4 +412,31 @@ class RoundRobinDistributionSchedule implements DistributionSchedule {
             w.recycle();
         }
     }
+
+    @Override
+    public BitSet getEntriesStripedToTheBookie(int bookieIndex, long startEntryId, long lastEntryId) {
+        if ((startEntryId < 0) || (lastEntryId < 0) || (bookieIndex < 0) || (bookieIndex >= ensembleSize)
+                || (lastEntryId < startEntryId)) {
+            LOG.error(
+                    "Illegal arguments for getEntriesStripedToTheBookie, bookieIndex : {},"
+                            + " ensembleSize : {}, startEntryId : {}, lastEntryId : {}",
+                    bookieIndex, ensembleSize, startEntryId, lastEntryId);
+            throw new IllegalArgumentException("Illegal arguments for getEntriesStripedToTheBookie");
+        }
+        BitSet entriesStripedToTheBookie = new BitSet((int) (lastEntryId - startEntryId + 1));
+        for (long entryId = startEntryId; entryId <= lastEntryId; entryId++) {
+            int modValOfFirstReplica = (int) (entryId % ensembleSize);
+            int modValOfLastReplica = (int) ((entryId + writeQuorumSize - 1) % ensembleSize);
+            if (modValOfLastReplica >= modValOfFirstReplica) {
+                if ((bookieIndex >= modValOfFirstReplica) && (bookieIndex <= modValOfLastReplica)) {
+                    entriesStripedToTheBookie.set((int) (entryId - startEntryId));
+                }
+            } else {
+                if ((bookieIndex >= modValOfFirstReplica) || (bookieIndex <= modValOfLastReplica)) {
+                    entriesStripedToTheBookie.set((int) (entryId - startEntryId));
+                }
+            }
+        }
+        return entriesStripedToTheBookie;
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index f4972f0..75e230c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -187,6 +187,9 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
     protected static final String AUTO_RECOVERY_DAEMON_ENABLED = "autoRecoveryDaemonEnabled";
     protected static final String LOST_BOOKIE_RECOVERY_DELAY = "lostBookieRecoveryDelay";
     protected static final String RW_REREPLICATE_BACKOFF_MS = "rwRereplicateBackoffMs";
+    protected static final String UNDERREPLICATED_LEDGER_RECOVERY_GRACE_PERIOD =
+            "underreplicatedLedgerRecoveryGracePeriod";
+    protected static final String AUDITOR_REPLICAS_CHECK_INTERVAL = "auditorReplicasCheckInterval";
 
     // Worker Thread parameters.
     protected static final String NUM_ADD_WORKER_THREADS = "numAddWorkerThreads";
@@ -2214,6 +2217,53 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
     }
 
     /**
+     * Sets the grace period (in seconds) for underreplicated ledgers recovery.
+     * If ledger is marked underreplicated for more than this period then it
+     * will be reported by placementPolicyCheck in Auditor. Setting this to 0
+     * will disable this check.
+     *
+     * @param gracePeriod
+     *            The interval in seconds. e.g. 3600 = 1 hour
+     */
+    public void setUnderreplicatedLedgerRecoveryGracePeriod(long gracePeriod) {
+        setProperty(UNDERREPLICATED_LEDGER_RECOVERY_GRACE_PERIOD, gracePeriod);
+    }
+
+    /**
+     * Gets the grace period (in seconds) for underreplicated ledgers recovery.
+     * If ledger is marked underreplicated for more than this period then it
+     * will be reported by placementPolicyCheck in Auditor. Setting this to 0
+     * will disable this check.
+     *
+     * @return The interval in seconds. By default it is disabled.
+     */
+    public long getUnderreplicatedLedgerRecoveryGracePeriod() {
+        return getLong(UNDERREPLICATED_LEDGER_RECOVERY_GRACE_PERIOD, 0);
+    }
+
+    /**
+     * Sets the interval at which the auditor will run a replicas check of all
+     * ledgers. This should not be run very often since it validates
+     * availability of replicas of all ledgers by querying bookies. Setting this
+     * to 0 will disable the periodic replicas check.
+     *
+     * @param interval
+     *            The interval in seconds. e.g. 86400 = 1 day, 604800 = 1 week
+     */
+    public void setAuditorPeriodicReplicasCheckInterval(long interval) {
+        setProperty(AUDITOR_REPLICAS_CHECK_INTERVAL, interval);
+    }
+
+    /**
+     * Get the interval at which the auditor does replicas check of all ledgers.
+     *
+     * @return The interval in seconds. By default it is disabled.
+     */
+    public long getAuditorPeriodicReplicasCheckInterval() {
+        return getLong(AUDITOR_REPLICAS_CHECK_INTERVAL, 0);
+    }
+
+    /**
      * Set what percentage of a ledger (fragment)'s entries will be verified.
      * 0 - only the first and last entry of each ledger fragment would be verified
      * 100 - the entire ledger fragment would be verified
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
index 4dadffe..9f2d0cb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
@@ -18,6 +18,7 @@
 package org.apache.bookkeeper.meta;
 
 import com.google.common.collect.Lists;
+
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -61,6 +62,18 @@ public interface LedgerUnderreplicationManager extends AutoCloseable {
             throws ReplicationException.UnavailableException;
 
     /**
+     * Get the UnderreplicatedLedger info if this ledger is marked
+     * underreplicated otherwise it returns null.
+     *
+     * @param ledgerId
+     *            ledger id
+     * @return the UnderreplicatedLedger info instance if this ledger is marked
+     *         underreplicated otherwise it returns null.
+     * @throws ReplicationException.UnavailableException
+     */
+    UnderreplicatedLedger getLedgerUnreplicationInfo(long ledgerId) throws ReplicationException.UnavailableException;
+
+    /**
      * Get a list of all the underreplicated ledgers which have been
      * marked for rereplication, filtered by the predicate on the missing replicas list.
      *
@@ -201,6 +214,22 @@ public interface LedgerUnderreplicationManager extends AutoCloseable {
     long getPlacementPolicyCheckCTime() throws ReplicationException.UnavailableException;
 
     /**
+     * Setter for the ReplicasCheck last executed ctime.
+     *
+     * @param replicasCheckCTime
+     * @throws ReplicationException.UnavailableException
+     */
+    void setReplicasCheckCTime(long replicasCheckCTime) throws ReplicationException.UnavailableException;
+
+    /**
+     * Getter for the ReplicasCheck last executed ctime.
+     *
+     * @return the long value of replicasCheckCTime
+     * @throws ReplicationException.UnavailableException
+     */
+    long getReplicasCheckCTime() throws ReplicationException.UnavailableException;
+
+    /**
      * Receive notification asynchronously when the lostBookieRecoveryDelay value is Changed.
      *
      * @param cb
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
index 22bed6f..7d25e82 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
@@ -20,7 +20,6 @@ package org.apache.bookkeeper.meta;
 
 import static com.google.common.base.Charsets.UTF_8;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.TextFormat;
@@ -51,6 +50,7 @@ import org.apache.bookkeeper.proto.DataFormats.CheckAllLedgersFormat;
 import org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat;
 import org.apache.bookkeeper.proto.DataFormats.LockDataFormat;
 import org.apache.bookkeeper.proto.DataFormats.PlacementPolicyCheckFormat;
+import org.apache.bookkeeper.proto.DataFormats.ReplicasCheckFormat;
 import org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat;
 import org.apache.bookkeeper.replication.ReplicationEnableCb;
 import org.apache.bookkeeper.replication.ReplicationException;
@@ -120,6 +120,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
     private final String lostBookieRecoveryDelayZnode;
     private final String checkAllLedgersCtimeZnode;
     private final String placementPolicyCheckCtimeZnode;
+    private final String replicasCheckCtimeZnode;
     private final ZooKeeper zkc;
     private final SubTreeCache subTreeCache;
 
@@ -134,6 +135,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
         lostBookieRecoveryDelayZnode = basePath + '/' + BookKeeperConstants.LOSTBOOKIERECOVERYDELAY_NODE;
         checkAllLedgersCtimeZnode = basePath + '/' + BookKeeperConstants.CHECK_ALL_LEDGERS_CTIME;
         placementPolicyCheckCtimeZnode = basePath + '/' + BookKeeperConstants.PLACEMENT_POLICY_CHECK_CTIME;
+        replicasCheckCtimeZnode = basePath + '/' + BookKeeperConstants.REPLICAS_CHECK_CTIME;
         idExtractionPattern = Pattern.compile("urL(\\d+)$");
         this.zkc = zkc;
         this.subTreeCache = new SubTreeCache(new SubTreeCache.TreeProvider() {
@@ -253,14 +255,38 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
         return getUrLedgerZnode(urLedgerPath, ledgerId);
     }
 
-    @VisibleForTesting
-    public UnderreplicatedLedgerFormat getLedgerUnreplicationInfo(long ledgerId)
-            throws KeeperException, TextFormat.ParseException, InterruptedException {
-        String znode = getUrLedgerZnode(ledgerId);
-        UnderreplicatedLedgerFormat.Builder builder = UnderreplicatedLedgerFormat.newBuilder();
-        byte[] data = zkc.getData(znode, false, null);
-        TextFormat.merge(new String(data, UTF_8), builder);
-        return builder.build();
+    @Override
+    public UnderreplicatedLedger getLedgerUnreplicationInfo(long ledgerId)
+            throws ReplicationException.UnavailableException {
+        try {
+            String znode = getUrLedgerZnode(ledgerId);
+            UnderreplicatedLedgerFormat.Builder builder = UnderreplicatedLedgerFormat.newBuilder();
+            byte[] data = null;
+            try {
+                data = zkc.getData(znode, false, null);
+            } catch (KeeperException.NoNodeException nne) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Ledger: {} is not marked underreplicated", ledgerId);
+                }
+                return null;
+            }
+            TextFormat.merge(new String(data, UTF_8), builder);
+            UnderreplicatedLedgerFormat underreplicatedLedgerFormat = builder.build();
+            UnderreplicatedLedger underreplicatedLedger = new UnderreplicatedLedger(ledgerId);
+            List<String> replicaList = underreplicatedLedgerFormat.getReplicaList();
+            long ctime = (underreplicatedLedgerFormat.hasCtime() ? underreplicatedLedgerFormat.getCtime()
+                    : UnderreplicatedLedger.UNASSIGNED_CTIME);
+            underreplicatedLedger.setCtime(ctime);
+            underreplicatedLedger.setReplicaList(replicaList);
+            return underreplicatedLedger;
+        } catch (KeeperException ke) {
+            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
+        } catch (TextFormat.ParseException pe) {
+            throw new ReplicationException.UnavailableException("Error parsing proto message", pe);
+        }
     }
 
     @Override
@@ -433,28 +459,18 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
                     String parent = queue.remove();
                     try {
                         for (String c : zkc.getChildren(parent, false)) {
-                            try {
-                                String child = parent + "/" + c;
-                                if (c.startsWith("urL")) {
-                                    long ledgerId = getLedgerId(child);
-                                    UnderreplicatedLedgerFormat underreplicatedLedgerFormat =
-                                            getLedgerUnreplicationInfo(ledgerId);
-                                    List<String> replicaList = underreplicatedLedgerFormat.getReplicaList();
-                                    long ctime = (underreplicatedLedgerFormat.hasCtime()
-                                            ? underreplicatedLedgerFormat.getCtime()
-                                            : UnderreplicatedLedger.UNASSIGNED_CTIME);
+                            String child = parent + "/" + c;
+                            if (c.startsWith("urL")) {
+                                long ledgerId = getLedgerId(child);
+                                UnderreplicatedLedger underreplicatedLedger = getLedgerUnreplicationInfo(ledgerId);
+                                if (underreplicatedLedger != null) {
+                                    List<String> replicaList = underreplicatedLedger.getReplicaList();
                                     if ((predicate == null) || predicate.test(replicaList)) {
-                                        UnderreplicatedLedger underreplicatedLedger = new UnderreplicatedLedger(
-                                                ledgerId);
-                                        underreplicatedLedger.setCtime(ctime);
-                                        underreplicatedLedger.setReplicaList(replicaList);
                                         curBatch.add(underreplicatedLedger);
                                     }
-                                } else {
-                                    queue.add(child);
                                 }
-                            } catch (KeeperException.NoNodeException nne) {
-                                // ignore
+                            } else {
+                                queue.add(child);
                             }
                         }
                     } catch (InterruptedException ie) {
@@ -983,4 +999,49 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
             throw new ReplicationException.UnavailableException("Error while parsing ZK protobuf binary data", ipbe);
         }
     }
+
+    @Override
+    public void setReplicasCheckCTime(long replicasCheckCTime) throws UnavailableException {
+        try {
+            List<ACL> zkAcls = ZkUtils.getACLs(conf);
+            ReplicasCheckFormat.Builder builder = ReplicasCheckFormat.newBuilder();
+            builder.setReplicasCheckCTime(replicasCheckCTime);
+            byte[] replicasCheckFormatByteArray = builder.build().toByteArray();
+            if (zkc.exists(replicasCheckCtimeZnode, false) != null) {
+                zkc.setData(replicasCheckCtimeZnode, replicasCheckFormatByteArray, -1);
+            } else {
+                zkc.create(replicasCheckCtimeZnode, replicasCheckFormatByteArray, zkAcls, CreateMode.PERSISTENT);
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("setReplicasCheckCTime completed successfully");
+            }
+        } catch (KeeperException ke) {
+            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
+        }
+    }
+
+    @Override
+    public long getReplicasCheckCTime() throws UnavailableException {
+        try {
+            byte[] data = zkc.getData(replicasCheckCtimeZnode, false, null);
+            ReplicasCheckFormat replicasCheckFormat = ReplicasCheckFormat.parseFrom(data);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("getReplicasCheckCTime completed successfully");
+            }
+            return replicasCheckFormat.hasReplicasCheckCTime() ? replicasCheckFormat.getReplicasCheckCTime() : -1;
+        } catch (KeeperException.NoNodeException ne) {
+            LOG.warn("replicasCheckCtimeZnode is not yet available");
+            return -1;
+        } catch (KeeperException ke) {
+            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
+        } catch (InvalidProtocolBufferException ipbe) {
+            throw new ReplicationException.UnavailableException("Error while parsing ZK protobuf binary data", ipbe);
+        }
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index 036f5d5..3b8219f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -29,32 +29,49 @@ import static org.apache.bookkeeper.replication.ReplicationStats.NUM_BOOKIE_AUDI
 import static org.apache.bookkeeper.replication.ReplicationStats.NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED;
 import static org.apache.bookkeeper.replication.ReplicationStats.NUM_FRAGMENTS_PER_LEDGER;
 import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_CHECKED;
+import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_HAVING_LESS_THAN_AQ_REPLICAS_OF_AN_ENTRY;
+import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_HAVING_LESS_THAN_WQ_REPLICAS_OF_AN_ENTRY;
+import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_HAVING_NO_REPLICA_OF_AN_ENTRY;
 import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY;
 import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY;
+import static org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDERREPLICATED_LEDGERS_ELAPSED_RECOVERY_GRACE_PERIOD;
 import static org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS;
 import static org.apache.bookkeeper.replication.ReplicationStats.PLACEMENT_POLICY_CHECK_TIME;
+import static org.apache.bookkeeper.replication.ReplicationStats.REPLICAS_CHECK_TIME;
 import static org.apache.bookkeeper.replication.ReplicationStats.URL_PUBLISH_TIME_FOR_LOST_BOOKIE;
+import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
+import com.google.common.collect.HashMultiset;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Multiset;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.SettableFuture;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 import org.apache.bookkeeper.client.BKException;
@@ -65,16 +82,21 @@ import org.apache.bookkeeper.client.EnsemblePlacementPolicy.PlacementPolicyAdher
 import org.apache.bookkeeper.client.LedgerChecker;
 import org.apache.bookkeeper.client.LedgerFragment;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.RoundRobinDistributionSchedule;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
+import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.UnderreplicatedLedger;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.replication.ReplicationException.BKAuditException;
 import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
@@ -85,8 +107,11 @@ import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.annotations.StatsDoc;
+import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
+import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -106,6 +131,8 @@ import org.slf4j.LoggerFactory;
 )
 public class Auditor implements AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(Auditor.class);
+    private static final int MAX_CONCURRENT_REPLICAS_CHECK_LEDGER_REQUESTS = 100;
+    private static final int REPLICAS_CHECK_TIMEOUT_IN_SECS = 120;
     private final ServerConfiguration conf;
     private final BookKeeper bkc;
     private final boolean ownBkc;
@@ -124,6 +151,16 @@ public class Auditor implements AutoCloseable {
     private final AtomicInteger ledgersSoftlyAdheringToPlacementPolicyGuageValue;
     private final AtomicInteger numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck;
     private final AtomicInteger numOfClosedLedgersAuditedInPlacementPolicyCheck;
+    private final AtomicInteger numOfURLedgersElapsedRecoveryGracePeriodGuageValue;
+    private final AtomicInteger numOfURLedgersElapsedRecoveryGracePeriod;
+    private final AtomicInteger numLedgersHavingNoReplicaOfAnEntryGuageValue;
+    private final AtomicInteger numLedgersFoundHavingNoReplicaOfAnEntry;
+    private final AtomicInteger numLedgersHavingLessThanAQReplicasOfAnEntryGuageValue;
+    private final AtomicInteger numLedgersFoundHavingLessThanAQReplicasOfAnEntry;
+    private final AtomicInteger numLedgersHavingLessThanWQReplicasOfAnEntryGuageValue;
+    private final AtomicInteger numLedgersFoundHavingLessThanWQReplicasOfAnEntry;
+    private final long underreplicatedLedgerRecoveryGracePeriod;
+    private final int zkOpTimeoutMs;
 
     private final StatsLogger statsLogger;
     @StatsDoc(
@@ -152,6 +189,11 @@ public class Auditor implements AutoCloseable {
         )
     private final OpStatsLogger placementPolicyCheckTime;
     @StatsDoc(
+            name = REPLICAS_CHECK_TIME,
+            help = "the latency distribution of replicas check"
+        )
+    private final OpStatsLogger replicasCheckTime;
+    @StatsDoc(
         name = AUDIT_BOOKIES_TIME,
         help = "the latency distribution of auditing all the bookies"
     )
@@ -191,6 +233,28 @@ public class Auditor implements AutoCloseable {
             help = "Gauge for number of ledgers softly adhering to placement policy found in placement policy check"
     )
     private final Gauge<Integer> numLedgersSoftlyAdheringToPlacementPolicy;
+    @StatsDoc(
+            name = NUM_UNDERREPLICATED_LEDGERS_ELAPSED_RECOVERY_GRACE_PERIOD,
+            help = "Gauge for number of underreplicated ledgers elapsed recovery grace period"
+    )
+    private final Gauge<Integer> numUnderreplicatedLedgersElapsedRecoveryGracePeriod;
+    @StatsDoc(
+            name = NUM_LEDGERS_HAVING_NO_REPLICA_OF_AN_ENTRY,
+            help = "Gauge for number of ledgers having an entry with all the replicas missing"
+    )
+    private final Gauge<Integer> numLedgersHavingNoReplicaOfAnEntry;
+    @StatsDoc(
+            name = NUM_LEDGERS_HAVING_LESS_THAN_AQ_REPLICAS_OF_AN_ENTRY,
+            help = "Gauge for number of ledgers having an entry with less than AQ number of replicas"
+                    + ", this doesn't include ledgers counted towards numLedgersHavingNoReplicaOfAnEntry"
+    )
+    private final Gauge<Integer> numLedgersHavingLessThanAQReplicasOfAnEntry;
+    @StatsDoc(
+            name = NUM_LEDGERS_HAVING_LESS_THAN_WQ_REPLICAS_OF_AN_ENTRY,
+            help = "Gauge for number of ledgers having an entry with less than WQ number of replicas"
+                    + ", this doesn't include ledgers counted towards numLedgersHavingLessThanAQReplicasOfAnEntry"
+    )
+    private final Gauge<Integer> numLedgersHavingLessThanWQReplicasOfAnEntry;
 
     static BookKeeper createBookKeeperClient(ServerConfiguration conf) throws InterruptedException, IOException {
         return createBookKeeperClient(conf, NullStatsLogger.INSTANCE);
@@ -239,6 +303,8 @@ public class Auditor implements AutoCloseable {
                    StatsLogger statsLogger)
         throws UnavailableException {
         this.conf = conf;
+        this.underreplicatedLedgerRecoveryGracePeriod = conf.getUnderreplicatedLedgerRecoveryGracePeriod();
+        this.zkOpTimeoutMs = conf.getZkTimeout() * 2;
         this.bookieIdentifier = bookieIdentifier;
         this.statsLogger = statsLogger;
         this.numOfLedgersFoundNotAdheringInPlacementPolicyCheck = new AtomicInteger(0);
@@ -246,6 +312,14 @@ public class Auditor implements AutoCloseable {
         this.numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck = new AtomicInteger(0);
         this.ledgersSoftlyAdheringToPlacementPolicyGuageValue = new AtomicInteger(0);
         this.numOfClosedLedgersAuditedInPlacementPolicyCheck = new AtomicInteger(0);
+        this.numOfURLedgersElapsedRecoveryGracePeriod = new AtomicInteger(0);
+        this.numOfURLedgersElapsedRecoveryGracePeriodGuageValue = new AtomicInteger(0);
+        this.numLedgersHavingNoReplicaOfAnEntryGuageValue = new AtomicInteger(0);
+        this.numLedgersFoundHavingNoReplicaOfAnEntry = new AtomicInteger(0);
+        this.numLedgersHavingLessThanAQReplicasOfAnEntryGuageValue = new AtomicInteger(0);
+        this.numLedgersFoundHavingLessThanAQReplicasOfAnEntry = new AtomicInteger(0);
+        this.numLedgersHavingLessThanWQReplicasOfAnEntryGuageValue = new AtomicInteger(0);
+        this.numLedgersFoundHavingLessThanWQReplicasOfAnEntry = new AtomicInteger(0);
 
         numUnderReplicatedLedger = this.statsLogger.getOpStatsLogger(ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS);
         uRLPublishTimeForLostBookies = this.statsLogger
@@ -254,6 +328,7 @@ public class Auditor implements AutoCloseable {
                 .getOpStatsLogger(ReplicationStats.BOOKIE_TO_LEDGERS_MAP_CREATION_TIME);
         checkAllLedgersTime = this.statsLogger.getOpStatsLogger(ReplicationStats.CHECK_ALL_LEDGERS_TIME);
         placementPolicyCheckTime = this.statsLogger.getOpStatsLogger(ReplicationStats.PLACEMENT_POLICY_CHECK_TIME);
+        replicasCheckTime = this.statsLogger.getOpStatsLogger(ReplicationStats.REPLICAS_CHECK_TIME);
         auditBookiesTime = this.statsLogger.getOpStatsLogger(ReplicationStats.AUDIT_BOOKIES_TIME);
         numLedgersChecked = this.statsLogger.getCounter(ReplicationStats.NUM_LEDGERS_CHECKED);
         numFragmentsPerLedger = statsLogger.getOpStatsLogger(ReplicationStats.NUM_FRAGMENTS_PER_LEDGER);
@@ -288,6 +363,60 @@ public class Auditor implements AutoCloseable {
         this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY,
                 numLedgersSoftlyAdheringToPlacementPolicy);
 
+        numUnderreplicatedLedgersElapsedRecoveryGracePeriod = new Gauge<Integer>() {
+            @Override
+            public Integer getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Integer getSample() {
+                return numOfURLedgersElapsedRecoveryGracePeriodGuageValue.get();
+            }
+        };
+        this.statsLogger.registerGauge(ReplicationStats.NUM_UNDERREPLICATED_LEDGERS_ELAPSED_RECOVERY_GRACE_PERIOD,
+                numUnderreplicatedLedgersElapsedRecoveryGracePeriod);
+
+        numLedgersHavingNoReplicaOfAnEntry = new Gauge<Integer>() {
+            @Override
+            public Integer getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Integer getSample() {
+                return numLedgersHavingNoReplicaOfAnEntryGuageValue.get();
+            }
+        };
+        this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_HAVING_NO_REPLICA_OF_AN_ENTRY,
+                numLedgersHavingNoReplicaOfAnEntry);
+        numLedgersHavingLessThanAQReplicasOfAnEntry = new Gauge<Integer>() {
+            @Override
+            public Integer getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Integer getSample() {
+                return numLedgersHavingLessThanAQReplicasOfAnEntryGuageValue.get();
+            }
+        };
+        this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_HAVING_LESS_THAN_AQ_REPLICAS_OF_AN_ENTRY,
+                numLedgersHavingLessThanAQReplicasOfAnEntry);
+        numLedgersHavingLessThanWQReplicasOfAnEntry = new Gauge<Integer>() {
+            @Override
+            public Integer getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Integer getSample() {
+                return numLedgersHavingLessThanWQReplicasOfAnEntryGuageValue.get();
+            }
+        };
+        this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_HAVING_LESS_THAN_WQ_REPLICAS_OF_AN_ENTRY,
+                numLedgersHavingLessThanWQReplicasOfAnEntry);
+
         this.bkc = bkc;
         this.ownBkc = ownBkc;
         initialize(conf, bkc);
@@ -346,14 +475,14 @@ public class Auditor implements AutoCloseable {
                 LOG.info("executor is already shutdown");
                 return;
             }
-            executor.submit(new Runnable() {
+            executor.submit(safeRun(new Runnable() {
                 public void run() {
                     synchronized (Auditor.this) {
                         LOG.info("Shutting down Auditor's Executor");
                         executor.shutdown();
                     }
                 }
-            });
+            }));
         }
     }
 
@@ -364,7 +493,7 @@ public class Auditor implements AutoCloseable {
             f.setException(new BKAuditException("Auditor shutting down"));
             return f;
         }
-        return executor.submit(new Runnable() {
+        return executor.submit(safeRun(new Runnable() {
                 @SuppressWarnings("unchecked")
                 public void run() {
                     try {
@@ -420,13 +549,13 @@ public class Auditor implements AutoCloseable {
                         }
                         if (auditTask == null) {
                             // if there is no scheduled audit, schedule one
-                            auditTask = executor.schedule(new Runnable() {
+                            auditTask = executor.schedule(safeRun(new Runnable() {
                                 public void run() {
                                     startAudit(false);
                                     auditTask = null;
                                     bookiesToBeAudited.clear();
                                 }
-                            }, lostBookieRecoveryDelay, TimeUnit.SECONDS);
+                            }), lostBookieRecoveryDelay, TimeUnit.SECONDS);
                             numBookieAuditsDelayed.inc();
                             LOG.info("Delaying bookie audit by {} secs for {}", lostBookieRecoveryDelay,
                                     bookiesToBeAudited);
@@ -440,7 +569,7 @@ public class Auditor implements AutoCloseable {
                         LOG.error("Exception while watching available bookies", ue);
                     }
                 }
-            });
+            }));
     }
 
     synchronized Future<?> submitLostBookieRecoveryDelayChangedEvent() {
@@ -449,7 +578,7 @@ public class Auditor implements AutoCloseable {
             f.setException(new BKAuditException("Auditor shutting down"));
             return f;
         }
-        return executor.submit(new Runnable() {
+        return executor.submit(safeRun(new Runnable() {
             int lostBookieRecoveryDelay = -1;
             public void run() {
                 try {
@@ -479,13 +608,13 @@ public class Auditor implements AutoCloseable {
                     } else if (auditTask != null) {
                         LOG.info("lostBookieRecoveryDelay has been set to {}, so rescheduling AuditTask accordingly",
                                 lostBookieRecoveryDelay);
-                        auditTask = executor.schedule(new Runnable() {
+                        auditTask = executor.schedule(safeRun(new Runnable() {
                             public void run() {
                                 startAudit(false);
                                 auditTask = null;
                                 bookiesToBeAudited.clear();
                             }
-                        }, lostBookieRecoveryDelay, TimeUnit.SECONDS);
+                        }), lostBookieRecoveryDelay, TimeUnit.SECONDS);
                         numBookieAuditsDelayed.inc();
                     }
                 } catch (InterruptedException ie) {
@@ -499,7 +628,7 @@ public class Auditor implements AutoCloseable {
                     }
                 }
             }
-        });
+        }));
     }
 
     public void start() {
@@ -531,6 +660,7 @@ public class Auditor implements AutoCloseable {
             scheduleBookieCheckTask();
             scheduleCheckAllLedgersTask();
             schedulePlacementPolicyCheckTask();
+            scheduleReplicasCheckTask();
         }
     }
 
@@ -538,11 +668,11 @@ public class Auditor implements AutoCloseable {
         long bookieCheckInterval = conf.getAuditorPeriodicBookieCheckInterval();
         if (bookieCheckInterval == 0) {
             LOG.info("Auditor periodic bookie checking disabled, running once check now anyhow");
-            executor.submit(bookieCheck);
+            executor.submit(safeRun(bookieCheck));
         } else {
             LOG.info("Auditor periodic bookie checking enabled" + " 'auditorPeriodicBookieCheckInterval' {} seconds",
                     bookieCheckInterval);
-            executor.scheduleAtFixedRate(bookieCheck, 0, bookieCheckInterval, TimeUnit.SECONDS);
+            executor.scheduleAtFixedRate(safeRun(bookieCheck), 0, bookieCheckInterval, TimeUnit.SECONDS);
         }
     }
 
@@ -580,7 +710,7 @@ public class Auditor implements AutoCloseable {
                             + "durationSinceLastExecutionInSecs: {} initialDelay: {} interval: {}",
                     checkAllLedgersLastExecutedCTime, durationSinceLastExecutionInSecs, initialDelay, interval);
 
-            executor.scheduleAtFixedRate(new Runnable() {
+            executor.scheduleAtFixedRate(safeRun(new Runnable() {
                 public void run() {
                     try {
                         if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
@@ -607,7 +737,7 @@ public class Auditor implements AutoCloseable {
                         LOG.error("Underreplication manager unavailable running periodic check", ue);
                     }
                 }
-                }, initialDelay, interval, TimeUnit.SECONDS);
+                }), initialDelay, interval, TimeUnit.SECONDS);
         } else {
             LOG.info("Periodic checking disabled");
         }
@@ -647,7 +777,7 @@ public class Auditor implements AutoCloseable {
                             + "durationSinceLastExecutionInSecs: {} initialDelay: {} interval: {}",
                     placementPolicyCheckLastExecutedCTime, durationSinceLastExecutionInSecs, initialDelay, interval);
 
-            executor.scheduleAtFixedRate(new Runnable() {
+            executor.scheduleAtFixedRate(safeRun(new Runnable() {
                 public void run() {
                     try {
                         Stopwatch stopwatch = Stopwatch.createStarted();
@@ -660,18 +790,24 @@ public class Auditor implements AutoCloseable {
                                 numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck.get();
                         int numOfClosedLedgersAuditedInPlacementPolicyCheckValue =
                                 numOfClosedLedgersAuditedInPlacementPolicyCheck.get();
+                        int numOfURLedgersElapsedRecoveryGracePeriodValue =
+                                numOfURLedgersElapsedRecoveryGracePeriod.get();
                         LOG.info(
                                 "Completed placementPolicyCheck in {} milliSeconds."
                                         + " numOfClosedLedgersAuditedInPlacementPolicyCheck {}"
                                         + " numOfLedgersNotAdheringToPlacementPolicy {}"
-                                        + " numOfLedgersSoftlyAdheringToPlacementPolicy {}",
+                                        + " numOfLedgersSoftlyAdheringToPlacementPolicy {}"
+                                        + " numOfURLedgersElapsedRecoveryGracePeriod {}",
                                 placementPolicyCheckDuration, numOfClosedLedgersAuditedInPlacementPolicyCheckValue,
                                 numOfLedgersFoundNotAdheringInPlacementPolicyCheckValue,
-                                numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheckValue);
+                                numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheckValue,
+                                numOfURLedgersElapsedRecoveryGracePeriodValue);
                         ledgersNotAdheringToPlacementPolicyGuageValue
                                 .set(numOfLedgersFoundNotAdheringInPlacementPolicyCheckValue);
                         ledgersSoftlyAdheringToPlacementPolicyGuageValue
                                 .set(numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheckValue);
+                        numOfURLedgersElapsedRecoveryGracePeriodGuageValue
+                                .set(numOfURLedgersElapsedRecoveryGracePeriodValue);
                         placementPolicyCheckTime.registerSuccessfulEvent(placementPolicyCheckDuration,
                                 TimeUnit.MILLISECONDS);
                     } catch (BKAuditException e) {
@@ -699,20 +835,132 @@ public class Auditor implements AutoCloseable {
                                     .set(numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheckValue);
                         }
 
+                        int numOfURLedgersElapsedRecoveryGracePeriodValue =
+                                numOfURLedgersElapsedRecoveryGracePeriod.get();
+                        if (numOfURLedgersElapsedRecoveryGracePeriodValue > 0) {
+                            /*
+                             * Though there is BKAuditException while doing
+                             * placementPolicyCheck, it found few urledgers have
+                             * elapsed recovery graceperiod. So reporting it.
+                             */
+                            numOfURLedgersElapsedRecoveryGracePeriodGuageValue
+                                    .set(numOfURLedgersElapsedRecoveryGracePeriodValue);
+                        }
+
                         LOG.error(
                                 "BKAuditException running periodic placementPolicy check."
                                         + "numOfLedgersNotAdheringToPlacementPolicy {}, "
-                                        + "numOfLedgersSoftlyAdheringToPlacementPolicy {}",
+                                        + "numOfLedgersSoftlyAdheringToPlacementPolicy {},"
+                                        + "numOfURLedgersElapsedRecoveryGracePeriod {}",
                                 numOfLedgersFoundInPlacementPolicyCheckValue,
-                                numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheckValue, e);
+                                numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheckValue,
+                                numOfURLedgersElapsedRecoveryGracePeriodValue, e);
                     }
                 }
-            }, initialDelay, interval, TimeUnit.SECONDS);
+            }), initialDelay, interval, TimeUnit.SECONDS);
         } else {
             LOG.info("Periodic placementPolicy check disabled");
         }
     }
 
+    private void scheduleReplicasCheckTask() {
+        long interval = conf.getAuditorPeriodicReplicasCheckInterval();
+
+        if (interval <= 0) {
+            LOG.info("Periodic replicas check disabled");
+            return;
+        }
+
+        LOG.info("Auditor periodic replicas check enabled" + " 'auditorReplicasCheckInterval' {} seconds", interval);
+        long replicasCheckLastExecutedCTime;
+        long durationSinceLastExecutionInSecs;
+        long initialDelay;
+        try {
+            replicasCheckLastExecutedCTime = ledgerUnderreplicationManager.getReplicasCheckCTime();
+        } catch (UnavailableException ue) {
+            LOG.error("Got UnavailableException while trying to get replicasCheckCTime", ue);
+            replicasCheckLastExecutedCTime = -1;
+        }
+        if (replicasCheckLastExecutedCTime == -1) {
+            durationSinceLastExecutionInSecs = -1;
+            initialDelay = 0;
+        } else {
+            durationSinceLastExecutionInSecs = (System.currentTimeMillis() - replicasCheckLastExecutedCTime) / 1000;
+            if (durationSinceLastExecutionInSecs < 0) {
+                // this can happen if there is no strict time ordering
+                durationSinceLastExecutionInSecs = 0;
+            }
+            initialDelay = durationSinceLastExecutionInSecs > interval ? 0
+                    : (interval - durationSinceLastExecutionInSecs);
+        }
+        LOG.info(
+                "replicasCheck scheduling info. replicasCheckLastExecutedCTime: {} "
+                        + "durationSinceLastExecutionInSecs: {} initialDelay: {} interval: {}",
+                replicasCheckLastExecutedCTime, durationSinceLastExecutionInSecs, initialDelay, interval);
+
+        executor.scheduleAtFixedRate(safeRun(new Runnable() {
+            public void run() {
+                try {
+                    Stopwatch stopwatch = Stopwatch.createStarted();
+                    LOG.info("Starting ReplicasCheck");
+                    replicasCheck();
+                    long replicasCheckDuration = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
+                    int numLedgersFoundHavingNoReplicaOfAnEntryValue = numLedgersFoundHavingNoReplicaOfAnEntry.get();
+                    int numLedgersFoundHavingLessThanAQReplicasOfAnEntryValue =
+                            numLedgersFoundHavingLessThanAQReplicasOfAnEntry.get();
+                    int numLedgersFoundHavingLessThanWQReplicasOfAnEntryValue =
+                            numLedgersFoundHavingLessThanWQReplicasOfAnEntry.get();
+                    LOG.info(
+                            "Completed ReplicasCheck in {} milliSeconds numLedgersFoundHavingNoReplicaOfAnEntry {}"
+                                    + " numLedgersFoundHavingLessThanAQReplicasOfAnEntry {}"
+                                    + " numLedgersFoundHavingLessThanWQReplicasOfAnEntry {}.",
+                            replicasCheckDuration, numLedgersFoundHavingNoReplicaOfAnEntryValue,
+                            numLedgersFoundHavingLessThanAQReplicasOfAnEntryValue,
+                            numLedgersFoundHavingLessThanWQReplicasOfAnEntryValue);
+                    numLedgersHavingNoReplicaOfAnEntryGuageValue.set(numLedgersFoundHavingNoReplicaOfAnEntryValue);
+                    numLedgersHavingLessThanAQReplicasOfAnEntryGuageValue
+                            .set(numLedgersFoundHavingLessThanAQReplicasOfAnEntryValue);
+                    numLedgersHavingLessThanWQReplicasOfAnEntryGuageValue
+                            .set(numLedgersFoundHavingLessThanWQReplicasOfAnEntryValue);
+                    replicasCheckTime.registerSuccessfulEvent(replicasCheckDuration, TimeUnit.MILLISECONDS);
+                } catch (BKAuditException e) {
+                    LOG.error("BKAuditException running periodic replicas check.", e);
+                    int numLedgersFoundHavingNoReplicaOfAnEntryValue = numLedgersFoundHavingNoReplicaOfAnEntry.get();
+                    if (numLedgersFoundHavingNoReplicaOfAnEntryValue > 0) {
+                        /*
+                         * Though there is BKAuditException while doing
+                         * replicasCheck, it found few ledgers having no replica
+                         * of an entry. So reporting it.
+                         */
+                        numLedgersHavingNoReplicaOfAnEntryGuageValue.set(numLedgersFoundHavingNoReplicaOfAnEntryValue);
+                    }
+                    int numLedgersFoundHavingLessThanAQReplicasOfAnEntryValue =
+                            numLedgersFoundHavingLessThanAQReplicasOfAnEntry.get();
+                    if (numLedgersFoundHavingLessThanAQReplicasOfAnEntryValue > 0) {
+                        /*
+                         * Though there is BKAuditException while doing
+                         * replicasCheck, it found few ledgers having an entry
+                         * less than AQ num of Replicas. So reporting it.
+                         */
+                        numLedgersHavingLessThanAQReplicasOfAnEntryGuageValue
+                                .set(numLedgersFoundHavingLessThanAQReplicasOfAnEntryValue);
+                    }
+                    int numLedgersFoundHavingLessThanWQReplicasOfAnEntryValue =
+                            numLedgersFoundHavingLessThanWQReplicasOfAnEntry.get();
+                    if (numLedgersFoundHavingLessThanWQReplicasOfAnEntryValue > 0) {
+                        /*
+                         * Though there is BKAuditException while doing
+                         * replicasCheck, it found few ledgers having an entry
+                         * less than WQ num of Replicas. So reporting it.
+                         */
+                        numLedgersHavingLessThanWQReplicasOfAnEntryGuageValue
+                                .set(numLedgersFoundHavingLessThanWQReplicasOfAnEntryValue);
+                    }
+                }
+            }
+        }), initialDelay, interval, TimeUnit.SECONDS);
+    }
+
     private class LostBookieRecoveryDelayChangedCb implements GenericCallback<Void> {
         @Override
         public void operationComplete(int rc, Void result) {
@@ -796,7 +1044,7 @@ public class Auditor implements AutoCloseable {
             if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
                 // has been disabled while we were generating the index
                 // discard this run, and schedule a new one
-                executor.submit(bookieCheck);
+                executor.submit(safeRun(bookieCheck));
                 return;
             }
         } catch (UnavailableException ue) {
@@ -983,6 +1231,31 @@ public class Auditor implements AutoCloseable {
         this.numOfLedgersFoundNotAdheringInPlacementPolicyCheck.set(0);
         this.numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck.set(0);
         this.numOfClosedLedgersAuditedInPlacementPolicyCheck.set(0);
+        this.numOfURLedgersElapsedRecoveryGracePeriod.set(0);
+        if (this.underreplicatedLedgerRecoveryGracePeriod > 0) {
+            Iterator<UnderreplicatedLedger> underreplicatedLedgersInfo = ledgerUnderreplicationManager
+                    .listLedgersToRereplicate(null);
+            List<Long> urLedgersElapsedRecoveryGracePeriod = new ArrayList<Long>();
+            while (underreplicatedLedgersInfo.hasNext()) {
+                UnderreplicatedLedger underreplicatedLedger = underreplicatedLedgersInfo.next();
+                long underreplicatedLedgerMarkTimeInMilSecs = underreplicatedLedger.getCtime();
+                if (underreplicatedLedgerMarkTimeInMilSecs != UnderreplicatedLedger.UNASSIGNED_CTIME) {
+                    long elapsedTimeInSecs =
+                            (System.currentTimeMillis() - underreplicatedLedgerMarkTimeInMilSecs) / 1000;
+                    if (elapsedTimeInSecs > this.underreplicatedLedgerRecoveryGracePeriod) {
+                        urLedgersElapsedRecoveryGracePeriod.add(underreplicatedLedger.getLedgerId());
+                        numOfURLedgersElapsedRecoveryGracePeriod.incrementAndGet();
+                    }
+                }
+            }
+            if (urLedgersElapsedRecoveryGracePeriod.isEmpty()) {
+                LOG.info("No Underreplicated ledger has elapsed recovery graceperiod: {}",
+                        urLedgersElapsedRecoveryGracePeriod);
+            } else {
+                LOG.error("Following Underreplicated ledgers have elapsed recovery graceperiod: {}",
+                        urLedgersElapsedRecoveryGracePeriod);
+            }
+        }
         Processor<Long> ledgerProcessor = new Processor<Long>() {
             @Override
             public void process(Long ledgerId, AsyncCallback.VoidCallback iterCallback) {
@@ -1072,6 +1345,560 @@ public class Auditor implements AutoCloseable {
         }
     }
 
+    private static class MissingEntriesInfo {
+        // ledger id of missing entries
+        private final long ledgerId;
+        /*
+         * segment details, like start entryid of the segment and ensemble List.
+         */
+        private final Entry<Long, ? extends List<BookieSocketAddress>> segmentEnsemble;
+        // bookie missing these entries
+        private final BookieSocketAddress bookieMissingEntries;
+        /*
+         * entries of this segment which are supposed to contain in this bookie
+         * but missing in this bookie.
+         */
+        private final List<Long> unavailableEntriesList;
+
+        private MissingEntriesInfo(long ledgerId, Entry<Long, ? extends List<BookieSocketAddress>> segmentEnsemble,
+                BookieSocketAddress bookieMissingEntries, List<Long> unavailableEntriesList) {
+            this.ledgerId = ledgerId;
+            this.segmentEnsemble = segmentEnsemble;
+            this.bookieMissingEntries = bookieMissingEntries;
+            this.unavailableEntriesList = unavailableEntriesList;
+        }
+
+        private long getLedgerId() {
+            return ledgerId;
+        }
+
+        private Entry<Long, ? extends List<BookieSocketAddress>> getSegmentEnsemble() {
+            return segmentEnsemble;
+        }
+
+        private BookieSocketAddress getBookieMissingEntries() {
+            return bookieMissingEntries;
+        }
+
+        private List<Long> getUnavailableEntriesList() {
+            return unavailableEntriesList;
+        }
+    }
+
+    private static class MissingEntriesInfoOfLedger {
+        private final long ledgerId;
+        private final int ensembleSize;
+        private final int writeQuorumSize;
+        private final int ackQuorumSize;
+        private final List<MissingEntriesInfo> missingEntriesInfoList;
+
+        private MissingEntriesInfoOfLedger(long ledgerId, int ensembleSize, int writeQuorumSize, int ackQuorumSize,
+                List<MissingEntriesInfo> missingEntriesInfoList) {
+            this.ledgerId = ledgerId;
+            this.ensembleSize = ensembleSize;
+            this.writeQuorumSize = writeQuorumSize;
+            this.ackQuorumSize = ackQuorumSize;
+            this.missingEntriesInfoList = missingEntriesInfoList;
+        }
+
+        private long getLedgerId() {
+            return ledgerId;
+        }
+
+        private int getEnsembleSize() {
+            return ensembleSize;
+        }
+
+        private int getWriteQuorumSize() {
+            return writeQuorumSize;
+        }
+
+        private int getAckQuorumSize() {
+            return ackQuorumSize;
+        }
+
+        private List<MissingEntriesInfo> getMissingEntriesInfoList() {
+            return missingEntriesInfoList;
+        }
+    }
+
+    private class ReadLedgerMetadataCallbackForReplicasCheck
+            implements BiConsumer<Versioned<LedgerMetadata>, Throwable> {
+        private final long ledgerInRange;
+        private final MultiCallback mcbForThisLedgerRange;
+        private final ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithMissingEntries;
+        private final ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithUnavailableBookies;
+
+        ReadLedgerMetadataCallbackForReplicasCheck(long ledgerInRange, MultiCallback mcbForThisLedgerRange,
+                ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithMissingEntries,
+                ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithUnavailableBookies) {
+            this.ledgerInRange = ledgerInRange;
+            this.mcbForThisLedgerRange = mcbForThisLedgerRange;
+            this.ledgersWithMissingEntries = ledgersWithMissingEntries;
+            this.ledgersWithUnavailableBookies = ledgersWithUnavailableBookies;
+        }
+
+        @Override
+        public void accept(Versioned<LedgerMetadata> metadataVer, Throwable exception) {
+            if (exception != null) {
+                if (BKException
+                        .getExceptionCode(exception) == BKException.Code.NoSuchLedgerExistsOnMetadataServerException) {
+                    LOG.debug("Ignoring replicas check of already deleted ledger {}", ledgerInRange);
+                    mcbForThisLedgerRange.processResult(BKException.Code.OK, null, null);
+                    return;
+                } else {
+                    LOG.warn("Unable to read the ledger: {} information", ledgerInRange, exception);
+                    mcbForThisLedgerRange.processResult(BKException.getExceptionCode(exception), null, null);
+                    return;
+                }
+            }
+
+            LedgerMetadata metadata = metadataVer.getValue();
+            if (!metadata.isClosed()) {
+                LOG.debug("Ledger: {} is not yet closed, so skipping the replicas check analysis for now",
+                        ledgerInRange);
+                mcbForThisLedgerRange.processResult(BKException.Code.OK, null, null);
+                return;
+            }
+
+            if (metadata.getLastEntryId() == -1) {
+                LOG.debug("Ledger: {} is closed but it doesn't has any entries, so skipping the replicas check",
+                        ledgerInRange);
+                mcbForThisLedgerRange.processResult(BKException.Code.OK, null, null);
+                return;
+            }
+
+            int writeQuorumSize = metadata.getWriteQuorumSize();
+            int ackQuorumSize = metadata.getAckQuorumSize();
+            int ensembleSize = metadata.getEnsembleSize();
+            RoundRobinDistributionSchedule distributionSchedule = new RoundRobinDistributionSchedule(writeQuorumSize,
+                    ackQuorumSize, ensembleSize);
+            List<Entry<Long, ? extends List<BookieSocketAddress>>> segments = new LinkedList<>(
+                    metadata.getAllEnsembles().entrySet());
+            /*
+             * since there are multiple segments, MultiCallback should be
+             * created for (ensembleSize * segments.size()) calls.
+             */
+            MultiCallback mcbForThisLedger = new MultiCallback(ensembleSize * segments.size(), mcbForThisLedgerRange,
+                    null, BKException.Code.OK, BKException.Code.ReadException);
+            HashMap<BookieSocketAddress, List<BookieExpectedToContainSegmentInfo>> bookiesSegmentInfoMap =
+                    new HashMap<BookieSocketAddress, List<BookieExpectedToContainSegmentInfo>>();
+            for (int segmentNum = 0; segmentNum < segments.size(); segmentNum++) {
+                final Entry<Long, ? extends List<BookieSocketAddress>> segmentEnsemble = segments.get(segmentNum);
+                final List<BookieSocketAddress> ensembleOfSegment = segmentEnsemble.getValue();
+                final long startEntryIdOfSegment = segmentEnsemble.getKey();
+                final long lastEntryIdOfSegment = (segmentNum == (segments.size() - 1)) ? metadata.getLastEntryId()
+                        : segments.get(segmentNum + 1).getKey() - 1;
+                for (int bookieIndex = 0; bookieIndex < ensembleOfSegment.size(); bookieIndex++) {
+                    final BookieSocketAddress bookieInEnsemble = ensembleOfSegment.get(bookieIndex);
+                    final BitSet entriesStripedToThisBookie = distributionSchedule
+                            .getEntriesStripedToTheBookie(bookieIndex, startEntryIdOfSegment, lastEntryIdOfSegment);
+                    if (entriesStripedToThisBookie.cardinality() == 0) {
+                        /*
+                         * if no entry is expected to contain in this bookie,
+                         * then there is no point in making
+                         * getListOfEntriesOfLedger call for this bookie. So
+                         * instead callback with success result.
+                         */
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug(
+                                    "For ledger: {}, in Segment: {}, no entry is expected to contain in"
+                                            + " this bookie: {}. So skipping getListOfEntriesOfLedger call",
+                                    ledgerInRange, segmentEnsemble, bookieInEnsemble);
+                        }
+                        mcbForThisLedger.processResult(BKException.Code.OK, null, null);
+                        continue;
+                    }
+                    List<BookieExpectedToContainSegmentInfo> bookieSegmentInfoList = bookiesSegmentInfoMap
+                            .get(bookieInEnsemble);
+                    if (bookieSegmentInfoList == null) {
+                        bookieSegmentInfoList = new ArrayList<BookieExpectedToContainSegmentInfo>();
+                        bookiesSegmentInfoMap.put(bookieInEnsemble, bookieSegmentInfoList);
+                    }
+                    bookieSegmentInfoList.add(new BookieExpectedToContainSegmentInfo(startEntryIdOfSegment,
+                            lastEntryIdOfSegment, segmentEnsemble, entriesStripedToThisBookie));
+                }
+            }
+            for (Entry<BookieSocketAddress, List<BookieExpectedToContainSegmentInfo>> bookiesSegmentInfoTuple :
+                bookiesSegmentInfoMap.entrySet()) {
+                final BookieSocketAddress bookieInEnsemble = bookiesSegmentInfoTuple.getKey();
+                final List<BookieExpectedToContainSegmentInfo> bookieSegmentInfoList = bookiesSegmentInfoTuple
+                        .getValue();
+                admin.asyncGetListOfEntriesOfLedger(bookieInEnsemble, ledgerInRange)
+                        .whenComplete(new GetListOfEntriesOfLedgerCallbackForReplicasCheck(ledgerInRange, ensembleSize,
+                                writeQuorumSize, ackQuorumSize, bookieInEnsemble, bookieSegmentInfoList,
+                                ledgersWithMissingEntries, ledgersWithUnavailableBookies, mcbForThisLedger));
+            }
+        }
+    }
+
+    private static class BookieExpectedToContainSegmentInfo {
+        private final long startEntryIdOfSegment;
+        private final long lastEntryIdOfSegment;
+        private final Entry<Long, ? extends List<BookieSocketAddress>> segmentEnsemble;
+        private final BitSet entriesOfSegmentStripedToThisBookie;
+
+        private BookieExpectedToContainSegmentInfo(long startEntryIdOfSegment, long lastEntryIdOfSegment,
+                Entry<Long, ? extends List<BookieSocketAddress>> segmentEnsemble,
+                BitSet entriesOfSegmentStripedToThisBookie) {
+            this.startEntryIdOfSegment = startEntryIdOfSegment;
+            this.lastEntryIdOfSegment = lastEntryIdOfSegment;
+            this.segmentEnsemble = segmentEnsemble;
+            this.entriesOfSegmentStripedToThisBookie = entriesOfSegmentStripedToThisBookie;
+        }
+
+        public long getStartEntryIdOfSegment() {
+            return startEntryIdOfSegment;
+        }
+
+        public long getLastEntryIdOfSegment() {
+            return lastEntryIdOfSegment;
+        }
+
+        public Entry<Long, ? extends List<BookieSocketAddress>> getSegmentEnsemble() {
+            return segmentEnsemble;
+        }
+
+        public BitSet getEntriesOfSegmentStripedToThisBookie() {
+            return entriesOfSegmentStripedToThisBookie;
+        }
+    }
+
+    private static class GetListOfEntriesOfLedgerCallbackForReplicasCheck
+            implements BiConsumer<AvailabilityOfEntriesOfLedger, Throwable> {
+        private final long ledgerInRange;
+        private final int ensembleSize;
+        private final int writeQuorumSize;
+        private final int ackQuorumSize;
+        private final BookieSocketAddress bookieInEnsemble;
+        private final List<BookieExpectedToContainSegmentInfo> bookieExpectedToContainSegmentInfoList;
+        private final ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithMissingEntries;
+        private final ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithUnavailableBookies;
+        private final MultiCallback mcbForThisLedger;
+
+        private GetListOfEntriesOfLedgerCallbackForReplicasCheck(long ledgerInRange, int ensembleSize,
+                int writeQuorumSize, int ackQuorumSize, BookieSocketAddress bookieInEnsemble,
+                List<BookieExpectedToContainSegmentInfo> bookieExpectedToContainSegmentInfoList,
+                ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithMissingEntries,
+                ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithUnavailableBookies,
+                MultiCallback mcbForThisLedger) {
+            this.ledgerInRange = ledgerInRange;
+            this.ensembleSize = ensembleSize;
+            this.writeQuorumSize = writeQuorumSize;
+            this.ackQuorumSize = ackQuorumSize;
+            this.bookieInEnsemble = bookieInEnsemble;
+            this.bookieExpectedToContainSegmentInfoList = bookieExpectedToContainSegmentInfoList;
+            this.ledgersWithMissingEntries = ledgersWithMissingEntries;
+            this.ledgersWithUnavailableBookies = ledgersWithUnavailableBookies;
+            this.mcbForThisLedger = mcbForThisLedger;
+        }
+
+        @Override
+        public void accept(AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger,
+                Throwable listOfEntriesException) {
+
+            if (listOfEntriesException != null) {
+                if (BKException
+                        .getExceptionCode(listOfEntriesException) == BKException.Code.NoSuchLedgerExistsException) {
+                    LOG.debug("Got NoSuchLedgerExistsException for ledger: {} from bookie: {}", ledgerInRange,
+                            bookieInEnsemble);
+                    /*
+                     * in the case of NoSuchLedgerExistsException, it should be
+                     * considered as empty AvailabilityOfEntriesOfLedger.
+                     */
+                    availabilityOfEntriesOfLedger = AvailabilityOfEntriesOfLedger.EMPTY_AVAILABILITYOFENTRIESOFLEDGER;
+                } else {
+                    LOG.warn("Unable to GetListOfEntriesOfLedger for ledger: {} from: {}", ledgerInRange,
+                            bookieInEnsemble, listOfEntriesException);
+                    MissingEntriesInfoOfLedger unavailableBookiesInfoOfThisLedger = ledgersWithUnavailableBookies
+                            .get(ledgerInRange);
+                    if (unavailableBookiesInfoOfThisLedger == null) {
+                        ledgersWithUnavailableBookies.putIfAbsent(ledgerInRange,
+                                new MissingEntriesInfoOfLedger(ledgerInRange, ensembleSize, writeQuorumSize,
+                                        ackQuorumSize,
+                                        Collections.synchronizedList(new ArrayList<MissingEntriesInfo>())));
+                        unavailableBookiesInfoOfThisLedger = ledgersWithUnavailableBookies.get(ledgerInRange);
+                    }
+                    List<MissingEntriesInfo> missingEntriesInfoList =
+                            unavailableBookiesInfoOfThisLedger.getMissingEntriesInfoList();
+                    for (BookieExpectedToContainSegmentInfo bookieExpectedToContainSegmentInfo
+                            : bookieExpectedToContainSegmentInfoList) {
+                        missingEntriesInfoList.add(new MissingEntriesInfo(ledgerInRange,
+                                bookieExpectedToContainSegmentInfo.getSegmentEnsemble(), bookieInEnsemble, null));
+                        /*
+                         * though GetListOfEntriesOfLedger has failed with
+                         * exception, mcbForThisLedger should be called back
+                         * with OK response, because we dont consider this as
+                         * fatal error in replicasCheck and dont want
+                         * replicasCheck to exit just because of this issue. So
+                         * instead maintain the state of
+                         * ledgersWithUnavailableBookies, so that replicascheck
+                         * will report these ledgers/bookies appropriately.
+                         */
+                        mcbForThisLedger.processResult(BKException.Code.OK, null, null);
+                    }
+                    return;
+                }
+            }
+
+            for (BookieExpectedToContainSegmentInfo bookieExpectedToContainSegmentInfo
+                    : bookieExpectedToContainSegmentInfoList) {
+                final long startEntryIdOfSegment = bookieExpectedToContainSegmentInfo.getStartEntryIdOfSegment();
+                final long lastEntryIdOfSegment = bookieExpectedToContainSegmentInfo.getLastEntryIdOfSegment();
+                final BitSet entriesStripedToThisBookie = bookieExpectedToContainSegmentInfo
+                        .getEntriesOfSegmentStripedToThisBookie();
+                final Entry<Long, ? extends List<BookieSocketAddress>> segmentEnsemble =
+                        bookieExpectedToContainSegmentInfo.getSegmentEnsemble();
+                final List<Long> unavailableEntriesList = availabilityOfEntriesOfLedger
+                        .getUnavailableEntries(startEntryIdOfSegment, lastEntryIdOfSegment, entriesStripedToThisBookie);
+                if ((unavailableEntriesList != null) && (!unavailableEntriesList.isEmpty())) {
+                    MissingEntriesInfoOfLedger missingEntriesInfoOfThisLedger = ledgersWithMissingEntries
+                            .get(ledgerInRange);
+                    if (missingEntriesInfoOfThisLedger == null) {
+                        ledgersWithMissingEntries.putIfAbsent(ledgerInRange,
+                                new MissingEntriesInfoOfLedger(ledgerInRange, ensembleSize, writeQuorumSize,
+                                        ackQuorumSize,
+                                        Collections.synchronizedList(new ArrayList<MissingEntriesInfo>())));
+                        missingEntriesInfoOfThisLedger = ledgersWithMissingEntries.get(ledgerInRange);
+                    }
+                    missingEntriesInfoOfThisLedger.getMissingEntriesInfoList().add(new MissingEntriesInfo(ledgerInRange,
+                            segmentEnsemble, bookieInEnsemble, unavailableEntriesList));
+                }
+                /*
+                 * here though unavailableEntriesList is not empty,
+                 * mcbForThisLedger should be called back with OK response,
+                 * because we dont consider this as fatal error in replicasCheck
+                 * and dont want replicasCheck to exit just because of this
+                 * issue. So instead maintain the state of
+                 * missingEntriesInfoOfThisLedger, so that replicascheck will
+                 * report these ledgers/bookies/missingentries appropriately.
+                 */
+                mcbForThisLedger.processResult(BKException.Code.OK, null, null);
+            }
+        }
+    }
+
+    private static class ReplicasCheckFinalCallback implements AsyncCallback.VoidCallback {
+        final AtomicInteger resultCode;
+        final CountDownLatch replicasCheckLatch;
+
+        private ReplicasCheckFinalCallback(AtomicInteger resultCode, CountDownLatch replicasCheckLatch) {
+            this.resultCode = resultCode;
+            this.replicasCheckLatch = replicasCheckLatch;
+        }
+
+        @Override
+        public void processResult(int rc, String s, Object obj) {
+            resultCode.set(rc);
+            replicasCheckLatch.countDown();
+        }
+    }
+
+    void replicasCheck() throws BKAuditException {
+        ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithMissingEntries =
+                    new ConcurrentHashMap<Long, MissingEntriesInfoOfLedger>();
+        ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithUnavailableBookies =
+                    new ConcurrentHashMap<Long, MissingEntriesInfoOfLedger>();
+        LedgerRangeIterator ledgerRangeIterator = ledgerManager.getLedgerRanges(zkOpTimeoutMs);
+        final Semaphore maxConcurrentSemaphore = new Semaphore(MAX_CONCURRENT_REPLICAS_CHECK_LEDGER_REQUESTS);
+        while (true) {
+            LedgerRange ledgerRange = null;
+            try {
+                if (ledgerRangeIterator.hasNext()) {
+                    ledgerRange = ledgerRangeIterator.next();
+                } else {
+                    break;
+                }
+            } catch (IOException ioe) {
+                LOG.error("Got IOException while iterating LedgerRangeIterator", ioe);
+                throw new BKAuditException("Got IOException while iterating LedgerRangeIterator", ioe);
+            }
+            ledgersWithMissingEntries.clear();
+            ledgersWithUnavailableBookies.clear();
+            numLedgersFoundHavingNoReplicaOfAnEntry.set(0);
+            numLedgersFoundHavingLessThanAQReplicasOfAnEntry.set(0);
+            numLedgersFoundHavingLessThanWQReplicasOfAnEntry.set(0);
+            Set<Long> ledgersInRange = ledgerRange.getLedgers();
+            int numOfLedgersInRange = ledgersInRange.size();
+            // Final result after processing all the ledgers
+            final AtomicInteger resultCode = new AtomicInteger();
+            final CountDownLatch replicasCheckLatch = new CountDownLatch(1);
+
+            ReplicasCheckFinalCallback finalCB = new ReplicasCheckFinalCallback(resultCode, replicasCheckLatch);
+            MultiCallback mcbForThisLedgerRange = new MultiCallback(numOfLedgersInRange, finalCB, null,
+                    BKException.Code.OK, BKException.Code.ReadException) {
+                @Override
+                public void processResult(int rc, String path, Object ctx) {
+                    try {
+                        super.processResult(rc, path, ctx);
+                    } finally {
+                        maxConcurrentSemaphore.release();
+                    }
+                }
+            };
+            LOG.debug("Number of ledgers in the current LedgerRange : {}", numOfLedgersInRange);
+            for (Long ledgerInRange : ledgersInRange) {
+                try {
+                    if (!maxConcurrentSemaphore.tryAcquire(REPLICAS_CHECK_TIMEOUT_IN_SECS, TimeUnit.SECONDS)) {
+                        LOG.error("Timedout ({} secs) while waiting for acquiring semaphore",
+                                REPLICAS_CHECK_TIMEOUT_IN_SECS);
+                        throw new BKAuditException("Timedout while waiting for acquiring semaphore");
+                    }
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    LOG.error("Got InterruptedException while acquiring semaphore for replicascheck", ie);
+                    throw new BKAuditException("Got InterruptedException while acquiring semaphore for replicascheck",
+                            ie);
+                }
+                if (checkUnderReplicationForReplicasCheck(ledgerInRange, mcbForThisLedgerRange)) {
+                    /*
+                     * if ledger is marked underreplicated, then ignore this
+                     * ledger for replicascheck.
+                     */
+                    continue;
+                }
+                ledgerManager.readLedgerMetadata(ledgerInRange)
+                        .whenComplete(new ReadLedgerMetadataCallbackForReplicasCheck(ledgerInRange,
+                                mcbForThisLedgerRange, ledgersWithMissingEntries, ledgersWithUnavailableBookies));
+            }
+            try {
+                /*
+                 * if mcbForThisLedgerRange is not calledback within
+                 * REPLICAS_CHECK_TIMEOUT_IN_SECS secs then better give up
+                 * doing replicascheck, since there could be an issue and
+                 * blocking the single threaded auditor executor thread is not
+                 * expected.
+                 */
+                if (!replicasCheckLatch.await(REPLICAS_CHECK_TIMEOUT_IN_SECS, TimeUnit.SECONDS)) {
+                    LOG.error(
+                            "For LedgerRange with num of ledgers : {} it didn't complete replicascheck"
+                                    + " in {} secs, so giving up",
+                            numOfLedgersInRange, REPLICAS_CHECK_TIMEOUT_IN_SECS);
+                    throw new BKAuditException("Got InterruptedException while doing replicascheck");
+                }
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                LOG.error("Got InterruptedException while doing replicascheck", ie);
+                throw new BKAuditException("Got InterruptedException while doing replicascheck", ie);
+            }
+            reportLedgersWithMissingEntries(ledgersWithMissingEntries);
+            reportLedgersWithUnavailableBookies(ledgersWithUnavailableBookies);
+            int resultCodeIntValue = resultCode.get();
+            if (resultCodeIntValue != BKException.Code.OK) {
+                throw new BKAuditException("Exception while doing replicas check",
+                        BKException.create(resultCodeIntValue));
+            }
+        }
+        try {
+            ledgerUnderreplicationManager.setReplicasCheckCTime(System.currentTimeMillis());
+        } catch (UnavailableException ue) {
+            LOG.error("Got exception while trying to set ReplicasCheckCTime", ue);
+        }
+    }
+
+    private void reportLedgersWithMissingEntries(
+            ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithMissingEntries) {
+        StringBuilder errMessage = new StringBuilder();
+        HashMultiset<Long> missingEntries = HashMultiset.create();
+        int writeQuorumSize;
+        int ackQuorumSize;
+        for (Map.Entry<Long, MissingEntriesInfoOfLedger> missingEntriesInfoOfLedgerEntry : ledgersWithMissingEntries
+                .entrySet()) {
+            missingEntries.clear();
+            errMessage.setLength(0);
+            long ledgerWithMissingEntries = missingEntriesInfoOfLedgerEntry.getKey();
+            MissingEntriesInfoOfLedger missingEntriesInfoOfLedger = missingEntriesInfoOfLedgerEntry.getValue();
+            List<MissingEntriesInfo> missingEntriesInfoList = missingEntriesInfoOfLedger.getMissingEntriesInfoList();
+            writeQuorumSize = missingEntriesInfoOfLedger.getWriteQuorumSize();
+            ackQuorumSize = missingEntriesInfoOfLedger.getAckQuorumSize();
+            errMessage.append("Ledger : " + ledgerWithMissingEntries + " has following missing entries : ");
+            for (int listInd = 0; listInd < missingEntriesInfoList.size(); listInd++) {
+                MissingEntriesInfo missingEntriesInfo = missingEntriesInfoList.get(listInd);
+                List<Long> unavailableEntriesList = missingEntriesInfo.getUnavailableEntriesList();
+                Entry<Long, ? extends List<BookieSocketAddress>> segmentEnsemble =
+                        missingEntriesInfo.getSegmentEnsemble();
+                missingEntries.addAll(unavailableEntriesList);
+                errMessage.append("In segment starting at " + segmentEnsemble.getKey() + " with ensemble "
+                        + segmentEnsemble.getValue() + ", following entries " + unavailableEntriesList
+                        + " are missing in bookie: " + missingEntriesInfo.getBookieMissingEntries());
+                if (listInd < (missingEntriesInfoList.size() - 1)) {
+                    errMessage.append(", ");
+                }
+            }
+            LOG.error(errMessage.toString());
+            Set<Multiset.Entry<Long>> missingEntriesSet = missingEntries.entrySet();
+            int maxNumOfMissingReplicas = 0;
+            long entryWithMaxNumOfMissingReplicas = -1L;
+            for (Multiset.Entry<Long> missingEntryWithCount : missingEntriesSet) {
+                if (missingEntryWithCount.getCount() > maxNumOfMissingReplicas) {
+                    maxNumOfMissingReplicas = missingEntryWithCount.getCount();
+                    entryWithMaxNumOfMissingReplicas = missingEntryWithCount.getElement();
+                }
+            }
+            int leastNumOfReplicasOfAnEntry = writeQuorumSize - maxNumOfMissingReplicas;
+            if (leastNumOfReplicasOfAnEntry == 0) {
+                numLedgersFoundHavingNoReplicaOfAnEntry.incrementAndGet();
+                LOG.error("Ledger : {} entryId : {} is missing all replicas", ledgerWithMissingEntries,
+                        entryWithMaxNumOfMissingReplicas);
+            } else if (leastNumOfReplicasOfAnEntry < ackQuorumSize) {
+                numLedgersFoundHavingLessThanAQReplicasOfAnEntry.incrementAndGet();
+                LOG.error("Ledger : {} entryId : {} is having: {} replicas, less than ackQuorum num of replicas : {}",
+                        ledgerWithMissingEntries, entryWithMaxNumOfMissingReplicas, leastNumOfReplicasOfAnEntry,
+                        ackQuorumSize);
+            } else if (leastNumOfReplicasOfAnEntry < writeQuorumSize) {
+                numLedgersFoundHavingLessThanWQReplicasOfAnEntry.incrementAndGet();
+                LOG.error("Ledger : {} entryId : {} is having: {} replicas, less than writeQuorum num of replicas : {}",
+                        ledgerWithMissingEntries, entryWithMaxNumOfMissingReplicas, leastNumOfReplicasOfAnEntry,
+                        writeQuorumSize);
+            }
+        }
+    }
+
+    private void reportLedgersWithUnavailableBookies(
+            ConcurrentHashMap<Long, MissingEntriesInfoOfLedger> ledgersWithUnavailableBookies) {
+        StringBuilder errMessage = new StringBuilder();
+        for (Map.Entry<Long, MissingEntriesInfoOfLedger> ledgerWithUnavailableBookiesInfo :
+                ledgersWithUnavailableBookies.entrySet()) {
+            errMessage.setLength(0);
+            long ledgerWithUnavailableBookies = ledgerWithUnavailableBookiesInfo.getKey();
+            List<MissingEntriesInfo> missingBookiesInfoList = ledgerWithUnavailableBookiesInfo.getValue()
+                    .getMissingEntriesInfoList();
+            errMessage.append("Ledger : " + ledgerWithUnavailableBookies + " has following unavailable bookies : ");
+            for (int listInd = 0; listInd < missingBookiesInfoList.size(); listInd++) {
+                MissingEntriesInfo missingBookieInfo = missingBookiesInfoList.get(listInd);
+                Entry<Long, ? extends List<BookieSocketAddress>> segmentEnsemble =
+                        missingBookieInfo.getSegmentEnsemble();
+                errMessage.append("In segment starting at " + segmentEnsemble.getKey() + " with ensemble "
+                        + segmentEnsemble.getValue() + ", following bookie has not responded "
+                        + missingBookieInfo.getBookieMissingEntries());
+                if (listInd < (missingBookiesInfoList.size() - 1)) {
+                    errMessage.append(", ");
+                }
+            }
+            LOG.error(errMessage.toString());
+        }
+    }
+
+    boolean checkUnderReplicationForReplicasCheck(long ledgerInRange, VoidCallback mcbForThisLedgerRange) {
+        try {
+            if (ledgerUnderreplicationManager.getLedgerUnreplicationInfo(ledgerInRange) == null) {
+                return false;
+            }
+            /*
+             * this ledger is marked underreplicated, so ignore it for
+             * replicasCheck.
+             */
+            LOG.debug("Ledger: {} is marked underrreplicated, ignore this ledger for replicasCheck",
+                    ledgerInRange);
+            mcbForThisLedgerRange.processResult(BKException.Code.OK, null, null);
+            return true;
+        } catch (UnavailableException une) {
+            LOG.error("Got exception while trying to check if ledger: {} is underreplicated", ledgerInRange, une);
+            mcbForThisLedgerRange.processResult(BKException.getExceptionCode(une), null, null);
+            return true;
+        }
+    }
+
     /**
      * Shutdown the auditor.
      */
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
index 2f83804..f553f91 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
@@ -34,6 +34,7 @@ public interface ReplicationStats {
     String BOOKIE_TO_LEDGERS_MAP_CREATION_TIME = "BOOKIE_TO_LEDGERS_MAP_CREATION_TIME";
     String CHECK_ALL_LEDGERS_TIME = "CHECK_ALL_LEDGERS_TIME";
     String PLACEMENT_POLICY_CHECK_TIME = "PLACEMENT_POLICY_CHECK_TIME";
+    String REPLICAS_CHECK_TIME = "REPLICAS_CHECK_TIME";
     String AUDIT_BOOKIES_TIME = "AUDIT_BOOKIES_TIME";
     String NUM_FRAGMENTS_PER_LEDGER = "NUM_FRAGMENTS_PER_LEDGER";
     String NUM_BOOKIES_PER_LEDGER = "NUM_BOOKIES_PER_LEDGER";
@@ -42,6 +43,13 @@ public interface ReplicationStats {
     String NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED = "NUM_DELAYED_BOOKIE_AUDITS_CANCELLED";
     String NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY = "NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY";
     String NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY = "NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY";
+    String NUM_UNDERREPLICATED_LEDGERS_ELAPSED_RECOVERY_GRACE_PERIOD =
+            "NUM_UNDERREPLICATED_LEDGERS_ELAPSED_RECOVERY_GRACE_PERIOD";
+    String NUM_LEDGERS_HAVING_NO_REPLICA_OF_AN_ENTRY = "NUM_LEDGERS_HAVING_NO_REPLICA_OF_AN_ENTRY";
+    String NUM_LEDGERS_HAVING_LESS_THAN_AQ_REPLICAS_OF_AN_ENTRY =
+            "NUM_LEDGERS_HAVING_LESS_THAN_AQ_REPLICAS_OF_AN_ENTRY";
+    String NUM_LEDGERS_HAVING_LESS_THAN_WQ_REPLICAS_OF_AN_ENTRY =
+            "NUM_LEDGERS_HAVING_LESS_THAN_WQ_REPLICAS_OF_AN_ENTRY";
 
     String REPLICATION_WORKER_SCOPE = "replication_worker";
     String REREPLICATE_OP = "rereplicate";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/AvailabilityOfEntriesOfLedger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/AvailabilityOfEntriesOfLedger.java
index 6042bdb..8a3047a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/AvailabilityOfEntriesOfLedger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/AvailabilityOfEntriesOfLedger.java
@@ -18,9 +18,11 @@
 package org.apache.bookkeeper.util;
 
 import io.netty.buffer.ByteBuf;
-
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.BitSet;
+import java.util.List;
 import java.util.Map.Entry;
 import java.util.PrimitiveIterator;
 import java.util.TreeMap;
@@ -44,6 +46,11 @@ import org.apache.commons.lang3.mutable.MutableObject;
  */
 public class AvailabilityOfEntriesOfLedger {
     public static final long INVALID_ENTRYID = -1;
+    public static final AvailabilityOfEntriesOfLedger EMPTY_AVAILABILITYOFENTRIESOFLEDGER;
+    static {
+        long tmpArray[] = {};
+        EMPTY_AVAILABILITYOFENTRIESOFLEDGER = new AvailabilityOfEntriesOfLedger(Arrays.stream(tmpArray).iterator());
+    }
 
     /*
      *
@@ -109,6 +116,10 @@ public class AvailabilityOfEntriesOfLedger {
             return lastSequenceStart;
         }
 
+        private long getLastEntryInSequenceGroup() {
+            return lastSequenceStart + sequenceSize;
+        }
+
         private void setLastSequenceStart(long lastSequenceStart) {
             this.lastSequenceStart = lastSequenceStart;
         }
@@ -367,6 +378,50 @@ public class AvailabilityOfEntriesOfLedger {
         return seqGroup.getValue().isEntryAvailable(entryId);
     }
 
+    public List<Long> getUnavailableEntries(long startEntryId, long lastEntryId, BitSet availabilityOfEntries) {
+        if (!isAvailabilityOfEntriesOfLedgerClosed()) {
+            throw new IllegalStateException(
+                    "AvailabilityOfEntriesOfLedger is not yet closed, it is illegal to call getUnavailableEntries");
+        }
+        List<Long> unavailableEntries = new ArrayList<Long>();
+        SequenceGroup curSeqGroup = null;
+        boolean noSeqGroupRemaining = false;
+        int bitSetIndex = 0;
+        for (long entryId = startEntryId; entryId <= lastEntryId; entryId++, bitSetIndex++) {
+            if (noSeqGroupRemaining) {
+                if (availabilityOfEntries.get(bitSetIndex)) {
+                    unavailableEntries.add(entryId);
+                }
+                continue;
+            }
+            if ((curSeqGroup == null) || (entryId > curSeqGroup.getLastEntryInSequenceGroup())) {
+                Entry<Long, SequenceGroup> curSeqGroupEntry = sortedSequenceGroups.floorEntry(entryId);
+                if (curSeqGroupEntry == null) {
+                    if (availabilityOfEntries.get(bitSetIndex)) {
+                        unavailableEntries.add(entryId);
+                    }
+                    if (sortedSequenceGroups.ceilingEntry(entryId) == null) {
+                        noSeqGroupRemaining = true;
+                    }
+                    continue;
+                } else {
+                    curSeqGroup = curSeqGroupEntry.getValue();
+                    if (entryId > curSeqGroup.getLastEntryInSequenceGroup()) {
+                        if (availabilityOfEntries.get(bitSetIndex)) {
+                            unavailableEntries.add(entryId);
+                        }
+                        noSeqGroupRemaining = true;
+                        continue;
+                    }
+                }
+            }
+            if (availabilityOfEntries.get(bitSetIndex) && (!curSeqGroup.isEntryAvailable(entryId))) {
+                unavailableEntries.add(entryId);
+            }
+        }
+        return unavailableEntries;
+    }
+
     public long getTotalNumOfAvailableEntries() {
         if (!isAvailabilityOfEntriesOfLedgerClosed()) {
             throw new IllegalStateException("AvailabilityOfEntriesOfLedger is not yet closed,"
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
index 89b0d0c..5fbf5ad 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
@@ -44,6 +44,7 @@ public class BookKeeperConstants {
     public static final String LOSTBOOKIERECOVERYDELAY_NODE = "lostBookieRecoveryDelay";
     public static final String CHECK_ALL_LEDGERS_CTIME = "checkallledgersctime";
     public static final String PLACEMENT_POLICY_CHECK_CTIME = "placementpolicycheckctime";
+    public static final String REPLICAS_CHECK_CTIME = "replicascheckctime";
     public static final String DEFAULT_ZK_LEDGERS_ROOT_PATH = "/ledgers";
     public static final String LAYOUT_ZNODE = "LAYOUT";
     public static final String INSTANCEID = "INSTANCEID";
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
index 03f182d..3d409d8 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
@@ -36,7 +36,10 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -535,4 +538,41 @@ public class BookKeeperAdminTest extends BookKeeperClusterTestCase {
         }
     }
 
+    @Test
+    public void testGetListOfEntriesOfLedgerWithEntriesNotStripedToABookie() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration();
+        conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+        BookKeeper bkc = new BookKeeper(conf);
+        /*
+         * in this testsuite there are going to be 2 (numOfBookies) bookies and
+         * we are having ensemble of size 2.
+         */
+        LedgerHandle lh = bkc.createLedger(2, 1, digestType, "testPasswd".getBytes());
+        long lId = lh.getId();
+        /*
+         * ledger is writeclosed without adding any entry.
+         */
+        lh.close();
+        CountDownLatch callbackCalled = new CountDownLatch(1);
+        AtomicBoolean exceptionInCallback = new AtomicBoolean(false);
+        AtomicInteger exceptionCode = new AtomicInteger(BKException.Code.OK);
+        BookKeeperAdmin bkAdmin = new BookKeeperAdmin(zkUtil.getZooKeeperConnectString());
+        /*
+         * since no entry is added, callback is supposed to fail with
+         * NoSuchLedgerExistsException.
+         */
+        bkAdmin.asyncGetListOfEntriesOfLedger(bs.get(0).getLocalAddress(), lId)
+                .whenComplete((availabilityOfEntriesOfLedger, throwable) -> {
+                    exceptionInCallback.set(throwable != null);
+                    if (throwable != null) {
+                        exceptionCode.set(BKException.getExceptionCode(throwable));
+                    }
+                    callbackCalled.countDown();
+                });
+        callbackCalled.await();
+        assertTrue("Exception occurred", exceptionInCallback.get());
+        assertEquals("Exception code", BKException.Code.NoSuchLedgerExistsException, exceptionCode.get());
+        bkAdmin.close();
+        bkc.close();
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java
index b78897a..4498eea 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/RoundRobinDistributionScheduleTest.java
@@ -28,6 +28,7 @@ import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.Sets;
 
+import java.util.BitSet;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -171,4 +172,49 @@ public class RoundRobinDistributionScheduleTest {
         w.moveAndShift(4, 4);
         assertEquals(w, writeSetFromValues(1, 2, 3, 4, 5));
     }
+
+    @Test
+    public void testGetEntriesStripedToTheBookie() {
+
+        RoundRobinDistributionSchedule schedule;
+        BitSet entriesStriped;
+
+        int ensSize = 3;
+        int writeQuorum = 3;
+        int ackQuorum = 3;
+        int startEntryId = 3;
+        int lastEntryId = 5;
+        schedule = new RoundRobinDistributionSchedule(writeQuorum, ackQuorum, ensSize);
+
+        for (int bookieIndex = 0; bookieIndex < ensSize; bookieIndex++) {
+            entriesStriped = schedule.getEntriesStripedToTheBookie(bookieIndex, startEntryId, lastEntryId);
+            assertEquals("Cardinality", 3, entriesStriped.cardinality());
+            for (int i = 0; i < entriesStriped.length(); i++) {
+                assertEquals("EntryAvailability", schedule.hasEntry((startEntryId + i), bookieIndex),
+                        entriesStriped.get(i));
+            }
+        }
+
+        ensSize = 5;
+        writeQuorum = 3;
+        ackQuorum = 2;
+        startEntryId = 100;
+        lastEntryId = 122;
+        schedule = new RoundRobinDistributionSchedule(writeQuorum, ackQuorum, ensSize);
+        for (int bookieIndex = 0; bookieIndex < ensSize; bookieIndex++) {
+            entriesStriped = schedule.getEntriesStripedToTheBookie(bookieIndex, startEntryId, lastEntryId);
+            for (int i = 0; i < entriesStriped.length(); i++) {
+                assertEquals("EntryAvailability", schedule.hasEntry((startEntryId + i), bookieIndex),
+                        entriesStriped.get(i));
+            }
+        }
+
+        schedule = new RoundRobinDistributionSchedule(2, 2, 3);
+        entriesStriped = schedule.getEntriesStripedToTheBookie(2, 0, 0);
+        assertEquals("Cardinality", 0, entriesStriped.cardinality());
+        entriesStriped = schedule.getEntriesStripedToTheBookie(2, 3, 3);
+        assertEquals("Cardinality", 0, entriesStriped.cardinality());
+        entriesStriped = schedule.getEntriesStripedToTheBookie(2, 4, 4);
+        assertEquals("Cardinality", 1, entriesStriped.cardinality());
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
index f25aa88..bf96a5e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
@@ -58,11 +58,11 @@ import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.MetadataClientDriver;
 import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.meta.UnderreplicatedLedger;
 import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieServer;
-import org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat;
 import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
 import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -809,7 +809,7 @@ public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase {
             throws Exception {
         for (int i = 0; i < secondsToWait; i++) {
             try {
-                UnderreplicatedLedgerFormat data = urLedgerMgr.getLedgerUnreplicationInfo(ledgerId);
+                UnderreplicatedLedger data = urLedgerMgr.getLedgerUnreplicationInfo(ledgerId);
                 boolean all = true;
                 for (String r : replicas) {
                     all = all && data.getReplicaList().contains(r);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
index dc343eb..3999c3c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
@@ -550,6 +550,110 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
         auditor.close();
     }
 
+    @Test
+    public void testInitialDelayOfReplicasCheck() throws Exception {
+        for (AuditorElector e : auditorElectors.values()) {
+            e.shutdown();
+        }
+
+        LedgerHandle lh = bkc.createLedger(3, 2, DigestType.CRC32, "passwd".getBytes());
+        for (int j = 0; j < 5; j++) {
+            lh.addEntry("testdata".getBytes());
+        }
+        lh.close();
+
+        long ledgerId = 100000L;
+        lh = bkc.createLedgerAdv(ledgerId, 3, 2, 2, DigestType.CRC32, "passwd".getBytes(), null);
+        lh.close();
+
+        ledgerId = 100001234L;
+        lh = bkc.createLedgerAdv(ledgerId, 3, 3, 2, DigestType.CRC32, "passwd".getBytes(), null);
+        for (int j = 0; j < 4; j++) {
+            lh.addEntry(j, "testdata".getBytes());
+        }
+        lh.close();
+
+        ledgerId = 991234L;
+        lh = bkc.createLedgerAdv(ledgerId, 3, 2, 2, DigestType.CRC32, "passwd".getBytes(), null);
+        lh.addEntry(0, "testdata".getBytes());
+        lh.close();
+
+        LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
+        LedgerUnderreplicationManager urm = mFactory.newLedgerUnderreplicationManager();
+
+        ServerConfiguration servConf = new ServerConfiguration(bsConfs.get(0));
+        validateInitialDelayOfReplicasCheck(urm, -1, 1000, servConf, bkc);
+        validateInitialDelayOfReplicasCheck(urm, 999, 1000, servConf, bkc);
+        validateInitialDelayOfReplicasCheck(urm, 1001, 1000, servConf, bkc);
+    }
+
+    void validateInitialDelayOfReplicasCheck(LedgerUnderreplicationManager urm, long timeSinceLastExecutedInSecs,
+            long auditorPeriodicReplicasCheckInterval, ServerConfiguration servConf, BookKeeper bkc)
+            throws UnavailableException, UnknownHostException, InterruptedException {
+        TestStatsProvider statsProvider = new TestStatsProvider();
+        TestStatsLogger statsLogger = statsProvider.getStatsLogger(AUDITOR_SCOPE);
+        TestOpStatsLogger replicasCheckStatsLogger = (TestOpStatsLogger) statsLogger
+                .getOpStatsLogger(ReplicationStats.REPLICAS_CHECK_TIME);
+        servConf.setAuditorPeriodicReplicasCheckInterval(auditorPeriodicReplicasCheckInterval);
+        servConf.setAuditorPeriodicCheckInterval(0);
+        servConf.setAuditorPeriodicBookieCheckInterval(0);
+        final TestAuditor auditor = new TestAuditor(Bookie.getBookieAddress(servConf).toString(), servConf, bkc, false,
+                statsLogger);
+        CountDownLatch latch = auditor.getLatch();
+        assertEquals("REPLICAS_CHECK_TIME SuccessCount", 0, replicasCheckStatsLogger.getSuccessCount());
+        long curTimeBeforeStart = System.currentTimeMillis();
+        long replicasCheckCTime = -1;
+        long initialDelayInMsecs = -1;
+        long nextExpectedReplicasCheckExecutionTime = -1;
+        long bufferTimeInMsecs = 20000L;
+        if (timeSinceLastExecutedInSecs == -1) {
+            /*
+             * if we are setting replicasCheckCTime to -1, it means that
+             * replicasCheck hasn't run before. So initialDelay for
+             * replicasCheck should be 0.
+             */
+            replicasCheckCTime = -1;
+            initialDelayInMsecs = 0;
+        } else {
+            replicasCheckCTime = curTimeBeforeStart - timeSinceLastExecutedInSecs * 1000L;
+            initialDelayInMsecs = timeSinceLastExecutedInSecs > auditorPeriodicReplicasCheckInterval ? 0
+                    : (auditorPeriodicReplicasCheckInterval - timeSinceLastExecutedInSecs) * 1000L;
+        }
+        /*
+         * next replicasCheck should happen atleast after
+         * nextExpectedReplicasCheckExecutionTime.
+         */
+        nextExpectedReplicasCheckExecutionTime = curTimeBeforeStart + initialDelayInMsecs;
+
+        urm.setReplicasCheckCTime(replicasCheckCTime);
+        auditor.start();
+        /*
+         * since auditorPeriodicReplicasCheckInterval are higher values (in the
+         * order of 100s of seconds), its ok bufferTimeInMsecs to be ` 20 secs.
+         */
+        assertTrue("replicasCheck should have executed with initialDelay " + initialDelayInMsecs,
+                latch.await(initialDelayInMsecs + bufferTimeInMsecs, TimeUnit.MILLISECONDS));
+        for (int i = 0; i < 20; i++) {
+            Thread.sleep(100);
+            if (replicasCheckStatsLogger.getSuccessCount() >= 1) {
+                break;
+            }
+        }
+        assertEquals("REPLICAS_CHECK_TIME SuccessCount", 1, replicasCheckStatsLogger.getSuccessCount());
+        long currentReplicasCheckCTime = urm.getReplicasCheckCTime();
+        assertTrue(
+                "currentReplicasCheckCTime: " + currentReplicasCheckCTime
+                        + " should be greater than nextExpectedReplicasCheckExecutionTime: "
+                        + nextExpectedReplicasCheckExecutionTime,
+                currentReplicasCheckCTime > nextExpectedReplicasCheckExecutionTime);
+        assertTrue(
+                "currentReplicasCheckCTime: " + currentReplicasCheckCTime
+                        + " should be lesser than nextExpectedReplicasCheckExecutionTime+bufferTimeInMsecs: "
+                        + (nextExpectedReplicasCheckExecutionTime + bufferTimeInMsecs),
+                currentReplicasCheckCTime < (nextExpectedReplicasCheckExecutionTime + bufferTimeInMsecs));
+        auditor.close();
+    }
+
     static class TestAuditor extends Auditor {
 
         final AtomicReference<CountDownLatch> latchRef = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
@@ -574,6 +678,11 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
             latchRef.get().countDown();
         }
 
+        void replicasCheck() throws BKAuditException {
+            super.replicasCheck();
+            latchRef.get().countDown();
+        }
+
         CountDownLatch getLatch() {
             return latchRef.get();
         }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
index 1d786f9..b8548e0 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
@@ -289,6 +289,136 @@ public class AuditorPlacementPolicyCheckTest extends BookKeeperClusterTestCase {
     }
 
     @Test
+    public void testPlacementPolicyCheckForURLedgersElapsedRecoveryGracePeriod() throws Exception {
+        testPlacementPolicyCheckWithURLedgers(true);
+    }
+
+    @Test
+    public void testPlacementPolicyCheckForURLedgersNotElapsedRecoveryGracePeriod() throws Exception {
+        testPlacementPolicyCheckWithURLedgers(false);
+    }
+
+    public void testPlacementPolicyCheckWithURLedgers(boolean timeElapsed) throws Exception {
+        int numOfBookies = 4;
+        /*
+         * in timeElapsed=true scenario, set some low value, otherwise set some
+         * highValue.
+         */
+        int underreplicatedLedgerRecoveryGracePeriod = timeElapsed ? 1 : 1000;
+        int numOfURLedgersElapsedRecoveryGracePeriod = 0;
+        List<BookieSocketAddress> bookieAddresses = new ArrayList<BookieSocketAddress>();
+        RegistrationManager regManager = driver.getRegistrationManager();
+
+        for (int i = 0; i < numOfBookies; i++) {
+            BookieSocketAddress bookieAddress = new BookieSocketAddress("98.98.98." + i, 2181);
+            bookieAddresses.add(bookieAddress);
+            regManager.registerBookie(bookieAddress.toString(), false);
+        }
+
+        LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
+        LedgerManager lm = mFactory.newLedgerManager();
+        LedgerUnderreplicationManager underreplicationManager = mFactory.newLedgerUnderreplicationManager();
+        int ensembleSize = 4;
+        int writeQuorumSize = 3;
+        int ackQuorumSize = 2;
+
+        long ledgerId1 = 1L;
+        LedgerMetadata initMeta = LedgerMetadataBuilder.create()
+                .withEnsembleSize(ensembleSize)
+                .withWriteQuorumSize(writeQuorumSize)
+                .withAckQuorumSize(ackQuorumSize)
+                .newEnsembleEntry(0L, bookieAddresses)
+                .withClosedState()
+                .withLastEntryId(100)
+                .withLength(10000)
+                .withDigestType(DigestType.DUMMY)
+                .withPassword(new byte[0])
+                .build();
+        lm.createLedgerMetadata(ledgerId1, initMeta).get();
+        underreplicationManager.markLedgerUnderreplicated(ledgerId1, bookieAddresses.get(0).toString());
+        if (timeElapsed) {
+            numOfURLedgersElapsedRecoveryGracePeriod++;
+        }
+
+        /*
+         * this is non-closed ledger, it should also be reported as
+         * URLedgersElapsedRecoveryGracePeriod
+         */
+        ensembleSize = 3;
+        long ledgerId2 = 21234561L;
+        initMeta = LedgerMetadataBuilder.create()
+                .withEnsembleSize(ensembleSize)
+                .withWriteQuorumSize(writeQuorumSize)
+                .withAckQuorumSize(ackQuorumSize)
+                .newEnsembleEntry(0L,
+                        Arrays.asList(bookieAddresses.get(0), bookieAddresses.get(1), bookieAddresses.get(2)))
+                .newEnsembleEntry(100L,
+                        Arrays.asList(bookieAddresses.get(3), bookieAddresses.get(1), bookieAddresses.get(2)))
+                .withDigestType(DigestType.DUMMY)
+                .withPassword(new byte[0])
+                .build();
+        lm.createLedgerMetadata(ledgerId2, initMeta).get();
+        underreplicationManager.markLedgerUnderreplicated(ledgerId2, bookieAddresses.get(0).toString());
+        if (timeElapsed) {
+            numOfURLedgersElapsedRecoveryGracePeriod++;
+        }
+
+        /*
+         * this ledger is not marked underreplicated.
+         */
+        long ledgerId3 = 31234561L;
+        initMeta = LedgerMetadataBuilder.create()
+                .withEnsembleSize(ensembleSize)
+                .withWriteQuorumSize(writeQuorumSize)
+                .withAckQuorumSize(ackQuorumSize)
+                .newEnsembleEntry(0L,
+                        Arrays.asList(bookieAddresses.get(1), bookieAddresses.get(2), bookieAddresses.get(3)))
+                .withClosedState()
+                .withLastEntryId(100)
+                .withLength(10000)
+                .withDigestType(DigestType.DUMMY)
+                .withPassword(new byte[0])
+                .build();
+        lm.createLedgerMetadata(ledgerId3, initMeta).get();
+
+        if (timeElapsed) {
+            /*
+             * in timeelapsed scenario, by waiting for
+             * underreplicatedLedgerRecoveryGracePeriod, recovery time must be
+             * elapsed.
+             */
+            Thread.sleep((underreplicatedLedgerRecoveryGracePeriod + 1) * 1000);
+        } else {
+            /*
+             * in timeElapsed=false scenario, since
+             * underreplicatedLedgerRecoveryGracePeriod is set to some high
+             * value, there is no value in waiting. So just wait for some time
+             * and make sure urledgers are not reported as recoverytime elapsed
+             * urledgers.
+             */
+            Thread.sleep(5000);
+        }
+
+        ServerConfiguration servConf = new ServerConfiguration(bsConfs.get(0));
+        servConf.setUnderreplicatedLedgerRecoveryGracePeriod(underreplicatedLedgerRecoveryGracePeriod);
+        setServerConfigPropertiesForRackPlacement(servConf);
+        MutableObject<Auditor> auditorRef = new MutableObject<Auditor>();
+        try {
+            TestStatsLogger statsLogger = startAuditorAndWaitForPlacementPolicyCheck(servConf, auditorRef);
+            Gauge<? extends Number> underreplicatedLedgersElapsedRecoveryGracePeriodGuage = statsLogger
+                    .getGauge(ReplicationStats.NUM_UNDERREPLICATED_LEDGERS_ELAPSED_RECOVERY_GRACE_PERIOD);
+            assertEquals("NUM_UNDERREPLICATED_LEDGERS_ELAPSED_RECOVERY_GRACE_PERIOD guage value",
+                    numOfURLedgersElapsedRecoveryGracePeriod,
+                    underreplicatedLedgersElapsedRecoveryGracePeriodGuage.getSample());
+        } finally {
+            Auditor auditor = auditorRef.getValue();
+            if (auditor != null) {
+                auditor.close();
+            }
+        }
+    }
+
+    @Test
     public void testPlacementPolicyCheckWithLedgersNotAdheringToPolicyWithMultipleSegments() throws Exception {
         int numOfBookies = 7;
         int numOfLedgersNotAdheringToPlacementPolicy = 0;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
index 2245d3f..3403d7f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
@@ -795,6 +795,21 @@ public class TestLedgerUnderreplicationManager {
         assertEquals(curTime, underReplicaMgr1.getPlacementPolicyCheckCTime());
     }
 
+    @Test
+    public void testReplicasCheckCTime() throws Exception {
+        @Cleanup
+        LedgerUnderreplicationManager underReplicaMgr1 = lmf1.newLedgerUnderreplicationManager();
+        @Cleanup
+        LedgerUnderreplicationManager underReplicaMgr2 = lmf2.newLedgerUnderreplicationManager();
+        assertEquals(-1, underReplicaMgr1.getReplicasCheckCTime());
+        long curTime = System.currentTimeMillis();
+        underReplicaMgr2.setReplicasCheckCTime(curTime);
+        assertEquals(curTime, underReplicaMgr1.getReplicasCheckCTime());
+        curTime = System.currentTimeMillis();
+        underReplicaMgr2.setReplicasCheckCTime(curTime);
+        assertEquals(curTime, underReplicaMgr1.getReplicasCheckCTime());
+    }
+
     private void verifyMarkLedgerUnderreplicated(Collection<String> missingReplica)
             throws KeeperException, InterruptedException, ReplicationException {
         Long ledgerA = 0xfeadeefdacL;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/AvailabilityOfEntriesOfLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/AvailabilityOfEntriesOfLedgerTest.java
index 4368f89..1f6e62a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/AvailabilityOfEntriesOfLedgerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/AvailabilityOfEntriesOfLedgerTest.java
@@ -24,8 +24,11 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.HashSet;
+import java.util.List;
 import java.util.PrimitiveIterator;
 import java.util.Set;
 
@@ -189,4 +192,111 @@ public class AvailabilityOfEntriesOfLedgerTest {
             }
         }
     }
+
+    @Test
+    public void testGetUnavailableEntries() {
+        /*
+         * AvailabilityOfEntriesOfLedger is going to be created with this
+         * entries. It is equivalent to considering that Bookie has these
+         * entries.
+         */
+        long[][] availableEntries = {
+                { 1, 2},
+                { 0, 1, 2 },
+                { 1, 2, 3, 5, 6, 7, 8 },
+                { 1, 5 },
+                { 3 },
+                { 1, 2, 4, 5, 7, 8 },
+                {},
+                { 1, 2, 3, 5, 6, 11, 12, 13, 14, 15, 16, 17, 100, 1000, 1001, 10000, 20000, 20001 }
+        };
+
+        /*
+         * getUnavailableEntries method is going to be called with these entries
+         * as expected to contain.
+         */
+        long[][] expectedToContainEntries = {
+                { 1, 2},
+                { 0, 1, 2, 3, 5 },
+                { 1, 2, 5, 7, 8 },
+                { 2, 7 },
+                { 3 },
+                { 1, 5, 7, 8, 9, 10 },
+                { 0, 1, 2, 3, 4, 5 },
+                { 4, 18, 1002, 19999, 20003 }
+        };
+
+        /*
+         * Considering what AvailabilityOfEntriesOfLedger contains
+         * (availableEntries), what it is expected to contain
+         * (expectedToContainEntries), following are the entries which are
+         * supposed to be reported as unavailable (unavailableEntries).
+         */
+        long[][] unavailableEntries = {
+                { },
+                { 3, 5 },
+                { },
+                { 2, 7 },
+                { },
+                { 9, 10 },
+                { 0, 1, 2, 3, 4, 5 },
+                { 4, 18, 1002, 19999, 20003 }
+        };
+
+        for (int i = 0; i < availableEntries.length; i++) {
+            long[] availableEntriesTempArray = availableEntries[i];
+            long[] expectedToContainEntriesTempArray = expectedToContainEntries[i];
+            long[] unavailableEntriesTempArray = unavailableEntries[i];
+            List<Long> unavailableEntriesTempList = new ArrayList<Long>();
+            for (int j = 0; j < unavailableEntriesTempArray.length; j++) {
+                unavailableEntriesTempList.add(unavailableEntriesTempArray[j]);
+            }
+
+            PrimitiveIterator.OfLong primitiveIterator = Arrays.stream(availableEntriesTempArray).iterator();
+            AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger = new AvailabilityOfEntriesOfLedger(
+                    primitiveIterator);
+
+            long startEntryId;
+            long lastEntryId;
+            if (expectedToContainEntriesTempArray[0] == 0) {
+                startEntryId = expectedToContainEntriesTempArray[0];
+                lastEntryId = expectedToContainEntriesTempArray[expectedToContainEntriesTempArray.length - 1];
+            } else {
+                startEntryId = expectedToContainEntriesTempArray[0] - 1;
+                lastEntryId = expectedToContainEntriesTempArray[expectedToContainEntriesTempArray.length - 1] + 1;
+            }
+            BitSet expectedToContainEntriesBitSet = new BitSet((int) (lastEntryId - startEntryId + 1));
+            for (int ind = 0; ind < expectedToContainEntriesTempArray.length; ind++) {
+                int entryId = (int) expectedToContainEntriesTempArray[ind];
+                expectedToContainEntriesBitSet.set(entryId - (int) startEntryId);
+            }
+
+            List<Long> actualUnavailableEntries = availabilityOfEntriesOfLedger.getUnavailableEntries(startEntryId,
+                    lastEntryId, expectedToContainEntriesBitSet);
+            assertEquals("Unavailable Entries", unavailableEntriesTempList, actualUnavailableEntries);
+        }
+    }
+
+    @Test
+    public void testEmptyAvailabilityOfEntriesOfLedger() {
+        AvailabilityOfEntriesOfLedger emptyOne = AvailabilityOfEntriesOfLedger.EMPTY_AVAILABILITYOFENTRIESOFLEDGER;
+        assertEquals("expected totalNumOfAvailableEntries", 0, emptyOne.getTotalNumOfAvailableEntries());
+        assertFalse("empty one is not supposed to contain any entry", emptyOne.isEntryAvailable(100L));
+        long startEntryId = 100;
+        long lastEntryId = 105;
+        BitSet bitSetOfAvailability = new BitSet((int) (lastEntryId - startEntryId + 1));
+        for (int i = 0; i < bitSetOfAvailability.length(); i++) {
+            if ((i % 2) == 0) {
+                bitSetOfAvailability.set(i);
+            }
+        }
+        List<Long> unavailableEntries = emptyOne.getUnavailableEntries(startEntryId, lastEntryId, bitSetOfAvailability);
+        assertEquals("Num of unavailable entries", bitSetOfAvailability.cardinality(), unavailableEntries.size());
+        for (int i = 0; i < bitSetOfAvailability.length(); i++) {
+            long entryId = startEntryId + i;
+            if (bitSetOfAvailability.get(i)) {
+                assertTrue("Unavailable entry", unavailableEntries.contains(Long.valueOf(entryId)));
+            }
+        }
+    }
 }
diff --git a/site/_data/config/bk_server.yaml b/site/_data/config/bk_server.yaml
index 081428e..d83308e 100644
--- a/site/_data/config/bk_server.yaml
+++ b/site/_data/config/bk_server.yaml
@@ -684,6 +684,12 @@ groups:
   - param: storeSystemTimeAsLedgerUnderreplicatedMarkTime
     description: Enable the Auditor to use system time as underreplicated ledger mark time. If this is enabled, Auditor will write a ctime field into the underreplicated ledger znode.
     default: true
+  - param: underreplicatedLedgerRecoveryGracePeriod
+    description: The grace period (in seconds) for underreplicated ledgers recovery. If ledger is marked underreplicated for more than this period then it will be reported by placementPolicyCheck in Auditor. Setting this to 0 will disable this check.
+    default: 0
+  - param: auditorReplicasCheckInterval
+    description: Sets the regularity/interval at which the auditor will run a replicas check of all ledgers, which are closed. This should not be run very often since it validates availability of replicas of all ledgers by querying bookies. Setting this to 0 will completely disable the periodic replicas check. By default it is disabled.
+    default: 0
 
 - name: AutoRecovery replication worker settings
   params: