You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by re...@apache.org on 2019/02/01 19:43:09 UTC

[bookkeeper] branch master updated: PlacementPolicy checker validating EnsemblePlacementpolicy

This is an automated email from the ASF dual-hosted git repository.

reddycharan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new a79cd08  PlacementPolicy checker validating EnsemblePlacementpolicy
a79cd08 is described below

commit a79cd081114ed45d2e0121d524ea1b710d7d2452
Author: Charan Reddy Guttapalem <re...@gmail.com>
AuthorDate: Fri Feb 1 11:43:04 2019 -0800

    PlacementPolicy checker validating EnsemblePlacementpolicy
    
    
    Descriptions of the changes in this PR:
    
    - As described in BP-34,
    https://github.com/apache/bookkeeper/blob/master/site/bps/BP-34-cluster-metadata-checker.md,
    this change introduces a new checker, the cluster-metadata-checker, whose initial responsibility
    is validating the EnsemblePlacementPolicy of segments of ledgers (which are write-closed/fenced).
    
    
    
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
    
    This closes #1902 from reddycharan/placementpolicyscrutiny
---
 bookkeeper-proto/src/main/proto/DataFormats.proto  |   8 +
 .../apache/bookkeeper/client/BookKeeperAdmin.java  |  20 +
 .../bookkeeper/conf/ClientConfiguration.java       |   2 +-
 .../bookkeeper/conf/ServerConfiguration.java       |  25 ++
 .../meta/LedgerUnderreplicationManager.java        |  16 +
 .../meta/ZkLedgerUnderreplicationManager.java      |  50 +++
 .../org/apache/bookkeeper/replication/Auditor.java | 303 +++++++++++----
 .../bookkeeper/replication/ReplicationStats.java   |   3 +
 .../bookkeeper/util/BookKeeperConstants.java       |   1 +
 .../replication/AuditorPeriodicCheckTest.java      | 108 +++++-
 .../AuditorPlacementPolicyCheckTest.java           | 418 +++++++++++++++++++++
 .../TestLedgerUnderreplicationManager.java         |  15 +
 site/_data/config/bk_server.yaml                   |   3 +
 13 files changed, 898 insertions(+), 74 deletions(-)

diff --git a/bookkeeper-proto/src/main/proto/DataFormats.proto b/bookkeeper-proto/src/main/proto/DataFormats.proto
index 79d9b2f..a679248 100644
--- a/bookkeeper-proto/src/main/proto/DataFormats.proto
+++ b/bookkeeper-proto/src/main/proto/DataFormats.proto
@@ -102,3 +102,11 @@ message AuditorVoteFormat {
 message CheckAllLedgersFormat {
     optional int64 checkAllLedgersCTime = 1;
 }
+
+/**
+ * information of PlacementPolicyCheck execution
+ */
+message PlacementPolicyCheckFormat {
+    optional int64 placementPolicyCheckCTime = 1;
+}
+
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 1b4f476..37b59d1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -1642,4 +1642,24 @@ public class BookKeeperAdmin implements AutoCloseable {
         }
         return firstStoredEntryId != LedgerHandle.INVALID_ENTRY_ID;
     }
+
+    /**
+     * Returns a boolean value specifying whether the ensemble of the segment
+     * adheres to the ensemble placement policy for the given writeQuorumSize
+     * and ackQuorumSize.
+     *
+     * @param ensembleBookiesList
+     *            ensemble of the segment
+     * @param writeQuorumSize
+     *            writeQuorumSize of the ledger
+     * @param ackQuorumSize
+     *            ackQuorumSize of the ledger
+     * @return <tt>true</tt> if the ensemble is adhering to the
+     *         EnsemblePlacementPolicy
+     */
+    public boolean isEnsembleAdheringToPlacementPolicy(List<BookieSocketAddress> ensembleBookiesList,
+            int writeQuorumSize, int ackQuorumSize) {
+        return bkc.getPlacementPolicy().isEnsembleAdheringToPlacementPolicy(ensembleBookiesList, writeQuorumSize,
+                ackQuorumSize);
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 29c6820..f25a065 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -152,7 +152,7 @@ public class ClientConfiguration extends AbstractConfiguration<ClientConfigurati
     protected static final String NUM_IO_THREADS = "numIOThreads";
 
     // Ensemble Placement Policy
-    protected static final String ENSEMBLE_PLACEMENT_POLICY = "ensemblePlacementPolicy";
+    public static final String ENSEMBLE_PLACEMENT_POLICY = "ensemblePlacementPolicy";
     protected static final String NETWORK_TOPOLOGY_STABILIZE_PERIOD_SECONDS = "networkTopologyStabilizePeriodSeconds";
     protected static final String READ_REORDER_THRESHOLD_PENDING_REQUESTS = "readReorderThresholdPendingRequests";
     protected static final String ENSEMBLE_PLACEMENT_POLICY_ORDER_SLOW_BOOKIES =
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 64f6c4b..f4972f0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -181,6 +181,8 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
     // Replication parameters
     protected static final String AUDITOR_PERIODIC_CHECK_INTERVAL = "auditorPeriodicCheckInterval";
     protected static final String AUDITOR_PERIODIC_BOOKIE_CHECK_INTERVAL = "auditorPeriodicBookieCheckInterval";
+    protected static final String AUDITOR_PERIODIC_PLACEMENT_POLICY_CHECK_INTERVAL =
+                                                                "auditorPeriodicPlacementPolicyCheckInterval";
     protected static final String AUDITOR_LEDGER_VERIFICATION_PERCENTAGE = "auditorLedgerVerificationPercentage";
     protected static final String AUTO_RECOVERY_DAEMON_ENABLED = "autoRecoveryDaemonEnabled";
     protected static final String LOST_BOOKIE_RECOVERY_DELAY = "lostBookieRecoveryDelay";
@@ -2189,6 +2191,29 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
     }
 
     /**
+     * Sets the regularity/interval at which the auditor will run a placement
+     * policy check of all closed ledgers. This should not be run very often,
+     * and should be run at most once a day. Setting this to 0 will completely
+     * disable the periodic placement policy check.
+     *
+     * @param interval
+     *            The interval in seconds. e.g. 86400 = 1 day, 604800 = 1 week
+     */
+    public void setAuditorPeriodicPlacementPolicyCheckInterval(long interval) {
+        setProperty(AUDITOR_PERIODIC_PLACEMENT_POLICY_CHECK_INTERVAL, interval);
+    }
+
+    /**
+     * Gets the regularity/interval at which the auditor runs the placement
+     * policy check of all closed ledgers.
+     *
+     * @return The interval in seconds. By default it is disabled.
+     */
+    public long getAuditorPeriodicPlacementPolicyCheckInterval() {
+        return getLong(AUDITOR_PERIODIC_PLACEMENT_POLICY_CHECK_INTERVAL, 0);
+    }
+
+    /**
      * Set what percentage of a ledger (fragment)'s entries will be verified.
      * 0 - only the first and last entry of each ledger fragment would be verified
      * 100 - the entire ledger fragment would be verified
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
index 304f184..4dadffe 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
@@ -185,6 +185,22 @@ public interface LedgerUnderreplicationManager extends AutoCloseable {
     long getCheckAllLedgersCTime() throws ReplicationException.UnavailableException;
 
     /**
+     * Setter for the PlacementPolicyCheck last executed ctime.
+     *
+     * @param placementPolicyCheckCTime
+     * @throws ReplicationException.UnavailableException
+     */
+    void setPlacementPolicyCheckCTime(long placementPolicyCheckCTime) throws ReplicationException.UnavailableException;
+
+    /**
+     * Getter for the PlacementPolicyCheck last executed ctime.
+     *
+     * @return the long value of placementPolicyCheckCTime
+     * @throws ReplicationException.UnavailableException
+     */
+    long getPlacementPolicyCheckCTime() throws ReplicationException.UnavailableException;
+
+    /**
      * Receive notification asynchronously when the lostBookieRecoveryDelay value is Changed.
      *
      * @param cb
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
index c4e9b30..22bed6f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
@@ -50,6 +50,7 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.DataFormats.CheckAllLedgersFormat;
 import org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat;
 import org.apache.bookkeeper.proto.DataFormats.LockDataFormat;
+import org.apache.bookkeeper.proto.DataFormats.PlacementPolicyCheckFormat;
 import org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat;
 import org.apache.bookkeeper.replication.ReplicationEnableCb;
 import org.apache.bookkeeper.replication.ReplicationException;
@@ -118,6 +119,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
     private final AbstractConfiguration conf;
     private final String lostBookieRecoveryDelayZnode;
     private final String checkAllLedgersCtimeZnode;
+    private final String placementPolicyCheckCtimeZnode;
     private final ZooKeeper zkc;
     private final SubTreeCache subTreeCache;
 
@@ -131,6 +133,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
         urLockPath = basePath + '/' + BookKeeperConstants.UNDER_REPLICATION_LOCK;
         lostBookieRecoveryDelayZnode = basePath + '/' + BookKeeperConstants.LOSTBOOKIERECOVERYDELAY_NODE;
         checkAllLedgersCtimeZnode = basePath + '/' + BookKeeperConstants.CHECK_ALL_LEDGERS_CTIME;
+        placementPolicyCheckCtimeZnode = basePath + '/' + BookKeeperConstants.PLACEMENT_POLICY_CHECK_CTIME;
         idExtractionPattern = Pattern.compile("urL(\\d+)$");
         this.zkc = zkc;
         this.subTreeCache = new SubTreeCache(new SubTreeCache.TreeProvider() {
@@ -933,4 +936,51 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
             throw new ReplicationException.UnavailableException("Error while parsing ZK protobuf binary data", ipbe);
         }
     }
+
+    @Override
+    public void setPlacementPolicyCheckCTime(long placementPolicyCheckCTime) throws UnavailableException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("setPlacementPolicyCheckCTime");
+        }
+        try {
+            List<ACL> zkAcls = ZkUtils.getACLs(conf);
+            PlacementPolicyCheckFormat.Builder builder = PlacementPolicyCheckFormat.newBuilder();
+            builder.setPlacementPolicyCheckCTime(placementPolicyCheckCTime);
+            byte[] placementPolicyCheckFormatByteArray = builder.build().toByteArray();
+            if (zkc.exists(placementPolicyCheckCtimeZnode, false) != null) {
+                zkc.setData(placementPolicyCheckCtimeZnode, placementPolicyCheckFormatByteArray, -1);
+            } else {
+                zkc.create(placementPolicyCheckCtimeZnode, placementPolicyCheckFormatByteArray, zkAcls,
+                        CreateMode.PERSISTENT);
+            }
+        } catch (KeeperException ke) {
+            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
+        }
+    }
+
+    @Override
+    public long getPlacementPolicyCheckCTime() throws UnavailableException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("getPlacementPolicyCheckCTime");
+        }
+        try {
+            byte[] data = zkc.getData(placementPolicyCheckCtimeZnode, false, null);
+            PlacementPolicyCheckFormat placementPolicyCheckFormat = PlacementPolicyCheckFormat.parseFrom(data);
+            return placementPolicyCheckFormat.hasPlacementPolicyCheckCTime()
+                    ? placementPolicyCheckFormat.getPlacementPolicyCheckCTime() : -1;
+        } catch (KeeperException.NoNodeException ne) {
+            LOG.warn("placementPolicyCheckCtimeZnode is not yet available");
+            return -1;
+        } catch (KeeperException ke) {
+            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
+        } catch (InvalidProtocolBufferException ipbe) {
+            throw new ReplicationException.UnavailableException("Error while parsing ZK protobuf binary data", ipbe);
+        }
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index 870cea3..dd791be 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -20,6 +20,7 @@
  */
 package org.apache.bookkeeper.replication;
 
