You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by ch...@apache.org on 2023/03/16 04:21:34 UTC
[bookkeeper] branch master updated: [improvement] Delay all audit task when have a already delayed bookie check task (#3818)
This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new ed542b9636 [improvement] Delay all audit task when have a already delayed bookie check task (#3818)
ed542b9636 is described below
commit ed542b963621907361ee518ea5fc3c27cb3aa41c
Author: wenbingshen <ol...@gmail.com>
AuthorDate: Thu Mar 16 12:21:28 2023 +0800
[improvement] Delay all audit task when have a already delayed bookie check task (#3818)
### Motivation
Fixes #3817
For details, see: #3817
### Changes
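When a delayed bookie audit task (`auditTask`) is already pending during the `lostBookieRecoveryDelay` window, the other periodic detection tasks (the check-all-ledgers, placement-policy, and replicas checks) are skipped.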
---
.../org/apache/bookkeeper/replication/Auditor.java | 9 +-
.../replication/AuditorBookieCheckTask.java | 9 +-
.../replication/AuditorCheckAllLedgersTask.java | 12 +-
.../AuditorPlacementPolicyCheckTask.java | 12 +-
.../replication/AuditorReplicasCheckTask.java | 11 +-
.../apache/bookkeeper/replication/AuditorTask.java | 14 +-
.../AuditorCheckAllLedgersTaskTest.java | 2 +-
.../replication/AuditorPeriodicCheckTest.java | 229 +++++++++++++++++++--
.../AuditorPlacementPolicyCheckTaskTest.java | 2 +-
.../AuditorPlacementPolicyCheckTest.java | 2 +-
.../replication/AuditorReplicasCheckTaskTest.java | 2 +-
.../replication/AuditorReplicasCheckTest.java | 2 +-
.../replication/TestReplicationWorker.java | 2 +-
13 files changed, 268 insertions(+), 40 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index 4d7be7ace2..739a5ff7d7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -172,13 +172,16 @@ public class Auditor implements AutoCloseable {
bookieLedgerIndexer, hasAuditCheckTask, submitBookieCheckTask);
allAuditorTasks.add(auditorBookieCheckTask);
this.auditorCheckAllLedgersTask = new AuditorCheckAllLedgersTask(
- conf, auditorStats, admin, ledgerManager, ledgerUnderreplicationManager, shutdownTaskHandler);
+ conf, auditorStats, admin, ledgerManager,
+ ledgerUnderreplicationManager, shutdownTaskHandler, hasAuditCheckTask);
allAuditorTasks.add(auditorCheckAllLedgersTask);
this.auditorPlacementPolicyCheckTask = new AuditorPlacementPolicyCheckTask(
- conf, auditorStats, admin, ledgerManager, ledgerUnderreplicationManager, shutdownTaskHandler);
+ conf, auditorStats, admin, ledgerManager,
+ ledgerUnderreplicationManager, shutdownTaskHandler, hasAuditCheckTask);
allAuditorTasks.add(auditorPlacementPolicyCheckTask);
this.auditorReplicasCheckTask = new AuditorReplicasCheckTask(
- conf, auditorStats, admin, ledgerManager, ledgerUnderreplicationManager, shutdownTaskHandler);
+ conf, auditorStats, admin, ledgerManager,
+ ledgerUnderreplicationManager, shutdownTaskHandler, hasAuditCheckTask);
allAuditorTasks.add(auditorReplicasCheckTask);
executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorBookieCheckTask.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorBookieCheckTask.java
index 5a446e9187..35f479638b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorBookieCheckTask.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorBookieCheckTask.java
@@ -41,8 +41,6 @@ public class AuditorBookieCheckTask extends AuditorTask {
private static final Logger LOG = LoggerFactory.getLogger(AuditorBookieCheckTask.class);
private final BookieLedgerIndexer bookieLedgerIndexer;
- private final BiConsumer<AtomicBoolean, Throwable> hasAuditCheckTask;
- private final AtomicBoolean hasTask = new AtomicBoolean(false);
private final BiConsumer<Void, Throwable> submitCheckTask;
public AuditorBookieCheckTask(ServerConfiguration conf,
@@ -55,17 +53,14 @@ public class AuditorBookieCheckTask extends AuditorTask {
BiConsumer<AtomicBoolean, Throwable> hasAuditCheckTask,
BiConsumer<Void, Throwable> submitCheckTask) {
super(conf, auditorStats, admin, ledgerManager,
- ledgerUnderreplicationManager, shutdownTaskHandler);
+ ledgerUnderreplicationManager, shutdownTaskHandler, hasAuditCheckTask);
this.bookieLedgerIndexer = bookieLedgerIndexer;
- this.hasAuditCheckTask = hasAuditCheckTask;
this.submitCheckTask = submitCheckTask;
}
@Override
protected void runTask() {
- hasTask.set(false);
- hasAuditCheckTask.accept(hasTask, null);
- if (!hasTask.get()) {
+ if (!hasBookieCheckTask()) {
startAudit(true);
} else {
// if due to a lost bookie an audit task was scheduled,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorCheckAllLedgersTask.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorCheckAllLedgersTask.java
index bddef5a88f..73ca36cd75 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorCheckAllLedgersTask.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorCheckAllLedgersTask.java
@@ -27,6 +27,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -57,10 +59,11 @@ public class AuditorCheckAllLedgersTask extends AuditorTask {
BookKeeperAdmin admin,
LedgerManager ledgerManager,
LedgerUnderreplicationManager ledgerUnderreplicationManager,
- ShutdownTaskHandler shutdownTaskHandler)
+ ShutdownTaskHandler shutdownTaskHandler,
+ BiConsumer<AtomicBoolean, Throwable> hasAuditCheckTask)
throws UnavailableException {
super(conf, auditorStats, admin, ledgerManager,
- ledgerUnderreplicationManager, shutdownTaskHandler);
+ ledgerUnderreplicationManager, shutdownTaskHandler, hasAuditCheckTask);
if (conf.getAuditorMaxNumberOfConcurrentOpenLedgerOperations() <= 0) {
LOG.error("auditorMaxNumberOfConcurrentOpenLedgerOperations should be greater than 0");
@@ -89,6 +92,11 @@ public class AuditorCheckAllLedgersTask extends AuditorTask {
@Override
protected void runTask() {
+ if (hasBookieCheckTask()) {
+ LOG.info("Audit bookie task already scheduled; skipping periodic all ledgers check task");
+ return;
+ }
+
Stopwatch stopwatch = Stopwatch.createStarted();
boolean checkSuccess = false;
try {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTask.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTask.java
index 74442b655a..04fbc84f3e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTask.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTask.java
@@ -25,7 +25,9 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
import lombok.Getter;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeperAdmin;
@@ -58,9 +60,10 @@ public class AuditorPlacementPolicyCheckTask extends AuditorTask {
BookKeeperAdmin admin,
LedgerManager ledgerManager,
LedgerUnderreplicationManager ledgerUnderreplicationManager,
- ShutdownTaskHandler shutdownTaskHandler) {
+ ShutdownTaskHandler shutdownTaskHandler,
+ BiConsumer<AtomicBoolean, Throwable> hasAuditCheckTask) {
super(conf, auditorStats, admin, ledgerManager,
- ledgerUnderreplicationManager, shutdownTaskHandler);
+ ledgerUnderreplicationManager, shutdownTaskHandler, hasAuditCheckTask);
this.underreplicatedLedgerRecoveryGracePeriod = conf.getUnderreplicatedLedgerRecoveryGracePeriod();
this.numOfLedgersFoundNotAdheringInPlacementPolicyCheck = new AtomicInteger(0);
this.numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck = new AtomicInteger(0);
@@ -70,6 +73,11 @@ public class AuditorPlacementPolicyCheckTask extends AuditorTask {
@Override
protected void runTask() {
+ if (hasBookieCheckTask()) {
+ LOG.info("Audit bookie task already scheduled; skipping periodic placement policy check task");
+ return;
+ }
+
try {
if (!isLedgerReplicationEnabled()) {
LOG.info("Ledger replication disabled, skipping placementPolicyCheck");
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTask.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTask.java
index 7ce66883b6..b9e47c838d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTask.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTask.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.apache.bookkeeper.client.BKException;
@@ -69,9 +70,10 @@ public class AuditorReplicasCheckTask extends AuditorTask {
AuditorStats auditorStats, BookKeeperAdmin admin,
LedgerManager ledgerManager,
LedgerUnderreplicationManager ledgerUnderreplicationManager,
- ShutdownTaskHandler shutdownTaskHandler) {
+ ShutdownTaskHandler shutdownTaskHandler,
+ BiConsumer<AtomicBoolean, Throwable> hasAuditCheckTask) {
super(conf, auditorStats, admin, ledgerManager,
- ledgerUnderreplicationManager, shutdownTaskHandler);
+ ledgerUnderreplicationManager, shutdownTaskHandler, hasAuditCheckTask);
this.zkOpTimeoutMs = conf.getZkTimeout() * 2;
this.numLedgersFoundHavingNoReplicaOfAnEntry = new AtomicInteger(0);
this.numLedgersFoundHavingLessThanAQReplicasOfAnEntry = new AtomicInteger(0);
@@ -80,6 +82,11 @@ public class AuditorReplicasCheckTask extends AuditorTask {
@Override
protected void runTask() {
+ if (hasBookieCheckTask()) {
+ LOG.info("Audit bookie task already scheduled; skipping periodic replicas check task");
+ return;
+ }
+
try {
if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
LOG.info("Ledger replication disabled, skipping replicasCheck task.");
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorTask.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorTask.java
index 3306f4d0cb..895cc70dea 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorTask.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorTask.java
@@ -24,7 +24,9 @@ import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
+import java.util.function.BiConsumer;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperAdmin;
@@ -46,19 +48,23 @@ abstract class AuditorTask implements Runnable {
protected LedgerManager ledgerManager;
protected LedgerUnderreplicationManager ledgerUnderreplicationManager;
private final ShutdownTaskHandler shutdownTaskHandler;
+ private final BiConsumer<AtomicBoolean, Throwable> hasAuditCheckTask;
+ private final AtomicBoolean hasTask = new AtomicBoolean(false);
AuditorTask(ServerConfiguration conf,
AuditorStats auditorStats,
BookKeeperAdmin admin,
LedgerManager ledgerManager,
LedgerUnderreplicationManager ledgerUnderreplicationManager,
- ShutdownTaskHandler shutdownTaskHandler) {
+ ShutdownTaskHandler shutdownTaskHandler,
+ BiConsumer<AtomicBoolean, Throwable> hasAuditCheckTask) {
this.conf = conf;
this.auditorStats = auditorStats;
this.admin = admin;
this.ledgerManager = ledgerManager;
this.ledgerUnderreplicationManager = ledgerUnderreplicationManager;
this.shutdownTaskHandler = shutdownTaskHandler;
+ this.hasAuditCheckTask = hasAuditCheckTask;
}
@Override
@@ -141,6 +147,12 @@ abstract class AuditorTask implements Runnable {
public abstract void shutdown();
+ protected boolean hasBookieCheckTask() {
+ hasTask.set(false);
+ hasAuditCheckTask.accept(hasTask, null);
+ return hasTask.get();
+ }
+
/**
* ShutdownTaskHandler used to shutdown auditor executor.
*/
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorCheckAllLedgersTaskTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorCheckAllLedgersTaskTest.java
index 320f767282..02f585306f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorCheckAllLedgersTaskTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorCheckAllLedgersTaskTest.java
@@ -90,7 +90,7 @@ public class AuditorCheckAllLedgersTaskTest extends BookKeeperClusterTestCase {
AuditorCheckAllLedgersTask auditorCheckAllLedgersTask = new AuditorCheckAllLedgersTask(
baseConf, auditorStats, admin, ledgerManager,
- ledgerUnderreplicationManager, null);
+ ledgerUnderreplicationManager, null, (flag, throwable) -> flag.set(false));
// 3. checkAllLedgers
auditorCheckAllLedgersTask.runTask();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
index 41095a9760..9257419c3a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
@@ -41,6 +41,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -72,6 +73,7 @@ import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
+import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -445,7 +447,7 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
servConf.setAuditorPeriodicBookieCheckInterval(0);
final TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId(servConf).toString(), servConf, bkc, false,
- statsLogger);
+ statsLogger, null);
CountDownLatch latch = auditor.getLatch();
assertEquals("CHECK_ALL_LEDGERS_TIME SuccessCount", 0, checkAllLedgersStatsLogger.getSuccessCount());
long curTimeBeforeStart = System.currentTimeMillis();
@@ -539,7 +541,7 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
servConf.setAuditorPeriodicBookieCheckInterval(0);
final TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId(servConf).toString(), servConf, bkc, false,
- statsLogger);
+ statsLogger, null);
CountDownLatch latch = auditor.getLatch();
assertEquals("PLACEMENT_POLICY_CHECK_TIME SuccessCount", 0, placementPolicyCheckStatsLogger.getSuccessCount());
long curTimeBeforeStart = System.currentTimeMillis();
@@ -643,7 +645,7 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
servConf.setAuditorPeriodicCheckInterval(0);
servConf.setAuditorPeriodicBookieCheckInterval(0);
final TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId(servConf).toString(), servConf, bkc, false,
- statsLogger);
+ statsLogger, null);
CountDownLatch latch = auditor.getLatch();
assertEquals("REPLICAS_CHECK_TIME SuccessCount", 0, replicasCheckStatsLogger.getSuccessCount());
long curTimeBeforeStart = System.currentTimeMillis();
@@ -699,35 +701,222 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
auditor.close();
}
+ @Test
+ public void testDelayBookieAuditOfCheckAllLedgers() throws Exception {
+ for (AuditorElector e : auditorElectors.values()) {
+ e.shutdown();
+ }
+
+ final int numLedgers = 10;
+ List<Long> ids = new LinkedList<Long>();
+ for (int i = 0; i < numLedgers; i++) {
+ LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes());
+ ids.add(lh.getId());
+ for (int j = 0; j < 2; j++) {
+ lh.addEntry("testdata".getBytes());
+ }
+ lh.close();
+ }
+
+ LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
+ LedgerUnderreplicationManager urm = mFactory.newLedgerUnderreplicationManager();
+
+ ServerConfiguration servConf = new ServerConfiguration(confByIndex(0));
+
+ TestStatsProvider statsProvider = new TestStatsProvider();
+ TestStatsLogger statsLogger = statsProvider.getStatsLogger(AUDITOR_SCOPE);
+ Counter numBookieAuditsDelayed =
+ statsLogger.getCounter(ReplicationStats.NUM_BOOKIE_AUDITS_DELAYED);
+ TestOpStatsLogger underReplicatedLedgerTotalSizeStatsLogger = (TestOpStatsLogger) statsLogger
+ .getOpStatsLogger(ReplicationStats.UNDER_REPLICATED_LEDGERS_TOTAL_SIZE);
+
+ servConf.setAuditorPeriodicCheckInterval(1);
+ servConf.setAuditorPeriodicPlacementPolicyCheckInterval(0);
+ servConf.setAuditorPeriodicBookieCheckInterval(Long.MAX_VALUE);
+
+ urm.setLostBookieRecoveryDelay(Integer.MAX_VALUE);
+
+ AtomicBoolean canRun = new AtomicBoolean(false);
+
+ final TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId(servConf).toString(), servConf, bkc,
+ false, statsLogger, canRun);
+ final CountDownLatch latch = auditor.getLatch();
+
+ auditor.start();
+
+ killBookie(addressByIndex(0));
+
+ Awaitility.await().untilAsserted(() -> assertEquals(1, (long) numBookieAuditsDelayed.get()));
+ final Future<?> auditTask = auditor.auditTask;
+ assertTrue(auditTask != null && !auditTask.isDone());
+
+ canRun.set(true);
+
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertTrue(auditor.auditTask.equals(auditTask)
+ && auditor.auditTask != null && !auditor.auditTask.isDone());
+ // wrong num is numLedgers, right num is 0
+ assertEquals("UNDER_REPLICATED_LEDGERS_TOTAL_SIZE",
+ 0,
+ underReplicatedLedgerTotalSizeStatsLogger.getSuccessCount());
+
+ auditor.close();
+ }
+
+ @Test
+ public void testDelayBookieAuditOfPlacementPolicy() throws Exception {
+ for (AuditorElector e : auditorElectors.values()) {
+ e.shutdown();
+ }
+
+ final int numLedgers = 10;
+ List<Long> ids = new LinkedList<Long>();
+ for (int i = 0; i < numLedgers; i++) {
+ LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes());
+ ids.add(lh.getId());
+ for (int j = 0; j < 2; j++) {
+ lh.addEntry("testdata".getBytes());
+ }
+ lh.close();
+ }
+
+ LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
+ LedgerUnderreplicationManager urm = mFactory.newLedgerUnderreplicationManager();
+
+ ServerConfiguration servConf = new ServerConfiguration(confByIndex(0));
+
+ TestStatsProvider statsProvider = new TestStatsProvider();
+ TestStatsLogger statsLogger = statsProvider.getStatsLogger(AUDITOR_SCOPE);
+ Counter numBookieAuditsDelayed =
+ statsLogger.getCounter(ReplicationStats.NUM_BOOKIE_AUDITS_DELAYED);
+ TestOpStatsLogger placementPolicyCheckTime = (TestOpStatsLogger) statsLogger
+ .getOpStatsLogger(ReplicationStats.PLACEMENT_POLICY_CHECK_TIME);
+
+ servConf.setAuditorPeriodicCheckInterval(0);
+ servConf.setAuditorPeriodicPlacementPolicyCheckInterval(1);
+ servConf.setAuditorPeriodicBookieCheckInterval(Long.MAX_VALUE);
+
+ urm.setLostBookieRecoveryDelay(Integer.MAX_VALUE);
+
+ AtomicBoolean canRun = new AtomicBoolean(false);
+
+ final TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId(servConf).toString(), servConf, bkc,
+ false, statsLogger, canRun);
+ final CountDownLatch latch = auditor.getLatch();
+
+ auditor.start();
+
+ killBookie(addressByIndex(0));
+
+ Awaitility.await().untilAsserted(() -> assertEquals(1, (long) numBookieAuditsDelayed.get()));
+ final Future<?> auditTask = auditor.auditTask;
+ assertTrue(auditTask != null && !auditTask.isDone());
+ assertEquals("PLACEMENT_POLICY_CHECK_TIME", 0, placementPolicyCheckTime.getSuccessCount());
+
+ canRun.set(true);
+
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertTrue(auditor.auditTask.equals(auditTask)
+ && auditor.auditTask != null && !auditor.auditTask.isDone());
+ // wrong successCount is > 0, right successCount is = 0
+ assertEquals("PLACEMENT_POLICY_CHECK_TIME", 0, placementPolicyCheckTime.getSuccessCount());
+
+ auditor.close();
+ }
+
+ @Test
+ public void testDelayBookieAuditOfReplicasCheck() throws Exception {
+ for (AuditorElector e : auditorElectors.values()) {
+ e.shutdown();
+ }
+
+ final int numLedgers = 10;
+ List<Long> ids = new LinkedList<Long>();
+ for (int i = 0; i < numLedgers; i++) {
+ LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes());
+ ids.add(lh.getId());
+ for (int j = 0; j < 2; j++) {
+ lh.addEntry("testdata".getBytes());
+ }
+ lh.close();
+ }
+
+ LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
+ LedgerUnderreplicationManager urm = mFactory.newLedgerUnderreplicationManager();
+
+ ServerConfiguration servConf = new ServerConfiguration(confByIndex(0));
+
+ TestStatsProvider statsProvider = new TestStatsProvider();
+ TestStatsLogger statsLogger = statsProvider.getStatsLogger(AUDITOR_SCOPE);
+ Counter numBookieAuditsDelayed =
+ statsLogger.getCounter(ReplicationStats.NUM_BOOKIE_AUDITS_DELAYED);
+ TestOpStatsLogger replicasCheckTime = (TestOpStatsLogger) statsLogger
+ .getOpStatsLogger(ReplicationStats.REPLICAS_CHECK_TIME);
+
+ servConf.setAuditorPeriodicCheckInterval(0);
+ servConf.setAuditorPeriodicPlacementPolicyCheckInterval(0);
+ servConf.setAuditorPeriodicBookieCheckInterval(Long.MAX_VALUE);
+ servConf.setAuditorPeriodicReplicasCheckInterval(1);
+
+ urm.setLostBookieRecoveryDelay(Integer.MAX_VALUE);
+
+ AtomicBoolean canRun = new AtomicBoolean(false);
+
+ final TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId(servConf).toString(), servConf, bkc,
+ false, statsLogger, canRun);
+ final CountDownLatch latch = auditor.getLatch();
+
+ auditor.start();
+
+ killBookie(addressByIndex(0));
+
+ Awaitility.await().untilAsserted(() -> assertEquals(1, (long) numBookieAuditsDelayed.get()));
+ final Future<?> auditTask = auditor.auditTask;
+ assertTrue(auditTask != null && !auditTask.isDone());
+ assertEquals("REPLICAS_CHECK_TIME", 0, replicasCheckTime.getSuccessCount());
+
+ canRun.set(true);
+
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertTrue(auditor.auditTask.equals(auditTask)
+ && auditor.auditTask != null && !auditor.auditTask.isDone());
+ // wrong successCount is > 0, right successCount is = 0
+ assertEquals("REPLICAS_CHECK_TIME", 0, replicasCheckTime.getSuccessCount());
+
+ auditor.close();
+ }
+
static class TestAuditor extends Auditor {
final AtomicReference<CountDownLatch> latchRef = new AtomicReference<CountDownLatch>(new CountDownLatch(1));
public TestAuditor(String bookieIdentifier, ServerConfiguration conf, BookKeeper bkc, boolean ownBkc,
- StatsLogger statsLogger) throws UnavailableException {
+ StatsLogger statsLogger, AtomicBoolean exceptedRun) throws UnavailableException {
super(bookieIdentifier, conf, bkc, ownBkc, statsLogger);
- renewAuditorTestWrapperTask();
+ renewAuditorTestWrapperTask(exceptedRun);
}
public TestAuditor(String bookieIdentifier, ServerConfiguration conf, BookKeeper bkc, boolean ownBkc,
- BookKeeperAdmin bkadmin, boolean ownadmin, StatsLogger statsLogger) throws UnavailableException {
+ BookKeeperAdmin bkadmin, boolean ownadmin, StatsLogger statsLogger,
+ AtomicBoolean exceptedRun) throws UnavailableException {
super(bookieIdentifier, conf, bkc, ownBkc, bkadmin, ownadmin, statsLogger);
- renewAuditorTestWrapperTask();
+ renewAuditorTestWrapperTask(exceptedRun);
}
- public TestAuditor(final String bookieIdentifier, ServerConfiguration conf, StatsLogger statsLogger)
+ public TestAuditor(final String bookieIdentifier, ServerConfiguration conf, StatsLogger statsLogger,
+ AtomicBoolean exceptedRun)
throws UnavailableException {
super(bookieIdentifier, conf, statsLogger);
- renewAuditorTestWrapperTask();
+ renewAuditorTestWrapperTask(exceptedRun);
}
- private void renewAuditorTestWrapperTask() {
+ private void renewAuditorTestWrapperTask(AtomicBoolean exceptedRun) {
super.auditorCheckAllLedgersTask =
- new AuditorTestWrapperTask(super.auditorCheckAllLedgersTask, latchRef);
+ new AuditorTestWrapperTask(super.auditorCheckAllLedgersTask, latchRef, exceptedRun);
super.auditorPlacementPolicyCheckTask =
- new AuditorTestWrapperTask(super.auditorPlacementPolicyCheckTask, latchRef);
+ new AuditorTestWrapperTask(super.auditorPlacementPolicyCheckTask, latchRef, exceptedRun);
super.auditorReplicasCheckTask =
- new AuditorTestWrapperTask(super.auditorReplicasCheckTask, latchRef);
+ new AuditorTestWrapperTask(super.auditorReplicasCheckTask, latchRef, exceptedRun);
}
CountDownLatch getLatch() {
@@ -741,18 +930,24 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
private static class AuditorTestWrapperTask extends AuditorTask {
private final AuditorTask innerTask;
private final AtomicReference<CountDownLatch> latchRef;
+ private final AtomicBoolean exceptedRun;
- AuditorTestWrapperTask(AuditorTask innerTask, AtomicReference<CountDownLatch> latchRef) {
+ AuditorTestWrapperTask(AuditorTask innerTask,
+ AtomicReference<CountDownLatch> latchRef,
+ AtomicBoolean exceptedRun) {
super(null, null, null, null, null,
- null);
+ null, null);
this.innerTask = innerTask;
this.latchRef = latchRef;
+ this.exceptedRun = exceptedRun;
}
@Override
protected void runTask() {
- innerTask.runTask();
- latchRef.get().countDown();
+ if (exceptedRun == null || exceptedRun.get()) {
+ innerTask.runTask();
+ latchRef.get().countDown();
+ }
}
@Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTaskTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTaskTest.java
index ec773efcf6..6d951d7180 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTaskTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTaskTest.java
@@ -86,7 +86,7 @@ public class AuditorPlacementPolicyCheckTaskTest extends BookKeeperClusterTestCa
AuditorPlacementPolicyCheckTask auditorPlacementPolicyCheckTask = new AuditorPlacementPolicyCheckTask(
baseConf, auditorStats, admin, ledgerManager,
- ledgerUnderreplicationManager, null);
+ ledgerUnderreplicationManager, null, (flag, throwable) -> flag.set(false));
// 3. placementPolicyCheck
auditorPlacementPolicyCheckTask.runTask();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
index b26084921d..e14deb3fb5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
@@ -831,7 +831,7 @@ public class AuditorPlacementPolicyCheckTest extends BookKeeperClusterTestCase {
.getOpStatsLogger(ReplicationStats.PLACEMENT_POLICY_CHECK_TIME);
final TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId(servConf).toString(), servConf,
- statsLogger);
+ statsLogger, null);
auditorRef.setValue(auditor);
CountDownLatch latch = auditor.getLatch();
assertEquals("PLACEMENT_POLICY_CHECK_TIME SuccessCount", 0, placementPolicyCheckStatsLogger.getSuccessCount());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTaskTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTaskTest.java
index 34c440b06c..f39a9dace7 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTaskTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTaskTest.java
@@ -85,7 +85,7 @@ public class AuditorReplicasCheckTaskTest extends BookKeeperClusterTestCase {
final AuditorStats auditorStats = new AuditorStats(statsLogger);
AuditorReplicasCheckTask auditorReplicasCheckTask = new AuditorReplicasCheckTask(
baseConf, auditorStats, admin, ledgerManager,
- ledgerUnderreplicationManager, null);
+ ledgerUnderreplicationManager, null, (flag, throwable) -> flag.set(false));
// 3. replicasCheck
auditorReplicasCheckTask.runTask();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTest.java
index a9377f394e..c64a14eca2 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTest.java
@@ -155,7 +155,7 @@ public class AuditorReplicasCheckTest extends BookKeeperClusterTestCase {
final TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId(servConf).toString(), servConf, bkc, true,
new TestBookKeeperAdmin(bkc, statsLogger, expectedReturnAvailabilityOfEntriesOfLedger,
errorReturnValueForGetAvailabilityOfEntriesOfLedger),
- true, statsLogger);
+ true, statsLogger, null);
auditorRef.setValue(auditor);
CountDownLatch latch = auditor.getLatch();
assertEquals("REPLICAS_CHECK_TIME SuccessCount", 0, replicasCheckStatsLogger.getSuccessCount());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index b7752b8bcb..fab068ae60 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -1374,7 +1374,7 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
.getOpStatsLogger(ReplicationStats.PLACEMENT_POLICY_CHECK_TIME);
final AuditorPeriodicCheckTest.TestAuditor auditor = new AuditorPeriodicCheckTest.TestAuditor(
- BookieImpl.getBookieId(servConf).toString(), servConf, bkc, false, statsLogger);
+ BookieImpl.getBookieId(servConf).toString(), servConf, bkc, false, statsLogger, null);
auditorRef.setValue(auditor);
CountDownLatch latch = auditor.getLatch();
assertEquals("PLACEMENT_POLICY_CHECK_TIME SuccessCount", 0,