You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by yo...@apache.org on 2022/08/01 13:44:52 UTC

[bookkeeper] 06/17: Fix the infinite waiting for shutdown due to throttler limit (#2942)

This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.15
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 308646bbfd95c3858d352fbf9c3fc56ffe2eec68
Author: wenbingshen <ol...@gmail.com>
AuthorDate: Thu Jul 28 11:41:52 2022 +0800

    Fix the infinite waiting for shutdown due to throttler limit (#2942)
    
    Descriptions of the changes in this PR:
    
    If the compactor is being throttled, shutdown should take priority over waiting in `RateLimiter.acquire`.
    
    Following @hangc0276's suggestion, when handling the shutdown logic of `GarbageCollectorThread`, we should check the status of the `newScanner.process` method. If the status is false, throw an `IOException` and stop compaction immediately.
    
    Master Issue: #2941
    
    (cherry picked from commit 442e3bbad384fba9b09f8ea774e28562a8d5c6d7)
---
 .../bookkeeper/bookie/AbstractLogCompactor.java    | 32 ++++++++++++-
 .../bookkeeper/bookie/GarbageCollectorThread.java  |  2 +
 .../apache/bookkeeper/bookie/CompactionTest.java   | 56 ++++++++++++++++++++++
 3 files changed, 88 insertions(+), 2 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java
index 57ec8978cc..56d4ff5cb7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java
@@ -23,6 +23,9 @@ package org.apache.bookkeeper.bookie;
 
 import com.google.common.util.concurrent.RateLimiter;
 
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 
 /**
@@ -60,6 +63,7 @@ public abstract class AbstractLogCompactor {
     static class Throttler {
         private final RateLimiter rateLimiter;
         private final boolean isThrottleByBytes;
+        private final AtomicBoolean cancelled = new AtomicBoolean(false);
 
         Throttler(ServerConfiguration conf) {
             this.isThrottleByBytes  = conf.getIsThrottleByBytes();
@@ -68,8 +72,32 @@ public abstract class AbstractLogCompactor {
         }
 
         // acquire. if bybytes: bytes of this entry; if byentries: 1.
-        void acquire(int permits) {
-            rateLimiter.acquire(this.isThrottleByBytes ? permits : 1);
+        boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
+            return rateLimiter.tryAcquire(this.isThrottleByBytes ? permits : 1, timeout, unit);
+        }
+
+        // Waits for permits, re-checking every 100 ms, so the GC thread does not block forever
+        // while throttled: once cancelledAcquire() has been called, or the waiting thread is
+        // interrupted, the wait ends with an IOException so the caller can stop the compact task.
+        public void acquire(int permits) throws IOException {
+            long timeout = 100;
+            long start = System.currentTimeMillis();
+            while (!tryAcquire(permits, timeout, TimeUnit.MILLISECONDS)) {
+                if (cancelled.get()) {
+                    throw new IOException("Failed to get permits takes " + (System.currentTimeMillis() - start)
+                            + " ms may be compactor has been shutting down");
+                }
+                try {
+                    TimeUnit.MILLISECONDS.sleep(timeout);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt(); // preserve the interrupt status for callers
+                    throw new IOException("Interrupted while waiting for compaction permits", e);
+                }
+            }
+        }
+
+        public void cancelledAcquire() {
+            cancelled.set(true);
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index b98cd2a8f0..6e3d00c3d4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -611,6 +611,8 @@ public class GarbageCollectorThread extends SafeRunnable {
         }
         LOG.info("Shutting down GarbageCollectorThread");
 
+        throttler.cancelledAcquire();
+        compactor.throttler.cancelledAcquire();
         while (!compacting.compareAndSet(false, true)) {
             // Wait till the thread stops compacting
             Thread.sleep(100);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 765d8256a7..d39e6bf134 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -54,6 +54,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 
 import org.apache.bookkeeper.bookie.BookieException.EntryLogMetadataMapException;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
@@ -1389,6 +1390,61 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         storage.getEntry(1, 1); // entry should exist
     }
 
+    @Test
+    public void testCancelledCompactionWhenShuttingDown() throws Exception {
+        // prepare data; the returned handles are used as lhs[1] and lhs[2] below
+        LedgerHandle[] lhs = prepareData(3, false);
+
+        // restart bookies with compaction throttled by bytes at a very low rate,
+        // presumably so that compaction stalls while waiting for permits
+        restartBookies(c -> {
+            c.setIsThrottleByBytes(true);
+            c.setCompactionRateByBytes(ENTRY_SIZE / 1000);
+            c.setMinorCompactionThreshold(0.2f);
+            c.setMajorCompactionThreshold(0.5f);
+            return c;
+        });
+
+        // remove ledger2 and ledger3
+        // so that entry logs 1 and 2 are left with ledger1 entries
+        bkc.deleteLedger(lhs[1].getId());
+        bkc.deleteLedger(lhs[2].getId());
+        LOG.info("Finished deleting the ledgers contains most entries.");
+
+        getGCThread().triggerGC(true, false, false);
+        getGCThread().throttler.cancelledAcquire(); // flag the GC throttler as cancelled up front
+        waitUntilTrue(() -> {
+            try {
+                return getGCThread().compacting.get();
+            } catch (Exception e) {
+                fail("Get GC thread failed");
+            }
+            return null; // reached only if fail() returns normally
+        }, () -> "Not attempting to complete", 10000, 200);
+
+        getGCThread().shutdown();
+        // after the GC thread has been shut down, the throttled compaction should have been
+        // cancelled while acquiring permits, and the GC running flag should be false.
+        assertFalse(getGCThread().running);
+
+    }
+
+    // Polls condition, sleeping min(waitTime, pause) ms between checks, until it yields true;
+    // calls fail(msg.get()) once more than waitTime ms have elapsed. A null result counts as false.
+    private void waitUntilTrue(Supplier<Boolean> condition, Supplier<String> msg,
+                               long waitTime, long pause) throws InterruptedException {
+        long startTime = System.currentTimeMillis();
+        while (true) {
+            if (Boolean.TRUE.equals(condition.get())) {
+                return;
+            }
+            if (System.currentTimeMillis() > startTime + waitTime) {
+                fail(msg.get());
+            }
+            Thread.sleep(Math.min(waitTime, pause));
+        }
+    }
+
     private LedgerManager getLedgerManager(final Set<Long> ledgers) {
         LedgerManager manager = new LedgerManager() {
                 @Override