+
 import static org.apache.bookkeeper.replication.ReplicationStats.AUDITOR_SCOPE;
 import static org.apache.bookkeeper.replication.ReplicationStats.AUDIT_BOOKIES_TIME;
 import static org.apache.bookkeeper.replication.ReplicationStats.BOOKIE_TO_LEDGERS_MAP_CREATION_TIME;
@@ -30,6 +31,9 @@ import static org.apache.bookkeeper.replication.ReplicationStats.NUM_DELAYED_BOO
 import static org.apache.bookkeeper.replication.ReplicationStats.NUM_FRAGMENTS_PER_LEDGER;
 import static org.apache.bookkeeper.replication.ReplicationStats.NUM_LEDGERS_CHECKED;
 import static org.apache.bookkeeper.replication.ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS;
+import static org.apache.bookkeeper.replication.ReplicationStats.
+                                    PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER;
+import static org.apache.bookkeeper.replication.ReplicationStats.PLACEMENT_POLICY_CHECK_TIME;
 import static org.apache.bookkeeper.replication.ReplicationStats.URL_PUBLISH_TIME_FOR_LOST_BOOKIE;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -37,6 +41,7 @@ import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.SettableFuture;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -44,12 +49,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -57,6 +64,7 @@ import org.apache.bookkeeper.client.BookKeeperAdmin;
 import org.apache.bookkeeper.client.LedgerChecker;
 import org.apache.bookkeeper.client.LedgerFragment;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -132,6 +140,11 @@ public class Auditor implements AutoCloseable {
     )
     private final OpStatsLogger checkAllLedgersTime;
     @StatsDoc(
+            name = PLACEMENT_POLICY_CHECK_TIME,
+            help = "the latency distribution of placementPolicy check"
+        )
+    private final OpStatsLogger placementPolicyCheckTime;
+    @StatsDoc(
         name = AUDIT_BOOKIES_TIME,
         help = "the latency distribution of auditing all the bookies"
     )
@@ -161,6 +174,12 @@ public class Auditor implements AutoCloseable {
         help = "the number of delayed-bookie-audits cancelled"
     )
     private final Counter numDelayedBookieAuditsCancelled;
