You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by ch...@apache.org on 2023/03/16 04:21:34 UTC

[bookkeeper] branch master updated: [improvement] Delay all audit tasks when a delayed bookie check task is already pending (#3818)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new ed542b9636 [improvement] Delay all audit tasks when a delayed bookie check task is already pending (#3818)
ed542b9636 is described below

commit ed542b963621907361ee518ea5fc3c27cb3aa41c
Author: wenbingshen <ol...@gmail.com>
AuthorDate: Thu Mar 16 12:21:28 2023 +0800

    [improvement] Delay all audit tasks when a delayed bookie check task is already pending (#3818)
    
    ### Motivation
    
    Fixes #3817
    
    For details, see: #3817
    
    ### Changes
    
    While an `auditTask` is pending during the `lostBookieRecoveryDelay` period, the other periodic check tasks (check-all-ledgers, placement-policy check, and replicas check) are skipped instead of being run.
---
 .../org/apache/bookkeeper/replication/Auditor.java |   9 +-
 .../replication/AuditorBookieCheckTask.java        |   9 +-
 .../replication/AuditorCheckAllLedgersTask.java    |  12 +-
 .../AuditorPlacementPolicyCheckTask.java           |  12 +-
 .../replication/AuditorReplicasCheckTask.java      |  11 +-
 .../apache/bookkeeper/replication/AuditorTask.java |  14 +-
 .../AuditorCheckAllLedgersTaskTest.java            |   2 +-
 .../replication/AuditorPeriodicCheckTest.java      | 229 +++++++++++++++++++--
 .../AuditorPlacementPolicyCheckTaskTest.java       |   2 +-
 .../AuditorPlacementPolicyCheckTest.java           |   2 +-
 .../replication/AuditorReplicasCheckTaskTest.java  |   2 +-
 .../replication/AuditorReplicasCheckTest.java      |   2 +-
 .../replication/TestReplicationWorker.java         |   2 +-
 13 files changed, 268 insertions(+), 40 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index 4d7be7ace2..739a5ff7d7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -172,13 +172,16 @@ public class Auditor implements AutoCloseable {
                 bookieLedgerIndexer, hasAuditCheckTask, submitBookieCheckTask);
         allAuditorTasks.add(auditorBookieCheckTask);
         this.auditorCheckAllLedgersTask = new AuditorCheckAllLedgersTask(
-                conf, auditorStats, admin, ledgerManager, ledgerUnderreplicationManager, shutdownTaskHandler);
+                conf, auditorStats, admin, ledgerManager,
+                ledgerUnderreplicationManager, shutdownTaskHandler, hasAuditCheckTask);
         allAuditorTasks.add(auditorCheckAllLedgersTask);
         this.auditorPlacementPolicyCheckTask = new AuditorPlacementPolicyCheckTask(
-                conf, auditorStats, admin, ledgerManager, ledgerUnderreplicationManager, shutdownTaskHandler);
+                conf, auditorStats, admin, ledgerManager,
+                ledgerUnderreplicationManager, shutdownTaskHandler, hasAuditCheckTask);
         allAuditorTasks.add(auditorPlacementPolicyCheckTask);
         this.auditorReplicasCheckTask = new AuditorReplicasCheckTask(
-                conf, auditorStats, admin, ledgerManager, ledgerUnderreplicationManager, shutdownTaskHandler);
+                conf, auditorStats, admin, ledgerManager,
+                ledgerUnderreplicationManager, shutdownTaskHandler, hasAuditCheckTask);
         allAuditorTasks.add(auditorReplicasCheckTask);
 
         executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorBookieCheckTask.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorBookieCheckTask.java
