You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by yo...@apache.org on 2022/08/01 13:44:52 UTC
[bookkeeper] 06/17: Fix the infinite waiting for shutdown due to throttler limit (#2942)
This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch branch-4.15
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 308646bbfd95c3858d352fbf9c3fc56ffe2eec68
Author: wenbingshen <ol...@gmail.com>
AuthorDate: Thu Jul 28 11:41:52 2022 +0800
Fix the infinite waiting for shutdown due to throttler limit (#2942)
Descriptions of the changes in this PR:
If the compactor is throttled, shutting down should take priority over waiting in `RateLimiter.acquire`.
Following @hangc0276's suggestion, when `GarbageCollectorThread` is shutting down, we should check the status returned by the `newScanner.process` method; if it is false, throw an `IOException` and stop compaction immediately.
Master Issue: #2941
(cherry picked from commit 442e3bbad384fba9b09f8ea774e28562a8d5c6d7)
---
.../bookkeeper/bookie/AbstractLogCompactor.java | 32 ++++++++++++-
.../bookkeeper/bookie/GarbageCollectorThread.java | 2 +
.../apache/bookkeeper/bookie/CompactionTest.java | 56 ++++++++++++++++++++++
3 files changed, 88 insertions(+), 2 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java
index 57ec8978cc..56d4ff5cb7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java
@@ -23,6 +23,9 @@ package org.apache.bookkeeper.bookie;
import com.google.common.util.concurrent.RateLimiter;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.conf.ServerConfiguration;
/**
@@ -60,6 +63,7 @@ public abstract class AbstractLogCompactor {
static class Throttler {
private final RateLimiter rateLimiter;
private final boolean isThrottleByBytes;
+ private final AtomicBoolean cancelled = new AtomicBoolean(false);
Throttler(ServerConfiguration conf) {
this.isThrottleByBytes = conf.getIsThrottleByBytes();
@@ -68,8 +72,32 @@ public abstract class AbstractLogCompactor {
}
// acquire. if bybytes: bytes of this entry; if byentries: 1.
- void acquire(int permits) {
- rateLimiter.acquire(this.isThrottleByBytes ? permits : 1);
+ boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
+ return rateLimiter.tryAcquire(this.isThrottleByBytes ? permits : 1, timeout, unit);
+ }
+
+ // Acquire permits while polling for cancellation: each attempt waits up to 100 ms,
+ // then sleeps 100 ms before retrying. Once cancelledAcquire() has been called (e.g. by
+ // GC thread shutdown) and permits are still unavailable, an IOException aborts compaction.
+ public void acquire(int permits) throws IOException {
+ long timeout = 100;
+ long start = System.currentTimeMillis();
+ while (!tryAcquire(permits, timeout, TimeUnit.MILLISECONDS)) {
+ if (cancelled.get()) {
+ throw new IOException("Failed to get permits takes "
+ + (System.currentTimeMillis() - start)
+ + " ms may be compactor has been shutting down");
+ }
+ try {
+ TimeUnit.MILLISECONDS.sleep(timeout);
+ } catch (InterruptedException e) {
+ // NOTE(review): interrupt is swallowed and the flag is not restored; the loop keeps waiting
+ }
+ }
+ }
+
+ public void cancelledAcquire() {
+ cancelled.set(true);
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index b98cd2a8f0..6e3d00c3d4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -611,6 +611,8 @@ public class GarbageCollectorThread extends SafeRunnable {
}
LOG.info("Shutting down GarbageCollectorThread");
+ throttler.cancelledAcquire();
+ compactor.throttler.cancelledAcquire();
while (!compacting.compareAndSet(false, true)) {
// Wait till the thread stops compacting
Thread.sleep(100);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 765d8256a7..d39e6bf134 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -54,6 +54,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
import org.apache.bookkeeper.bookie.BookieException.EntryLogMetadataMapException;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
@@ -1389,6 +1390,61 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
storage.getEntry(1, 1); // entry should exist
}
+ @Test
+ public void testCancelledCompactionWhenShuttingDown() throws Exception {
+ // prepare data
+ LedgerHandle[] lhs = prepareData(3, false);
+
+ // change compaction in low throughput
+ // restart bookies
+ restartBookies(c -> {
+ c.setIsThrottleByBytes(true);
+ c.setCompactionRateByBytes(ENTRY_SIZE / 1000);
+ c.setMinorCompactionThreshold(0.2f);
+ c.setMajorCompactionThreshold(0.5f);
+ return c;
+ });
+
+ // remove ledger2 and ledger3
+ // so entry log 1 and 2 would have ledger1 entries left
+ bkc.deleteLedger(lhs[1].getId());
+ bkc.deleteLedger(lhs[2].getId());
+ LOG.info("Finished deleting the ledgers contains most entries.");
+
+ getGCThread().triggerGC(true, false, false);
+ getGCThread().throttler.cancelledAcquire();
+ waitUntilTrue(() -> {
+ try {
+ return getGCThread().compacting.get();
+ } catch (Exception e) {
+ fail("Get GC thread failed");
+ }
+ return null;
+ }, () -> "Not attempting to complete", 10000, 200);
+
+ getGCThread().shutdown();
+ // after garbage collection shutdown, compaction should be cancelled when acquire permits
+ // and GC running flag should be false.
+ assertFalse(getGCThread().running);
+
+ }
+
+ private void waitUntilTrue(Supplier<Boolean> condition,
+ Supplier<String> msg,
+ long waitTime,
+ long pause) throws InterruptedException {
+ long startTime = System.currentTimeMillis();
+ while (true) {
+ if (condition.get()) {
+ return;
+ }
+ if (System.currentTimeMillis() > startTime + waitTime) {
+ fail(msg.get());
+ }
+ Thread.sleep(Math.min(waitTime, pause));
+ }
+ }
+
private LedgerManager getLedgerManager(final Set<Long> ledgers) {
LedgerManager manager = new LedgerManager() {
@Override