+    @StatsDoc(
+        name = PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER,
+        help = "total number of "
+            + "ledgers failed to adhere to EnsemblePlacementPolicy found in PLACEMENT POLICY check"
+    )
+    private final Counter placementPolicyCheckEnsembleNotAdheringToPlacementPolicy;
 
     static BookKeeper createBookKeeperClient(ServerConfiguration conf) throws InterruptedException, IOException {
         return createBookKeeperClient(conf, NullStatsLogger.INSTANCE);
@@ -218,6 +237,7 @@ public class Auditor implements AutoCloseable {
         bookieToLedgersMapCreationTime = this.statsLogger
                 .getOpStatsLogger(ReplicationStats.BOOKIE_TO_LEDGERS_MAP_CREATION_TIME);
         checkAllLedgersTime = this.statsLogger.getOpStatsLogger(ReplicationStats.CHECK_ALL_LEDGERS_TIME);
+        placementPolicyCheckTime = this.statsLogger.getOpStatsLogger(ReplicationStats.PLACEMENT_POLICY_CHECK_TIME);
         auditBookiesTime = this.statsLogger.getOpStatsLogger(ReplicationStats.AUDIT_BOOKIES_TIME);
         numLedgersChecked = this.statsLogger.getCounter(ReplicationStats.NUM_LEDGERS_CHECKED);
         numFragmentsPerLedger = statsLogger.getOpStatsLogger(ReplicationStats.NUM_FRAGMENTS_PER_LEDGER);
@@ -225,7 +245,8 @@ public class Auditor implements AutoCloseable {
         numBookieAuditsDelayed = this.statsLogger.getCounter(ReplicationStats.NUM_BOOKIE_AUDITS_DELAYED);
         numDelayedBookieAuditsCancelled = this.statsLogger
                 .getCounter(ReplicationStats.NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED);
-
+        placementPolicyCheckEnsembleNotAdheringToPlacementPolicy = statsLogger
+                .getCounter(ReplicationStats.PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER);
         this.bkc = bkc;
         this.ownBkc = ownBkc;
         initialize(conf, bkc);
@@ -466,80 +487,142 @@ public class Auditor implements AutoCloseable {
                 submitShutdownTask();
             }
 
-            long bookieCheckInterval = conf.getAuditorPeriodicBookieCheckInterval();
-            if (bookieCheckInterval == 0) {
-                LOG.info("Auditor periodic bookie checking disabled, running once check now anyhow");
-                executor.submit(bookieCheck);
-            } else {
-                LOG.info("Auditor periodic bookie checking enabled"
-                         + " 'auditorPeriodicBookieCheckInterval' {} seconds", bookieCheckInterval);
-                executor.scheduleAtFixedRate(bookieCheck, 0, bookieCheckInterval, TimeUnit.SECONDS);
-            }
+            scheduleBookieCheckTask();
+            scheduleCheckAllLedgersTask();
+            schedulePlacementPolicyCheckTask();
+        }
+    }
 
-            long interval = conf.getAuditorPeriodicCheckInterval();
+    private void scheduleBookieCheckTask() {
+        long bookieCheckInterval = conf.getAuditorPeriodicBookieCheckInterval();
+        if (bookieCheckInterval == 0) {
+            LOG.info("Auditor periodic bookie checking disabled, running once check now anyhow");
+            executor.submit(bookieCheck);
+        } else {
+            LOG.info("Auditor periodic bookie checking enabled" + " 'auditorPeriodicBookieCheckInterval' {} seconds",
+                    bookieCheckInterval);
+            executor.scheduleAtFixedRate(bookieCheck, 0, bookieCheckInterval, TimeUnit.SECONDS);
+        }
+    }
 
-            if (interval > 0) {
-                LOG.info("Auditor periodic ledger checking enabled" + " 'auditorPeriodicCheckInterval' {} seconds",
-                        interval);
+    private void scheduleCheckAllLedgersTask(){
+        long interval = conf.getAuditorPeriodicCheckInterval();
 
-                long checkAllLedgersLastExecutedCTime;
-                long durationSinceLastExecutionInSecs;
-                long initialDelay;
-                try {
-                    checkAllLedgersLastExecutedCTime = ledgerUnderreplicationManager.getCheckAllLedgersCTime();
-                } catch (UnavailableException ue) {
-                    LOG.error("Got UnavailableException while trying to get checkAllLedgersCTime", ue);
-                    checkAllLedgersLastExecutedCTime = -1;
-                }
-                if (checkAllLedgersLastExecutedCTime == -1) {
-                    durationSinceLastExecutionInSecs = -1;
-                    initialDelay = 0;
-                } else {
-                    durationSinceLastExecutionInSecs = (System.currentTimeMillis() - checkAllLedgersLastExecutedCTime)
-                            / 1000;
-                    if (durationSinceLastExecutionInSecs < 0) {
-                        // this can happen if there is no strict time ordering
-                        durationSinceLastExecutionInSecs = 0;
-                    }
-                    initialDelay = durationSinceLastExecutionInSecs > interval ? 0
-                            : (interval - durationSinceLastExecutionInSecs);
+        if (interval > 0) {
+            LOG.info("Auditor periodic ledger checking enabled" + " 'auditorPeriodicCheckInterval' {} seconds",
+                    interval);
+
+            long checkAllLedgersLastExecutedCTime;
+            long durationSinceLastExecutionInSecs;
+            long initialDelay;
+            try {
+                checkAllLedgersLastExecutedCTime = ledgerUnderreplicationManager.getCheckAllLedgersCTime();
+            } catch (UnavailableException ue) {
+                LOG.error("Got UnavailableException while trying to get checkAllLedgersCTime", ue);
+                checkAllLedgersLastExecutedCTime = -1;
+            }
+            if (checkAllLedgersLastExecutedCTime == -1) {
+                durationSinceLastExecutionInSecs = -1;
+                initialDelay = 0;
+            } else {
+                durationSinceLastExecutionInSecs = (System.currentTimeMillis() - checkAllLedgersLastExecutedCTime)
+                        / 1000;
+                if (durationSinceLastExecutionInSecs < 0) {
+                    // this can happen if there is no strict time ordering
+                    durationSinceLastExecutionInSecs = 0;
                 }
-                LOG.info(
-                        "checkAllLedgers scheduling info.  checkAllLedgersLastExecutedCTime: {} "
-                                + "durationSinceLastExecutionInSecs: {} initialDelay: {} interval: {}",
-                        checkAllLedgersLastExecutedCTime, durationSinceLastExecutionInSecs, initialDelay, interval);
-
-                executor.scheduleAtFixedRate(new Runnable() {
-                    public void run() {
-                        try {
-                            if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
-                                LOG.info("Ledger replication disabled, skipping checkAllLedgers");
-                                return;
-                            }
+                initialDelay = durationSinceLastExecutionInSecs > interval ? 0
+                        : (interval - durationSinceLastExecutionInSecs);
+            }
+            LOG.info(
+                    "checkAllLedgers scheduling info.  checkAllLedgersLastExecutedCTime: {} "
+                            + "durationSinceLastExecutionInSecs: {} initialDelay: {} interval: {}",
+                    checkAllLedgersLastExecutedCTime, durationSinceLastExecutionInSecs, initialDelay, interval);
 
-                            Stopwatch stopwatch = Stopwatch.createStarted();
-                            LOG.info("Starting checkAllLedgers");
-                            checkAllLedgers();
-                            long checkAllLedgersDuration = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
-                            LOG.info("Completed checkAllLedgers in {} milliSeconds", checkAllLedgersDuration);
-                            checkAllLedgersTime.registerSuccessfulEvent(checkAllLedgersDuration, TimeUnit.MILLISECONDS);
-                        } catch (KeeperException ke) {
-                            LOG.error("Exception while running periodic check", ke);
-                        } catch (InterruptedException ie) {
-                            Thread.currentThread().interrupt();
-                            LOG.error("Interrupted while running periodic check", ie);
-                        } catch (BKException bke) {
-                            LOG.error("Exception running periodic check", bke);
-                        } catch (IOException ioe) {
-                            LOG.error("I/O exception running periodic check", ioe);
-                        } catch (ReplicationException.UnavailableException ue) {
-                            LOG.error("Underreplication manager unavailable running periodic check", ue);
+            executor.scheduleAtFixedRate(new Runnable() {
+                public void run() {
+                    try {
+                        if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
+                            LOG.info("Ledger replication disabled, skipping checkAllLedgers");
+                            return;
                         }
+
+                        Stopwatch stopwatch = Stopwatch.createStarted();
+                        LOG.info("Starting checkAllLedgers");
+                        checkAllLedgers();
+                        long checkAllLedgersDuration = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
+                        LOG.info("Completed checkAllLedgers in {} milliSeconds", checkAllLedgersDuration);
+                        checkAllLedgersTime.registerSuccessfulEvent(checkAllLedgersDuration, TimeUnit.MILLISECONDS);
+                    } catch (KeeperException ke) {
+                        LOG.error("Exception while running periodic check", ke);
+                    } catch (InterruptedException ie) {
+                        Thread.currentThread().interrupt();
+                        LOG.error("Interrupted while running periodic check", ie);
+                    } catch (BKException bke) {
+                        LOG.error("Exception running periodic check", bke);
+                    } catch (IOException ioe) {
+                        LOG.error("I/O exception running periodic check", ioe);
+                    } catch (ReplicationException.UnavailableException ue) {
+                        LOG.error("Underreplication manager unavailable running periodic check", ue);
                     }
-                    }, initialDelay, interval, TimeUnit.SECONDS);
+                }
+                }, initialDelay, interval, TimeUnit.SECONDS);
+        } else {
+            LOG.info("Periodic checking disabled");
+        }
+    }
+
+    private void schedulePlacementPolicyCheckTask(){
+        long interval = conf.getAuditorPeriodicPlacementPolicyCheckInterval();
+
+        if (interval > 0) {
+            LOG.info("Auditor periodic placement policy check enabled"
+                    + " 'auditorPeriodicPlacementPolicyCheckInterval' {} seconds", interval);
+
+            long placementPolicyCheckLastExecutedCTime;
+            long durationSinceLastExecutionInSecs;
+            long initialDelay;
+            try {
+                placementPolicyCheckLastExecutedCTime = ledgerUnderreplicationManager.getPlacementPolicyCheckCTime();
+            } catch (UnavailableException ue) {
+                LOG.error("Got UnavailableException while trying to get placementPolicyCheckCTime", ue);
+                placementPolicyCheckLastExecutedCTime = -1;
+            }
+            if (placementPolicyCheckLastExecutedCTime == -1) {
+                durationSinceLastExecutionInSecs = -1;
+                initialDelay = 0;
             } else {
-                LOG.info("Periodic checking disabled");
+                durationSinceLastExecutionInSecs = (System.currentTimeMillis() - placementPolicyCheckLastExecutedCTime)
+                        / 1000;
+                if (durationSinceLastExecutionInSecs < 0) {
+                    // this can happen if there is no strict time ordering
+                    durationSinceLastExecutionInSecs = 0;
+                }
+                initialDelay = durationSinceLastExecutionInSecs > interval ? 0
+                        : (interval - durationSinceLastExecutionInSecs);
             }
+            LOG.info(
+                    "placementPolicyCheck scheduling info.  placementPolicyCheckLastExecutedCTime: {} "
+                            + "durationSinceLastExecutionInSecs: {} initialDelay: {} interval: {}",
+                    placementPolicyCheckLastExecutedCTime, durationSinceLastExecutionInSecs, initialDelay, interval);
+
+            executor.scheduleAtFixedRate(new Runnable() {
+                public void run() {
+                    try {
+                        Stopwatch stopwatch = Stopwatch.createStarted();
+                        LOG.info("Starting PlacementPolicyCheck");
+                        placementPolicyCheck();
+                        long placementPolicyCheckDuration = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
+                        LOG.info("Completed placementPolicyCheck in {} milliSeconds", placementPolicyCheckDuration);
+                        placementPolicyCheckTime.registerSuccessfulEvent(placementPolicyCheckDuration,
+                                TimeUnit.MILLISECONDS);
+                    } catch (BKAuditException e) {
+                        LOG.error("BKAuditException running periodic placementPolicy check", e);
+                    }
+                }
+            }, initialDelay, interval, TimeUnit.SECONDS);
+        } else {
+            LOG.info("Periodic placementPolicy check disabled");
         }
     }
 
@@ -743,11 +826,11 @@ public class Auditor implements AutoCloseable {
      * be run very often.
      */
     void checkAllLedgers() throws BKException, IOException, InterruptedException, KeeperException {
-        final BookKeeper client = createBookKeeperClient(conf);
-        final BookKeeperAdmin admin = new BookKeeperAdmin(client, statsLogger);
+        final BookKeeper localClient = createBookKeeperClient(conf);
+        final BookKeeperAdmin localAdmin = new BookKeeperAdmin(localClient, statsLogger);
 
         try {
-            final LedgerChecker checker = new LedgerChecker(client);
+            final LedgerChecker checker = new LedgerChecker(localClient);
 
             final CompletableFuture<Void> processFuture = new CompletableFuture<>();
 
@@ -764,7 +847,7 @@ public class Auditor implements AutoCloseable {
                     return;
                 }
 
-                admin.asyncOpenLedgerNoRecovery(ledgerId, (rc, lh, ctx) -> {
+                localAdmin.asyncOpenLedgerNoRecovery(ledgerId, (rc, lh, ctx) -> {
                     if (Code.OK == rc) {
                         checker.checkLedger(lh,
                                 // the ledger handle will be closed after checkLedger is done.
@@ -803,8 +886,84 @@ public class Auditor implements AutoCloseable {
                 LOG.error("Got exception while trying to set checkAllLedgersCTime", ue);
             }
         } finally {
-            admin.close();
-            client.close();
+            localAdmin.close();
+            localClient.close();
+        }
+    }
+
+    /**
+     * Validates the ensemble placement of every closed ledger: each segment's ensemble is passed to
+     * admin.isEnsembleAdheringToPlacementPolicy, and a ledger with at least one failing segment bumps
+     * the not-adhering counter once. Blocks until all ledgers are processed, then records the check
+     * time; throws BKAuditException if interrupted or if the scan does not report Code.OK.
+     */
+    void placementPolicyCheck() throws BKAuditException {
+        final CountDownLatch placementPolicyCheckLatch = new CountDownLatch(1);
+        Processor<Long> ledgerProcessor = new Processor<Long>() {
+            @Override
+            public void process(Long ledgerId, AsyncCallback.VoidCallback iterCallback) {
+                ledgerManager.readLedgerMetadata(ledgerId).whenComplete((metadataVer, exception) -> {
+                    if (exception == null) {
+                        LedgerMetadata metadata = metadataVer.getValue();
+                        int writeQuorumSize = metadata.getWriteQuorumSize();
+                        int ackQuorumSize = metadata.getAckQuorumSize();
+                        if (metadata.isClosed()) {
+                            boolean foundSegmentNotAdheringToPlacementPolicy = false;
+                            for (Map.Entry<Long, ? extends List<BookieSocketAddress>> ensemble : metadata
+                                    .getAllEnsembles().entrySet()) {
+                                long startEntryIdOfSegment = ensemble.getKey();
+                                List<BookieSocketAddress> ensembleOfSegment = ensemble.getValue();
+                                boolean segmentAdheringToPlacementPolicy = admin.isEnsembleAdheringToPlacementPolicy(
+                                        ensembleOfSegment, writeQuorumSize, ackQuorumSize);
+                                if (!segmentAdheringToPlacementPolicy) {
+                                    foundSegmentNotAdheringToPlacementPolicy = true;
+                                    LOG.warn(
+                                            "For ledger: {}, Segment starting at entry: {}, with ensemble: {} having "
+                                                    + "writeQuorumSize: {} and ackQuorumSize: {} is not adhering to "
+                                                    + "EnsemblePlacementPolicy",
+                                            ledgerId, startEntryIdOfSegment, ensembleOfSegment, writeQuorumSize,
+                                            ackQuorumSize);
+                                }
+                            }
+                            if (foundSegmentNotAdheringToPlacementPolicy) {
+                                placementPolicyCheckEnsembleNotAdheringToPlacementPolicy.inc();
+                            }
+                        } else {
+                            LOG.debug("Ledger: {} is not yet closed, so skipping the placementPolicy "
+                                    + "check analysis for now", ledgerId);
+                        }
+                        iterCallback.processResult(BKException.Code.OK, null, null);
+                    } else if (BKException
+                            .getExceptionCode(exception) == BKException.Code.NoSuchLedgerExistsException) {
+                        LOG.debug("Ignoring placementPolicy check of already deleted ledger {}", ledgerId);
+                        iterCallback.processResult(BKException.Code.OK, null, null);
+                    } else {
+                        LOG.warn("Unable to read the ledger: {} information", ledgerId, exception);
+                        iterCallback.processResult(BKException.getExceptionCode(exception), null, null);
+                    }
+                });
+            }
+        };
+        // Final rc of the scan: added by the callback below, read only after the latch has opened
+        final List<Integer> resultCode = new ArrayList<Integer>(1);
+        ledgerManager.asyncProcessLedgers(ledgerProcessor, (rc, s, obj) -> {
+            resultCode.add(rc);
+            placementPolicyCheckLatch.countDown();
+        }, null, BKException.Code.OK, BKException.Code.ReadException);
+        try {
+            placementPolicyCheckLatch.await();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new BKAuditException("Exception while doing placementPolicy check", e);
+        }
+        if (!resultCode.contains(BKException.Code.OK)) {
+            throw new BKAuditException("Exception while doing placementPolicy check",
+                    BKException.create(resultCode.get(0)));
+        }
+        try {
+            ledgerUnderreplicationManager.setPlacementPolicyCheckCTime(System.currentTimeMillis());
+        } catch (UnavailableException ue) {
+            LOG.error("Got exception while trying to set PlacementPolicyCheckCTime", ue);
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
index eac30ec..e9b8999 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
@@ -33,12 +33,15 @@ public interface ReplicationStats {
     String URL_PUBLISH_TIME_FOR_LOST_BOOKIE = "URL_PUBLISH_TIME_FOR_LOST_BOOKIE";
     String BOOKIE_TO_LEDGERS_MAP_CREATION_TIME = "BOOKIE_TO_LEDGERS_MAP_CREATION_TIME";
     String CHECK_ALL_LEDGERS_TIME = "CHECK_ALL_LEDGERS_TIME";
+    String PLACEMENT_POLICY_CHECK_TIME = "PLACEMENT_POLICY_CHECK_TIME";
     String AUDIT_BOOKIES_TIME = "AUDIT_BOOKIES_TIME";
     String NUM_FRAGMENTS_PER_LEDGER = "NUM_FRAGMENTS_PER_LEDGER";
     String NUM_BOOKIES_PER_LEDGER = "NUM_BOOKIES_PER_LEDGER";
     String NUM_LEDGERS_CHECKED = "NUM_LEDGERS_CHECKED";
     String NUM_BOOKIE_AUDITS_DELAYED = "NUM_BOOKIE_AUDITS_DELAYED";
     String NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED = "NUM_DELAYED_BOOKIE_AUDITS_CANCELLED";
+    String PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER =
+            "PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER";
 
     String REPLICATION_WORKER_SCOPE = "replication_worker";
     String REREPLICATE_OP = "rereplicate";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
index 8cfe942..89b0d0c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
@@ -43,6 +43,7 @@ public class BookKeeperConstants {
     public static final String DISABLE_NODE = "disable";
     public static final String LOSTBOOKIERECOVERYDELAY_NODE = "lostBookieRecoveryDelay";
     public static final String CHECK_ALL_LEDGERS_CTIME = "checkallledgersctime";
+    public static final String PLACEMENT_POLICY_CHECK_CTIME = "placementpolicycheckctime";
     public static final String DEFAULT_ZK_LEDGERS_ROOT_PATH = "/ledgers";
     public static final String LAYOUT_ZNODE = "LAYOUT";
     public static final String INSTANCEID = "INSTANCEID";
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
index fd97da3..dc343eb 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
@@ -62,6 +62,7 @@ import org.apache.bookkeeper.meta.MetadataBookieDriver;
 import org.apache.bookkeeper.meta.MetadataDrivers;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
+import org.apache.bookkeeper.replication.ReplicationException.BKAuditException;
 import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -397,6 +398,8 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
         TestOpStatsLogger checkAllLedgersStatsLogger = (TestOpStatsLogger) statsLogger
                 .getOpStatsLogger(ReplicationStats.CHECK_ALL_LEDGERS_TIME);
         servConf.setAuditorPeriodicCheckInterval(auditorPeriodicCheckInterval);
+        servConf.setAuditorPeriodicPlacementPolicyCheckInterval(0);
+        servConf.setAuditorPeriodicBookieCheckInterval(0);
         final TestAuditor auditor = new TestAuditor(Bookie.getBookieAddress(servConf).toString(), servConf, bkc, false,
                 statsLogger);
         CountDownLatch latch = auditor.getLatch();
@@ -454,7 +457,100 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
         auditor.close();
     }
 
-    class TestAuditor extends Auditor {
+    /** Checks the initial delay chosen for placementPolicyCheck for three ages of the last recorded run. */
+    @Test
+    public void testInitialDelayOfPlacementPolicyCheck() throws Exception {
+        for (AuditorElector e : auditorElectors.values()) {
+            e.shutdown();
+        }
+
+        final int numLedgers = 10;
+        for (int i = 0; i < numLedgers; i++) {
+            LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes());
+            for (int j = 0; j < 2; j++) {
+                lh.addEntry("testdata".getBytes());
+            }
+            lh.close();
+        }
+
+        LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
+        LedgerUnderreplicationManager urm = mFactory.newLedgerUnderreplicationManager();
+
+        ServerConfiguration servConf = new ServerConfiguration(bsConfs.get(0));
+        // last run: never (-1), less than one interval ago (999s), more than one interval ago (1001s)
+        validateInitialDelayOfPlacementPolicyCheck(urm, -1, 1000, servConf, bkc);
+        validateInitialDelayOfPlacementPolicyCheck(urm, 999, 1000, servConf, bkc);
+        validateInitialDelayOfPlacementPolicyCheck(urm, 1001, 1000, servConf, bkc);
+    }
+
+    void validateInitialDelayOfPlacementPolicyCheck(LedgerUnderreplicationManager urm, long timeSinceLastExecutedInSecs,
+            long auditorPeriodicPlacementPolicyCheckInterval, ServerConfiguration servConf, BookKeeper bkc)
+            throws UnavailableException, UnknownHostException, InterruptedException {
+        TestStatsProvider statsProvider = new TestStatsProvider();
+        TestStatsLogger statsLogger = statsProvider.getStatsLogger(AUDITOR_SCOPE);
+        TestOpStatsLogger placementPolicyCheckStatsLogger = (TestOpStatsLogger) statsLogger
+                .getOpStatsLogger(ReplicationStats.PLACEMENT_POLICY_CHECK_TIME);
+        servConf.setAuditorPeriodicPlacementPolicyCheckInterval(auditorPeriodicPlacementPolicyCheckInterval);
+        servConf.setAuditorPeriodicCheckInterval(0);
+        servConf.setAuditorPeriodicBookieCheckInterval(0);
+        final TestAuditor auditor = new TestAuditor(Bookie.getBookieAddress(servConf).toString(), servConf, bkc, false,
+                statsLogger);
+        CountDownLatch latch = auditor.getLatch();
+        assertEquals("PLACEMENT_POLICY_CHECK_TIME SuccessCount", 0, placementPolicyCheckStatsLogger.getSuccessCount());
+        long curTimeBeforeStart = System.currentTimeMillis();
+        long placementPolicyCheckCTime = -1;
+        long initialDelayInMsecs = -1;
+        long nextExpectedPlacementPolicyCheckExecutionTime = -1;
+        long bufferTimeInMsecs = 20000L;
+        if (timeSinceLastExecutedInSecs == -1) {
+            /*
+             * if we are setting placementPolicyCheckCTime to -1, it means that
+             * placementPolicyCheck hasn't run before. So initialDelay for
+             * placementPolicyCheck should be 0.
+             */
+            placementPolicyCheckCTime = -1;
+            initialDelayInMsecs = 0;
+        } else {
+            placementPolicyCheckCTime = curTimeBeforeStart - timeSinceLastExecutedInSecs * 1000L;
+            initialDelayInMsecs = timeSinceLastExecutedInSecs > auditorPeriodicPlacementPolicyCheckInterval ? 0
+                    : (auditorPeriodicPlacementPolicyCheckInterval - timeSinceLastExecutedInSecs) * 1000L;
+        }
+        /*
+         * the next placementPolicyCheck should not happen before
+         * nextExpectedPlacementPolicyCheckExecutionTime.
+         */
+        nextExpectedPlacementPolicyCheckExecutionTime = curTimeBeforeStart + initialDelayInMsecs;
+
+        urm.setPlacementPolicyCheckCTime(placementPolicyCheckCTime);
+        auditor.start();
+        /*
+         * since auditorPeriodicPlacementPolicyCheckInterval values are high (in the
+         * order of 100s of seconds), it is ok for bufferTimeInMsecs to be ~ 20 secs.
+         */
+        assertTrue("placementPolicyCheck should have executed with initialDelay " + initialDelayInMsecs,
+                latch.await(initialDelayInMsecs + bufferTimeInMsecs, TimeUnit.MILLISECONDS));
+        for (int i = 0; i < 20; i++) {
+            Thread.sleep(100);
+            if (placementPolicyCheckStatsLogger.getSuccessCount() >= 1) {
+                break;
+            }
+        }
+        assertEquals("PLACEMENT_POLICY_CHECK_TIME SuccessCount", 1, placementPolicyCheckStatsLogger.getSuccessCount());
+        long currentPlacementPolicyCheckCTime = urm.getPlacementPolicyCheckCTime();
+        assertTrue(
+                "currentPlacementPolicyCheckCTime: " + currentPlacementPolicyCheckCTime
+                        + " should be greater than nextExpectedPlacementPolicyCheckExecutionTime: "
+                        + nextExpectedPlacementPolicyCheckExecutionTime,
+                currentPlacementPolicyCheckCTime > nextExpectedPlacementPolicyCheckExecutionTime);
+        assertTrue(
+                "currentPlacementPolicyCheckCTime: " + currentPlacementPolicyCheckCTime
+                        + " should be lesser than nextExpectedPlacementPolicyCheckExecutionTime+bufferTimeInMsecs: "
+                        + (nextExpectedPlacementPolicyCheckExecutionTime + bufferTimeInMsecs),
+                currentPlacementPolicyCheckCTime < (nextExpectedPlacementPolicyCheckExecutionTime + bufferTimeInMsecs));
+        auditor.close();
+    }
+
+    static class TestAuditor extends Auditor {
 
         final AtomicReference<CountDownLatch> latchRef = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
 
@@ -463,11 +559,21 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
             super(bookieIdentifier, conf, bkc, ownBkc, statsLogger);
         }
 
+        public TestAuditor(final String bookieIdentifier, ServerConfiguration conf, StatsLogger statsLogger)
+                throws UnavailableException {
+            super(bookieIdentifier, conf, statsLogger);
+        }
+
         void checkAllLedgers() throws BKException, IOException, InterruptedException, KeeperException {
             super.checkAllLedgers();
             latchRef.get().countDown();
         }
 
+        void placementPolicyCheck() throws BKAuditException {
+            super.placementPolicyCheck();
+            latchRef.get().countDown();
+        }
+
         CountDownLatch getLatch() {
             return latchRef.get();
         }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
new file mode 100644
index 0000000..68b5df2
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
@@ -0,0 +1,418 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.replication;
+
+import static org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.REPP_DNS_RESOLVER_CLASS;
+import static org.apache.bookkeeper.replication.ReplicationStats.AUDITOR_SCOPE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.client.LedgerMetadataBuilder;
+import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.MetadataBookieDriver;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.meta.exceptions.MetadataException;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.replication.AuditorPeriodicCheckTest.TestAuditor;
+import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
+import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.test.TestStatsProvider;
+import org.apache.bookkeeper.test.TestStatsProvider.TestCounter;
+import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
+import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
+import org.apache.bookkeeper.util.StaticDNSResolver;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.zookeeper.KeeperException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests the logic of Auditor's PlacementPolicyCheck.
+ */
+public class AuditorPlacementPolicyCheckTest extends BookKeeperClusterTestCase {
+    private MetadataBookieDriver driver;
+
+    public AuditorPlacementPolicyCheckTest() {
+        super(1);
+        baseConf.setPageLimit(1); // to make it easy to push ledger out of cache
+    }
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        StaticDNSResolver.reset();
+        driver = MetadataDrivers.getBookieDriver(URI.create(bsConfs.get(0).getMetadataServiceUri()));
+        driver.initialize(bsConfs.get(0), () -> {
+        }, NullStatsLogger.INSTANCE);
+    }
+
+    @After
+    @Override
+    public void tearDown() throws Exception {
+        if (null != driver) {
+            driver.close();
+        }
+        super.tearDown();
+    }
+
+    @Test
+    public void testPlacementPolicyCheckWithBookiesFromDifferentRacks() throws Exception {
+        int numOfBookies = 5;
+        List<BookieSocketAddress> bookieAddresses = new ArrayList<BookieSocketAddress>();
+        BookieSocketAddress bookieAddress;
+        RegistrationManager regManager = driver.getRegistrationManager();
+
+        // all the numOfBookies (5) are going to be in different racks
+        for (int i = 0; i < numOfBookies; i++) {
+            bookieAddress = new BookieSocketAddress("98.98.98." + i, 2181);
+            StaticDNSResolver.addNodeToRack(bookieAddress.getHostName(), "/rack" + (i));
+            bookieAddresses.add(bookieAddress);
+            regManager.registerBookie(bookieAddress.toString(), false);
+        }
+
+        LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
+        LedgerManager lm = mFactory.newLedgerManager();
+        int ensembleSize = 5;
+        int writeQuorumSize = 4;
+        int ackQuorumSize = 2;
+        int minNumRacksPerWriteQuorumConfValue = 4;
+        Collections.shuffle(bookieAddresses);
+
+        // closed ledger with a single segment spanning all 5 bookies
+        LedgerMetadata initMeta = LedgerMetadataBuilder.create()
+                .withEnsembleSize(ensembleSize)
+                .withWriteQuorumSize(writeQuorumSize)
+                .withAckQuorumSize(ackQuorumSize)
+                .newEnsembleEntry(0L, bookieAddresses)
+                .withClosedState()
+                .withLastEntryId(100)
+                .withLength(10000)
+                .withDigestType(DigestType.DUMMY)
+                .withPassword(new byte[0])
+                .build();
+        lm.createLedgerMetadata(1L, initMeta).get();
+
+        Collections.shuffle(bookieAddresses);
+        ensembleSize = 4;
+        // closed ledger with multiple segments
+        initMeta = LedgerMetadataBuilder.create()
+                .withEnsembleSize(ensembleSize)
+                .withWriteQuorumSize(writeQuorumSize)
+                .withAckQuorumSize(ackQuorumSize)
+                .newEnsembleEntry(0L, bookieAddresses.subList(0, 4))
+                .newEnsembleEntry(20L, bookieAddresses.subList(1, 5))
+                .newEnsembleEntry(60L, bookieAddresses.subList(0, 4))
+                .withClosedState()
+                .withLastEntryId(100)
+                .withLength(10000)
+                .withDigestType(DigestType.DUMMY)
+                .withPassword(new byte[0])
+                .build();
+        lm.createLedgerMetadata(2L, initMeta).get();
+
+        Collections.shuffle(bookieAddresses);
+        // non-closed ledger (placementPolicyCheck skips ledgers that are not closed)
+        initMeta = LedgerMetadataBuilder.create()
+                .withEnsembleSize(ensembleSize)
+                .withWriteQuorumSize(writeQuorumSize)
+                .withAckQuorumSize(ackQuorumSize)
+                .newEnsembleEntry(0L, bookieAddresses.subList(0, 4))
+                .withDigestType(DigestType.DUMMY)
+                .withPassword(new byte[0])
+                .build();
+        lm.createLedgerMetadata(3L, initMeta).get();
+
+        Collections.shuffle(bookieAddresses);
+        // non-closed ledger with multiple segments
+        initMeta = LedgerMetadataBuilder.create()
+                .withEnsembleSize(ensembleSize)
+                .withWriteQuorumSize(writeQuorumSize)
+                .withAckQuorumSize(ackQuorumSize)
+                .newEnsembleEntry(0L, bookieAddresses.subList(0, 4))
+                .newEnsembleEntry(20L, bookieAddresses.subList(1, 5))
+                .newEnsembleEntry(60L, bookieAddresses.subList(0, 4))
+                .withDigestType(DigestType.DUMMY)
+                .withPassword(new byte[0])
+                .build();
+        lm.createLedgerMetadata(4L, initMeta).get();
+
+        ServerConfiguration servConf = new ServerConfiguration(bsConfs.get(0));
+        servConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorumConfValue);
+        setServerConfigProperties(servConf);
+        MutableObject<Auditor> auditorRef = new MutableObject<Auditor>();
+        try {
+            TestStatsLogger statsLogger = startAuditorAndWaitForPlacementPolicyCheck(servConf, auditorRef);
+            TestCounter placementPolicyCheckEnsembleNotAdheringToPlacementPolicy = (TestCounter) statsLogger.getCounter(
+                    ReplicationStats.PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER);
+            /*
+             * since all of the bookies are in different racks, there shouldn't be any ledger not adhering
+             * to placement policy.
+             */
+            assertEquals("PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER SuccessCount", 0L,
+                    placementPolicyCheckEnsembleNotAdheringToPlacementPolicy.get().longValue());
+        } finally {
+            Auditor auditor = auditorRef.getValue();
+            if (auditor != null) {
+                auditor.close();
+            }
+        }
+    }
+
+    @Test
+    public void testPlacementPolicyCheckWithLedgersNotAdheringToPlacementPolicy() throws Exception {
+        int numOfBookies = 5;
+        int numOfLedgersNotAdheringToPlacementPolicy = 0;
+        List<BookieSocketAddress> bookieAddresses = new ArrayList<BookieSocketAddress>();
+        RegistrationManager regManager = driver.getRegistrationManager();
+
+        for (int i = 0; i < numOfBookies; i++) {
+            BookieSocketAddress bookieAddress = new BookieSocketAddress("98.98.98." + i, 2181);
+            bookieAddresses.add(bookieAddress);
+            regManager.registerBookie(bookieAddress.toString(), false);
+        }
+
+        // the 5 bookies are spread over only three racks
+        StaticDNSResolver.addNodeToRack(bookieAddresses.get(0).getHostName(), "/rack1");
+        StaticDNSResolver.addNodeToRack(bookieAddresses.get(1).getHostName(), "/rack2");
+        StaticDNSResolver.addNodeToRack(bookieAddresses.get(2).getHostName(), "/rack3");
+        StaticDNSResolver.addNodeToRack(bookieAddresses.get(3).getHostName(), "/rack1");
+        StaticDNSResolver.addNodeToRack(bookieAddresses.get(4).getHostName(), "/rack2");
+
+        LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
+        LedgerManager lm = mFactory.newLedgerManager();
+        int ensembleSize = 5;
+        int writeQuorumSize = 3;
+        int ackQuorumSize = 2;
+        int minNumRacksPerWriteQuorumConfValue = 3;
+
+        /*
+         * this closed ledger is expected to be reported as not adhering to placement policy:
+         * its 5 bookies are spread over only 3 racks (minNumRacksPerWriteQuorum is 3).
+         */
+        LedgerMetadata initMeta = LedgerMetadataBuilder.create()
+                .withEnsembleSize(ensembleSize)
+                .withWriteQuorumSize(writeQuorumSize)
+                .withAckQuorumSize(ackQuorumSize)
+                .newEnsembleEntry(0L, bookieAddresses)
+                .withClosedState()
+                .withLastEntryId(100)
+                .withLength(10000)
+                .withDigestType(DigestType.DUMMY)
+                .withPassword(new byte[0])
+                .build();
+        lm.createLedgerMetadata(1L, initMeta).get();
+        numOfLedgersNotAdheringToPlacementPolicy++;
+
+        /*
+         * this is non-closed ledger, so it shouldn't count as ledger not
+         * adhering to placement policy
+         */
+        initMeta = LedgerMetadataBuilder.create()
+                .withEnsembleSize(ensembleSize)
+                .withWriteQuorumSize(writeQuorumSize)
+                .withAckQuorumSize(ackQuorumSize)
+                .newEnsembleEntry(0L, bookieAddresses)
+                .withDigestType(DigestType.DUMMY)
+                .withPassword(new byte[0])
+                .build();
+        lm.createLedgerMetadata(2L, initMeta).get();
+
+        ServerConfiguration servConf = new ServerConfiguration(bsConfs.get(0));
+        servConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorumConfValue);
+        setServerConfigProperties(servConf);
+        MutableObject<Auditor> auditorRef = new MutableObject<Auditor>();
+        try {
+            TestStatsLogger statsLogger = startAuditorAndWaitForPlacementPolicyCheck(servConf, auditorRef);
+            TestCounter placementPolicyCheckEnsembleNotAdheringToPlacementPolicy = (TestCounter) statsLogger.getCounter(
+                    ReplicationStats.PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER);
+            assertEquals("PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER SuccessCount",
+                    (long) numOfLedgersNotAdheringToPlacementPolicy,
+                    placementPolicyCheckEnsembleNotAdheringToPlacementPolicy.get().longValue());
+        } finally {
+            Auditor auditor = auditorRef.getValue();
+            if (auditor != null) {
+                auditor.close();
+            }
+        }
+    }
+
+    @Test
+    public void testPlacementPolicyCheckWithLedgersNotAdheringToPolicyWithMultipleSegments() throws Exception {
+        int numOfBookies = 7;
+        int numOfLedgersNotAdheringToPlacementPolicy = 0;
+        List<BookieSocketAddress> bookieAddresses = new ArrayList<BookieSocketAddress>();
+        RegistrationManager regManager = driver.getRegistrationManager();
+
+        for (int i = 0; i < numOfBookies; i++) {
+            BookieSocketAddress bookieAddress = new BookieSocketAddress("98.98.98." + i, 2181);
+            bookieAddresses.add(bookieAddress);
+            regManager.registerBookie(bookieAddress.toString(), false);
+        }
+
+        // seven bookies spread over four racks (/rack1 to /rack4)
+        StaticDNSResolver.addNodeToRack(bookieAddresses.get(0).getHostName(), "/rack1");
+        StaticDNSResolver.addNodeToRack(bookieAddresses.get(1).getHostName(), "/rack2");
+        StaticDNSResolver.addNodeToRack(bookieAddresses.get(2).getHostName(), "/rack3");
+        StaticDNSResolver.addNodeToRack(bookieAddresses.get(3).getHostName(), "/rack4");
+        StaticDNSResolver.addNodeToRack(bookieAddresses.get(4).getHostName(), "/rack1");
+        StaticDNSResolver.addNodeToRack(bookieAddresses.get(5).getHostName(), "/rack2");
+        StaticDNSResolver.addNodeToRack(bookieAddresses.get(6).getHostName(), "/rack3");
+
+        LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
+        LedgerManager lm = mFactory.newLedgerManager();
+        int ensembleSize = 5;
+        int writeQuorumSize = 5;
+        int ackQuorumSize = 2;
+        int minNumRacksPerWriteQuorumConfValue = 4;
+
+        /*
+         * for this closed ledger, each write quorum (writeQuorumSize 5) spans
+         * at least minNumRacksPerWriteQuorumConfValue (4) racks. So it won't be
+         * counted as a ledger not adhering to placement policy.
+         */
+        LedgerMetadata initMeta = LedgerMetadataBuilder.create()
+                .withEnsembleSize(ensembleSize)
+                .withWriteQuorumSize(writeQuorumSize)
+                .withAckQuorumSize(ackQuorumSize)
+                .newEnsembleEntry(0L, bookieAddresses.subList(0, 5))
+                .newEnsembleEntry(20L, bookieAddresses.subList(1, 6))
+                .withClosedState()
+                .withLastEntryId(100)
+                .withLength(10000)
+                .withDigestType(DigestType.DUMMY)
+                .withPassword(new byte[0])
+                .build();
+        lm.createLedgerMetadata(1L, initMeta).get();
+
+        /*
+         * For the second segment the bookies are from /rack1, /rack2 and /rack3,
+         * which is < minNumRacksPerWriteQuorumConfValue (4), so it is not
+         * adhering to the placement policy.
+         *
+         * The bookies of the third segment are also from /rack1, /rack2 and
+         * /rack3, which is < minNumRacksPerWriteQuorumConfValue (4), so it is
+         * not adhering to the placement policy either.
+         *
+         * Though multiple segments are not adhering to the placement policy,
+         * the ledger should be counted only once.
+         */
+        initMeta = LedgerMetadataBuilder.create()
+                .withEnsembleSize(ensembleSize)
+                .withWriteQuorumSize(writeQuorumSize)
+                .withAckQuorumSize(ackQuorumSize)
+                .newEnsembleEntry(0L, bookieAddresses.subList(0, 5))
+                .newEnsembleEntry(20L,
+                        Arrays.asList(bookieAddresses.get(0), bookieAddresses.get(1), bookieAddresses.get(2),
+                                bookieAddresses.get(4), bookieAddresses.get(5)))
+                .newEnsembleEntry(40L,
+                        Arrays.asList(bookieAddresses.get(0), bookieAddresses.get(1), bookieAddresses.get(2),
+                                bookieAddresses.get(4), bookieAddresses.get(6)))
+                .withClosedState()
+                .withLastEntryId(100)
+                .withLength(10000)
+                .withDigestType(DigestType.DUMMY)
+                .withPassword(new byte[0])
+                .build();
+        lm.createLedgerMetadata(2L, initMeta).get();
+        numOfLedgersNotAdheringToPlacementPolicy++;
+
+        ServerConfiguration servConf = new ServerConfiguration(bsConfs.get(0));
+        servConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorumConfValue);
+        setServerConfigProperties(servConf);
+        MutableObject<Auditor> auditorRef = new MutableObject<Auditor>();
+        try {
+            TestStatsLogger statsLogger = startAuditorAndWaitForPlacementPolicyCheck(servConf, auditorRef);
+            TestCounter placementPolicyCheckEnsembleNotAdheringToPlacementPolicy = (TestCounter) statsLogger.getCounter(
+                    ReplicationStats.PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER);
+            assertEquals("PLACEMENT_POLICY_CHECK_ENSEMBLE_NOT_ADHERING_TO_PLACEMENT_POLICY_COUNTER SuccessCount",
+                    (long) numOfLedgersNotAdheringToPlacementPolicy,
+                    placementPolicyCheckEnsembleNotAdheringToPlacementPolicy.get().longValue());
+        } finally {
+            Auditor auditor = auditorRef.getValue();
+            if (auditor != null) {
+                auditor.close();
+            }
+        }
+    }
+
+    private void setServerConfigProperties(ServerConfiguration servConf) {
+        String placementPolicyClassName = RackawareEnsemblePlacementPolicy.class.getName();
+        servConf.setProperty(ClientConfiguration.ENSEMBLE_PLACEMENT_POLICY, placementPolicyClassName);
+        servConf.setProperty(REPP_DNS_RESOLVER_CLASS, StaticDNSResolver.class.getName()); // static rack mapping
+        servConf.setAuditorPeriodicPlacementPolicyCheckInterval(1000); // the only check given a non-zero interval
+        servConf.setAuditorPeriodicBookieCheckInterval(0); // presumably 0 disables this check - confirm
+        servConf.setAuditorPeriodicCheckInterval(0); // presumably 0 disables this check - confirm
+    }
+
+    /** Starts a TestAuditor, waits for its first placement policy check and returns the auditor-scope stats logger. */
+    private TestStatsLogger startAuditorAndWaitForPlacementPolicyCheck(ServerConfiguration servConf,
+            MutableObject<Auditor> auditorRef) throws MetadataException, CompatibilityException, KeeperException,
+            InterruptedException, UnavailableException, UnknownHostException {
+        TestStatsProvider statsProvider = new TestStatsProvider();
+        TestStatsLogger statsLogger = statsProvider.getStatsLogger(AUDITOR_SCOPE);
+        TestOpStatsLogger placementPolicyCheckStatsLogger = (TestOpStatsLogger) statsLogger
+                .getOpStatsLogger(ReplicationStats.PLACEMENT_POLICY_CHECK_TIME);
+        final TestAuditor auditor = new TestAuditor(Bookie.getBookieAddress(servConf).toString(), servConf,
+                statsLogger);
+        auditorRef.setValue(auditor); // published before start() so the caller can close the auditor on failure
+        CountDownLatch latch = auditor.getLatch();
+        assertEquals("PLACEMENT_POLICY_CHECK_TIME SuccessCount", 0, placementPolicyCheckStatsLogger.getSuccessCount());
+        LedgerUnderreplicationManager urm = driver.getLedgerManagerFactory().newLedgerUnderreplicationManager();
+        try {
+            // with placementPolicyCheckCTime set to -1, placementPolicyCheck should be scheduled with no initial delay
+            urm.setPlacementPolicyCheckCTime(-1);
+        } finally {
+            urm.close(); // the manager is only needed for the ctime reset above, so release it right away
+        }
+        auditor.start();
+        assertTrue("placementPolicyCheck should have executed", latch.await(20, TimeUnit.SECONDS));
+        for (int i = 0; i < 20; i++) { // poll up to 20 x 100ms for the success count to show up
+            Thread.sleep(100);
+            if (placementPolicyCheckStatsLogger.getSuccessCount() >= 1) {
+                break;
+            }
+        }
+        assertEquals("PLACEMENT_POLICY_CHECK_TIME SuccessCount", 1, placementPolicyCheckStatsLogger.getSuccessCount());
+        return statsLogger;
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
index e84113b..2245d3f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
@@ -780,6 +780,21 @@ public class TestLedgerUnderreplicationManager {
         assertEquals(curTime, underReplicaMgr1.getCheckAllLedgersCTime());
     }
 
+    /** Checks that a placementPolicyCheckCTime written through one manager is read back through another one. */
+    @Test
+    public void testPlacementPolicyCheckCTime() throws Exception {
+        @Cleanup LedgerUnderreplicationManager underReplicaMgr1 = lmf1.newLedgerUnderreplicationManager();
+        @Cleanup LedgerUnderreplicationManager underReplicaMgr2 = lmf2.newLedgerUnderreplicationManager();
+        assertEquals(-1, underReplicaMgr1.getPlacementPolicyCheckCTime()); // -1 is reported before any value is set
+        long curTime = System.currentTimeMillis();
+        underReplicaMgr2.setPlacementPolicyCheckCTime(curTime);
+        assertEquals(curTime, underReplicaMgr1.getPlacementPolicyCheckCTime());
+        // a second currentTimeMillis() call may return the same millisecond; use a value that is certain to differ
+        long updatedTime = curTime + 1;
+        underReplicaMgr2.setPlacementPolicyCheckCTime(updatedTime);
+        assertEquals(updatedTime, underReplicaMgr1.getPlacementPolicyCheckCTime());
+    }
+
     private void verifyMarkLedgerUnderreplicated(Collection<String> missingReplica)
             throws KeeperException, InterruptedException, ReplicationException {
         Long ledgerA = 0xfeadeefdacL;
diff --git a/site/_data/config/bk_server.yaml b/site/_data/config/bk_server.yaml
index f74da27..328beca 100644
--- a/site/_data/config/bk_server.yaml
+++ b/site/_data/config/bk_server.yaml
@@ -656,6 +656,9 @@ groups:
 
       Set this to 0 to disable the periodic check completely. Note that periodic checking will put extra load on the cluster, so it should not be run more frequently than once a day.
     default: 604800
+  - param: auditorPeriodicPlacementPolicyCheckInterval
+    description: The time interval between auditor placement policy checks, in seconds. The auditor placement policy check validates whether the ensembles of the segments of all closed ledgers adhere to the placement policy. It is a monitoring-only check that takes no corrective action other than logging errors and reporting metrics. By default it is disabled.
+    default: 0
   - param: auditorLedgerVerificationPercentage
     description: |
       The percentage of a ledger (fragment)'s entries will be verified before claiming a fragment as missing. If it is 0, it only verifies the first and last entries of a given fragment.