index 5a446e9187..35f479638b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorBookieCheckTask.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorBookieCheckTask.java
@@ -41,8 +41,6 @@ public class AuditorBookieCheckTask extends AuditorTask {
     private static final Logger LOG = LoggerFactory.getLogger(AuditorBookieCheckTask.class);
 
     private final BookieLedgerIndexer bookieLedgerIndexer;
-    private final BiConsumer<AtomicBoolean, Throwable> hasAuditCheckTask;
-    private final AtomicBoolean hasTask = new AtomicBoolean(false);
     private final BiConsumer<Void, Throwable> submitCheckTask;
 
     public AuditorBookieCheckTask(ServerConfiguration conf,
@@ -55,17 +53,14 @@ public class AuditorBookieCheckTask extends AuditorTask {
                                   BiConsumer<AtomicBoolean, Throwable> hasAuditCheckTask,
                                   BiConsumer<Void, Throwable> submitCheckTask) {
         super(conf, auditorStats, admin, ledgerManager,
-                ledgerUnderreplicationManager, shutdownTaskHandler);
+                ledgerUnderreplicationManager, shutdownTaskHandler, hasAuditCheckTask);
         this.bookieLedgerIndexer = bookieLedgerIndexer;
-        this.hasAuditCheckTask = hasAuditCheckTask;
         this.submitCheckTask = submitCheckTask;
     }
 
     @Override
     protected void runTask() {
-        hasTask.set(false);
-        hasAuditCheckTask.accept(hasTask, null);
-        if (!hasTask.get()) {
+        if (!hasBookieCheckTask()) {
             startAudit(true);
         } else {
             // if due to a lost bookie an audit task was scheduled,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorCheckAllLedgersTask.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorCheckAllLedgersTask.java
index bddef5a88f..73ca36cd75 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorCheckAllLedgersTask.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorCheckAllLedgersTask.java
@@ -27,6 +27,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -57,10 +59,11 @@ public class AuditorCheckAllLedgersTask extends AuditorTask {
                                BookKeeperAdmin admin,
                                LedgerManager ledgerManager,
                                LedgerUnderreplicationManager ledgerUnderreplicationManager,
-                               ShutdownTaskHandler shutdownTaskHandler)
+                               ShutdownTaskHandler shutdownTaskHandler,
+                               BiConsumer<AtomicBoolean, Throwable> hasAuditCheckTask)
             throws UnavailableException {
         super(conf, auditorStats, admin, ledgerManager,
-                ledgerUnderreplicationManager, shutdownTaskHandler);
+                ledgerUnderreplicationManager, shutdownTaskHandler, hasAuditCheckTask);
 
         if (conf.getAuditorMaxNumberOfConcurrentOpenLedgerOperations() <= 0) {
             LOG.error("auditorMaxNumberOfConcurrentOpenLedgerOperations should be greater than 0");
@@ -89,6 +92,11 @@ public class AuditorCheckAllLedgersTask extends AuditorTask {
 
     @Override
     protected void runTask() {
+        if (hasBookieCheckTask()) {
+            LOG.info("Audit bookie task already scheduled; skipping periodic all ledgers check task");
+            return;
+        }
+
         Stopwatch stopwatch = Stopwatch.createStarted();
         boolean checkSuccess = false;
         try {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTask.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTask.java
index 74442b655a..04fbc84f3e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTask.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTask.java
@@ -25,7 +25,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
 import lombok.Getter;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
@@ -58,9 +60,10 @@ public class AuditorPlacementPolicyCheckTask extends AuditorTask {
                                     BookKeeperAdmin admin,
                                     LedgerManager ledgerManager,
                                     LedgerUnderreplicationManager ledgerUnderreplicationManager,
-                                    ShutdownTaskHandler shutdownTaskHandler) {
+                                    ShutdownTaskHandler shutdownTaskHandler,
+                                    BiConsumer<AtomicBoolean, Throwable> hasAuditCheckTask) {
         super(conf, auditorStats, admin, ledgerManager,
-                ledgerUnderreplicationManager, shutdownTaskHandler);
+                ledgerUnderreplicationManager, shutdownTaskHandler, hasAuditCheckTask);
         this.underreplicatedLedgerRecoveryGracePeriod = conf.getUnderreplicatedLedgerRecoveryGracePeriod();
         this.numOfLedgersFoundNotAdheringInPlacementPolicyCheck = new AtomicInteger(0);
         this.numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck = new AtomicInteger(0);
@@ -70,6 +73,11 @@ public class AuditorPlacementPolicyCheckTask extends AuditorTask {
 
     @Override
     protected void runTask() {
+        if (hasBookieCheckTask()) {
+            LOG.info("Audit bookie task already scheduled; skipping periodic placement policy check task");
+            return;
+        }
+
         try {
             if (!isLedgerReplicationEnabled()) {
                 LOG.info("Ledger replication disabled, skipping placementPolicyCheck");
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTask.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTask.java
index 7ce66883b6..b9e47c838d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTask.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTask.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import org.apache.bookkeeper.client.BKException;
@@ -69,9 +70,10 @@ public class AuditorReplicasCheckTask extends AuditorTask {
                              AuditorStats auditorStats, BookKeeperAdmin admin,
                              LedgerManager ledgerManager,
                              LedgerUnderreplicationManager ledgerUnderreplicationManager,
-                             ShutdownTaskHandler shutdownTaskHandler) {
+                             ShutdownTaskHandler shutdownTaskHandler,
+                             BiConsumer<AtomicBoolean, Throwable> hasAuditCheckTask) {
         super(conf, auditorStats, admin, ledgerManager,
-                ledgerUnderreplicationManager, shutdownTaskHandler);
+                ledgerUnderreplicationManager, shutdownTaskHandler, hasAuditCheckTask);
         this.zkOpTimeoutMs = conf.getZkTimeout() * 2;
         this.numLedgersFoundHavingNoReplicaOfAnEntry = new AtomicInteger(0);
         this.numLedgersFoundHavingLessThanAQReplicasOfAnEntry = new AtomicInteger(0);
@@ -80,6 +82,11 @@ public class AuditorReplicasCheckTask extends AuditorTask {
 
     @Override
     protected void runTask() {
+        if (hasBookieCheckTask()) {
+            LOG.info("Audit bookie task already scheduled; skipping periodic replicas check task");
+            return;
+        }
+
         try {
             if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
                 LOG.info("Ledger replication disabled, skipping replicasCheck task.");
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorTask.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorTask.java
index 3306f4d0cb..895cc70dea 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorTask.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorTask.java
@@ -24,7 +24,9 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.LongAdder;
+import java.util.function.BiConsumer;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
@@ -46,19 +48,23 @@ abstract class AuditorTask implements Runnable {
     protected LedgerManager ledgerManager;
     protected LedgerUnderreplicationManager ledgerUnderreplicationManager;
     private final ShutdownTaskHandler shutdownTaskHandler;
+    private final BiConsumer<AtomicBoolean, Throwable> hasAuditCheckTask;
+    private final AtomicBoolean hasTask = new AtomicBoolean(false);
 
     AuditorTask(ServerConfiguration conf,
                 AuditorStats auditorStats,
                 BookKeeperAdmin admin,
                 LedgerManager ledgerManager,
                 LedgerUnderreplicationManager ledgerUnderreplicationManager,
-                ShutdownTaskHandler shutdownTaskHandler) {
+                ShutdownTaskHandler shutdownTaskHandler,
+                BiConsumer<AtomicBoolean, Throwable> hasAuditCheckTask) {
         this.conf = conf;
         this.auditorStats = auditorStats;
         this.admin = admin;
         this.ledgerManager = ledgerManager;
         this.ledgerUnderreplicationManager = ledgerUnderreplicationManager;
         this.shutdownTaskHandler = shutdownTaskHandler;
+        this.hasAuditCheckTask = hasAuditCheckTask;
     }
 
     @Override
@@ -141,6 +147,12 @@ abstract class AuditorTask implements Runnable {
 
     public abstract void shutdown();
 
+    protected boolean hasBookieCheckTask() {
+        hasTask.set(false);
+        hasAuditCheckTask.accept(hasTask, null);
+        return hasTask.get();
+    }
+
     /**
      * ShutdownTaskHandler used to shutdown auditor executor.
      */
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorCheckAllLedgersTaskTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorCheckAllLedgersTaskTest.java
index 320f767282..02f585306f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorCheckAllLedgersTaskTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorCheckAllLedgersTaskTest.java
@@ -90,7 +90,7 @@ public class AuditorCheckAllLedgersTaskTest extends BookKeeperClusterTestCase {
 
         AuditorCheckAllLedgersTask auditorCheckAllLedgersTask = new AuditorCheckAllLedgersTask(
                 baseConf, auditorStats, admin, ledgerManager,
-                ledgerUnderreplicationManager, null);
+                ledgerUnderreplicationManager, null, (flag, throwable) -> flag.set(false));
 
         // 3. checkAllLedgers
         auditorCheckAllLedgersTask.runTask();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
index 41095a9760..9257419c3a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
@@ -41,6 +41,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -72,6 +73,7 @@ import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.test.TestStatsProvider;
 import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
 import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
+import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -445,7 +447,7 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
         servConf.setAuditorPeriodicBookieCheckInterval(0);
 
         final TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId(servConf).toString(), servConf, bkc, false,
-                statsLogger);
+                statsLogger, null);
         CountDownLatch latch = auditor.getLatch();
         assertEquals("CHECK_ALL_LEDGERS_TIME SuccessCount", 0, checkAllLedgersStatsLogger.getSuccessCount());
         long curTimeBeforeStart = System.currentTimeMillis();
@@ -539,7 +541,7 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
         servConf.setAuditorPeriodicBookieCheckInterval(0);
 
         final TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId(servConf).toString(), servConf, bkc, false,
-                statsLogger);
+                statsLogger, null);
         CountDownLatch latch = auditor.getLatch();
         assertEquals("PLACEMENT_POLICY_CHECK_TIME SuccessCount", 0, placementPolicyCheckStatsLogger.getSuccessCount());
         long curTimeBeforeStart = System.currentTimeMillis();
@@ -643,7 +645,7 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
         servConf.setAuditorPeriodicCheckInterval(0);
         servConf.setAuditorPeriodicBookieCheckInterval(0);
         final TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId(servConf).toString(), servConf, bkc, false,
-                statsLogger);
+                statsLogger, null);
         CountDownLatch latch = auditor.getLatch();
         assertEquals("REPLICAS_CHECK_TIME SuccessCount", 0, replicasCheckStatsLogger.getSuccessCount());
         long curTimeBeforeStart = System.currentTimeMillis();
@@ -699,35 +701,222 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
         auditor.close();
     }
 
+    @Test
+    public void testDelayBookieAuditOfCheckAllLedgers() throws Exception {
+        for (AuditorElector e : auditorElectors.values()) {
+            e.shutdown();
+        }
+
+        final int numLedgers = 10;
+        List<Long> ids = new LinkedList<Long>();
+        for (int i = 0; i < numLedgers; i++) {
+            LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes());
+            ids.add(lh.getId());
+            for (int j = 0; j < 2; j++) {
+                lh.addEntry("testdata".getBytes());
+            }
+            lh.close();
+        }
+
+        LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
+        LedgerUnderreplicationManager urm = mFactory.newLedgerUnderreplicationManager();
+
+        ServerConfiguration servConf = new ServerConfiguration(confByIndex(0));
+
+        TestStatsProvider statsProvider = new TestStatsProvider();
+        TestStatsLogger statsLogger = statsProvider.getStatsLogger(AUDITOR_SCOPE);
+        Counter numBookieAuditsDelayed =
+                statsLogger.getCounter(ReplicationStats.NUM_BOOKIE_AUDITS_DELAYED);
+        TestOpStatsLogger underReplicatedLedgerTotalSizeStatsLogger = (TestOpStatsLogger) statsLogger
+                .getOpStatsLogger(ReplicationStats.UNDER_REPLICATED_LEDGERS_TOTAL_SIZE);
+
+        servConf.setAuditorPeriodicCheckInterval(1);
+        servConf.setAuditorPeriodicPlacementPolicyCheckInterval(0);
+        servConf.setAuditorPeriodicBookieCheckInterval(Long.MAX_VALUE);
+
+        urm.setLostBookieRecoveryDelay(Integer.MAX_VALUE);
+
+        AtomicBoolean canRun = new AtomicBoolean(false);
+
+        final TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId(servConf).toString(), servConf, bkc,
+                false, statsLogger, canRun);
+        final CountDownLatch latch = auditor.getLatch();
+
+        auditor.start();
+
+        killBookie(addressByIndex(0));
+
+        Awaitility.await().untilAsserted(() -> assertEquals(1, (long) numBookieAuditsDelayed.get()));
+        final Future<?> auditTask = auditor.auditTask;
+        assertTrue(auditTask != null && !auditTask.isDone());
+
+        canRun.set(true);
+
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertTrue(auditor.auditTask.equals(auditTask)
+                && auditor.auditTask != null && !auditor.auditTask.isDone());
+        // wrong num is numLedgers, right num is 0
+        assertEquals("UNDER_REPLICATED_LEDGERS_TOTAL_SIZE",
+                0,
+                underReplicatedLedgerTotalSizeStatsLogger.getSuccessCount());
+
+        auditor.close();
+    }
+
+    @Test
+    public void testDelayBookieAuditOfPlacementPolicy() throws Exception {
+        for (AuditorElector e : auditorElectors.values()) {
+            e.shutdown();
+        }
+
+        final int numLedgers = 10;
+        List<Long> ids = new LinkedList<Long>();
+        for (int i = 0; i < numLedgers; i++) {
+            LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes());
+            ids.add(lh.getId());
+            for (int j = 0; j < 2; j++) {
+                lh.addEntry("testdata".getBytes());
+            }
+            lh.close();
+        }
+
+        LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
+        LedgerUnderreplicationManager urm = mFactory.newLedgerUnderreplicationManager();
+
+        ServerConfiguration servConf = new ServerConfiguration(confByIndex(0));
+
+        TestStatsProvider statsProvider = new TestStatsProvider();
+        TestStatsLogger statsLogger = statsProvider.getStatsLogger(AUDITOR_SCOPE);
+        Counter numBookieAuditsDelayed =
+                statsLogger.getCounter(ReplicationStats.NUM_BOOKIE_AUDITS_DELAYED);
+        TestOpStatsLogger placementPolicyCheckTime = (TestOpStatsLogger) statsLogger
+                .getOpStatsLogger(ReplicationStats.PLACEMENT_POLICY_CHECK_TIME);
+
+        servConf.setAuditorPeriodicCheckInterval(0);
+        servConf.setAuditorPeriodicPlacementPolicyCheckInterval(1);
+        servConf.setAuditorPeriodicBookieCheckInterval(Long.MAX_VALUE);
+
+        urm.setLostBookieRecoveryDelay(Integer.MAX_VALUE);
+
+        AtomicBoolean canRun = new AtomicBoolean(false);
+
+        final TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId(servConf).toString(), servConf, bkc,
+                false, statsLogger, canRun);
+        final CountDownLatch latch = auditor.getLatch();
+
+        auditor.start();
+
+        killBookie(addressByIndex(0));
+
+        Awaitility.await().untilAsserted(() -> assertEquals(1, (long) numBookieAuditsDelayed.get()));
+        final Future<?> auditTask = auditor.auditTask;
+        assertTrue(auditTask != null && !auditTask.isDone());
+        assertEquals("PLACEMENT_POLICY_CHECK_TIME", 0, placementPolicyCheckTime.getSuccessCount());
+
+        canRun.set(true);
+
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertTrue(auditor.auditTask.equals(auditTask)
+                && auditor.auditTask != null && !auditor.auditTask.isDone());
+        // wrong successCount is > 0, right successCount is = 0
+        assertEquals("PLACEMENT_POLICY_CHECK_TIME", 0, placementPolicyCheckTime.getSuccessCount());
+
+        auditor.close();
+    }
+
+    @Test
+    public void testDelayBookieAuditOfReplicasCheck() throws Exception {
+        for (AuditorElector e : auditorElectors.values()) {
+            e.shutdown();
+        }
+
+        final int numLedgers = 10;
+        List<Long> ids = new LinkedList<Long>();
+        for (int i = 0; i < numLedgers; i++) {
+            LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes());
+            ids.add(lh.getId());
+            for (int j = 0; j < 2; j++) {
+                lh.addEntry("testdata".getBytes());
+            }
+            lh.close();
+        }
+
+        LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
+        LedgerUnderreplicationManager urm = mFactory.newLedgerUnderreplicationManager();
+
+        ServerConfiguration servConf = new ServerConfiguration(confByIndex(0));
+
+        TestStatsProvider statsProvider = new TestStatsProvider();
+        TestStatsLogger statsLogger = statsProvider.getStatsLogger(AUDITOR_SCOPE);
+        Counter numBookieAuditsDelayed =
+                statsLogger.getCounter(ReplicationStats.NUM_BOOKIE_AUDITS_DELAYED);
+        TestOpStatsLogger replicasCheckTime = (TestOpStatsLogger) statsLogger
+                .getOpStatsLogger(ReplicationStats.REPLICAS_CHECK_TIME);
+
+        servConf.setAuditorPeriodicCheckInterval(0);
+        servConf.setAuditorPeriodicPlacementPolicyCheckInterval(0);
+        servConf.setAuditorPeriodicBookieCheckInterval(Long.MAX_VALUE);
+        servConf.setAuditorPeriodicReplicasCheckInterval(1);
+
+        urm.setLostBookieRecoveryDelay(Integer.MAX_VALUE);
+
+        AtomicBoolean canRun = new AtomicBoolean(false);
+
+        final TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId(servConf).toString(), servConf, bkc,
+                false, statsLogger, canRun);
+        final CountDownLatch latch = auditor.getLatch();
+
+        auditor.start();
+
+        killBookie(addressByIndex(0));
+
+        Awaitility.await().untilAsserted(() -> assertEquals(1, (long) numBookieAuditsDelayed.get()));
+        final Future<?> auditTask = auditor.auditTask;
+        assertTrue(auditTask != null && !auditTask.isDone());
+        assertEquals("REPLICAS_CHECK_TIME", 0, replicasCheckTime.getSuccessCount());
+
+        canRun.set(true);
+
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertTrue(auditor.auditTask.equals(auditTask)
+                && auditor.auditTask != null && !auditor.auditTask.isDone());
+        // wrong successCount is > 0, right successCount is = 0
+        assertEquals("REPLICAS_CHECK_TIME", 0, replicasCheckTime.getSuccessCount());
+
+        auditor.close();
+    }
+
     static class TestAuditor extends Auditor {
 
         final AtomicReference<CountDownLatch> latchRef = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
 
         public TestAuditor(String bookieIdentifier, ServerConfiguration conf, BookKeeper bkc, boolean ownBkc,
-                StatsLogger statsLogger) throws UnavailableException {
+                StatsLogger statsLogger, AtomicBoolean exceptedRun) throws UnavailableException {
             super(bookieIdentifier, conf, bkc, ownBkc, statsLogger);
-            renewAuditorTestWrapperTask();
+            renewAuditorTestWrapperTask(exceptedRun);
         }
 
         public TestAuditor(String bookieIdentifier, ServerConfiguration conf, BookKeeper bkc, boolean ownBkc,
-                BookKeeperAdmin bkadmin, boolean ownadmin, StatsLogger statsLogger) throws UnavailableException {
+                BookKeeperAdmin bkadmin, boolean ownadmin, StatsLogger statsLogger,
+                           AtomicBoolean exceptedRun) throws UnavailableException {
             super(bookieIdentifier, conf, bkc, ownBkc, bkadmin, ownadmin, statsLogger);
-            renewAuditorTestWrapperTask();
+            renewAuditorTestWrapperTask(exceptedRun);
         }
 
-        public TestAuditor(final String bookieIdentifier, ServerConfiguration conf, StatsLogger statsLogger)
+        public TestAuditor(final String bookieIdentifier, ServerConfiguration conf, StatsLogger statsLogger,
+                           AtomicBoolean exceptedRun)
                 throws UnavailableException {
             super(bookieIdentifier, conf, statsLogger);
-            renewAuditorTestWrapperTask();
+            renewAuditorTestWrapperTask(exceptedRun);
         }
 
-        private void renewAuditorTestWrapperTask() {
+        private void renewAuditorTestWrapperTask(AtomicBoolean exceptedRun) {
             super.auditorCheckAllLedgersTask =
-                    new AuditorTestWrapperTask(super.auditorCheckAllLedgersTask, latchRef);
+                    new AuditorTestWrapperTask(super.auditorCheckAllLedgersTask, latchRef, exceptedRun);
             super.auditorPlacementPolicyCheckTask =
-                    new AuditorTestWrapperTask(super.auditorPlacementPolicyCheckTask, latchRef);
+                    new AuditorTestWrapperTask(super.auditorPlacementPolicyCheckTask, latchRef, exceptedRun);
             super.auditorReplicasCheckTask =
-                    new AuditorTestWrapperTask(super.auditorReplicasCheckTask, latchRef);
+                    new AuditorTestWrapperTask(super.auditorReplicasCheckTask, latchRef, exceptedRun);
         }
 
         CountDownLatch getLatch() {
@@ -741,18 +930,24 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
         private static class AuditorTestWrapperTask extends AuditorTask {
             private final AuditorTask innerTask;
             private final AtomicReference<CountDownLatch> latchRef;
+            private final AtomicBoolean exceptedRun;
 
-            AuditorTestWrapperTask(AuditorTask innerTask, AtomicReference<CountDownLatch> latchRef) {
+            AuditorTestWrapperTask(AuditorTask innerTask,
+                                   AtomicReference<CountDownLatch> latchRef,
+                                   AtomicBoolean exceptedRun) {
                 super(null, null, null, null, null,
-                        null);
+                        null, null);
                 this.innerTask = innerTask;
                 this.latchRef = latchRef;
+                this.exceptedRun = exceptedRun;
             }
 
             @Override
             protected void runTask() {
-                innerTask.runTask();
-                latchRef.get().countDown();
+                if (exceptedRun == null || exceptedRun.get()) {
+                    innerTask.runTask();
+                    latchRef.get().countDown();
+                }
             }
 
             @Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTaskTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTaskTest.java
index ec773efcf6..6d951d7180 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTaskTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTaskTest.java
@@ -86,7 +86,7 @@ public class AuditorPlacementPolicyCheckTaskTest extends BookKeeperClusterTestCa
 
         AuditorPlacementPolicyCheckTask auditorPlacementPolicyCheckTask = new AuditorPlacementPolicyCheckTask(
                 baseConf, auditorStats, admin, ledgerManager,
-                ledgerUnderreplicationManager, null);
+                ledgerUnderreplicationManager, null, (flag, throwable) -> flag.set(false));
 
         // 3. placementPolicyCheck
         auditorPlacementPolicyCheckTask.runTask();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
index b26084921d..e14deb3fb5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
@@ -831,7 +831,7 @@ public class AuditorPlacementPolicyCheckTest extends BookKeeperClusterTestCase {
                 .getOpStatsLogger(ReplicationStats.PLACEMENT_POLICY_CHECK_TIME);
 
         final TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId(servConf).toString(), servConf,
-                statsLogger);
+                statsLogger, null);
         auditorRef.setValue(auditor);
         CountDownLatch latch = auditor.getLatch();
         assertEquals("PLACEMENT_POLICY_CHECK_TIME SuccessCount", 0, placementPolicyCheckStatsLogger.getSuccessCount());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTaskTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTaskTest.java
index 34c440b06c..f39a9dace7 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTaskTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTaskTest.java
@@ -85,7 +85,7 @@ public class AuditorReplicasCheckTaskTest extends BookKeeperClusterTestCase {
         final AuditorStats auditorStats = new AuditorStats(statsLogger);
         AuditorReplicasCheckTask auditorReplicasCheckTask = new AuditorReplicasCheckTask(
                 baseConf, auditorStats, admin, ledgerManager,
-                ledgerUnderreplicationManager, null);
+                ledgerUnderreplicationManager, null, (flag, throwable) -> flag.set(false));
 
         // 3. replicasCheck
         auditorReplicasCheckTask.runTask();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTest.java
index a9377f394e..c64a14eca2 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTest.java
@@ -155,7 +155,7 @@ public class AuditorReplicasCheckTest extends BookKeeperClusterTestCase {
         final TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId(servConf).toString(), servConf, bkc, true,
                 new TestBookKeeperAdmin(bkc, statsLogger, expectedReturnAvailabilityOfEntriesOfLedger,
                         errorReturnValueForGetAvailabilityOfEntriesOfLedger),
-                true, statsLogger);
+                true, statsLogger, null);
         auditorRef.setValue(auditor);
         CountDownLatch latch = auditor.getLatch();
         assertEquals("REPLICAS_CHECK_TIME SuccessCount", 0, replicasCheckStatsLogger.getSuccessCount());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index b7752b8bcb..fab068ae60 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -1374,7 +1374,7 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
                 .getOpStatsLogger(ReplicationStats.PLACEMENT_POLICY_CHECK_TIME);
 
         final AuditorPeriodicCheckTest.TestAuditor auditor = new AuditorPeriodicCheckTest.TestAuditor(
-                BookieImpl.getBookieId(servConf).toString(), servConf, bkc, false, statsLogger);
+                BookieImpl.getBookieId(servConf).toString(), servConf, bkc, false, statsLogger, null);
         auditorRef.setValue(auditor);
         CountDownLatch latch = auditor.getLatch();
         assertEquals("PLACEMENT_POLICY_CHECK_TIME SuccessCount", 0,