You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by ch...@apache.org on 2022/07/28 03:41:58 UTC

[bookkeeper] branch master updated: Fix the infinite waiting for shutdown due to throttler limit (#2942)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 442e3bbad3 Fix the infinite waiting for shutdown due to throttler limit (#2942)
442e3bbad3 is described below

commit 442e3bbad384fba9b09f8ea774e28562a8d5c6d7
Author: wenbingshen <ol...@gmail.com>
AuthorDate: Thu Jul 28 11:41:52 2022 +0800

    Fix the infinite waiting for shutdown due to throttler limit (#2942)
    
    Descriptions of the changes in this PR:
    ### Motivation
    
    If compaction is being throttled, shutdown should take priority over waiting in `RateLimiter.acquire`; otherwise shutting down can block indefinitely while the compactor waits for permits.
    
    ### Changes
    
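    Following @hangc0276's suggestion, when `GarbageCollectorThread` runs its shutdown logic we check the status of the `newScanner.process` method; if the status is false, an `IOException` is thrown and compaction stops immediately. Concretely, `Throttler.acquire` now polls `RateLimiter.tryAcquire` with a 100 ms timeout and throws an `IOException` once `cancelledAcquire()` has been called. `GarbageCollectorThread` calls `cancelledAcquire()` on both its own throttler and the compactor's throttler while shutting down.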
    
    
    Master Issue: #2941
---
 .../bookkeeper/bookie/AbstractLogCompactor.java    | 32 ++++++++++++-
 .../bookkeeper/bookie/GarbageCollectorThread.java  |  2 +
 .../apache/bookkeeper/bookie/CompactionTest.java   | 56 ++++++++++++++++++++++
 3 files changed, 88 insertions(+), 2 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java
index ff35f570ef..c69250534e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/AbstractLogCompactor.java
@@ -23,6 +23,9 @@ package org.apache.bookkeeper.bookie;
 
 import com.google.common.util.concurrent.RateLimiter;
 
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 
 /**
@@ -66,6 +69,7 @@ public abstract class AbstractLogCompactor {
     public static class Throttler {
         private final RateLimiter rateLimiter;
         private final boolean isThrottleByBytes;
+        private final AtomicBoolean cancelled = new AtomicBoolean(false);
 
         Throttler(ServerConfiguration conf) {
             this.isThrottleByBytes  = conf.getIsThrottleByBytes();
@@ -74,8 +78,32 @@ public abstract class AbstractLogCompactor {
         }
 
         // acquire. if bybytes: bytes of this entry; if byentries: 1.
-        public void acquire(int permits) {
-            rateLimiter.acquire(this.isThrottleByBytes ? permits : 1);
+        boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
+            return rateLimiter.tryAcquire(this.isThrottleByBytes ? permits : 1, timeout, unit);
+        }
+
+        // GC thread will check the status for the rate limiter
+        // If the compactor is being stopped by other threads,
+        // and the GC thread is still limited, the compact task will be stopped.
+        public void acquire(int permits) throws IOException {
+            long timeout = 100;
+            long start = System.currentTimeMillis();
+            while (!tryAcquire(permits, timeout, TimeUnit.MILLISECONDS)) {
+                if (cancelled.get()) {
+                    throw new IOException("Failed to get permits takes "
+                            + (System.currentTimeMillis() - start)
+                            + " ms may be compactor has been shutting down");
+                }
+                try {
+                    TimeUnit.MILLISECONDS.sleep(timeout);
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+            }
+        }
+
+        public void cancelledAcquire() {
+            cancelled.set(true);
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index e6e3a6c4c3..e66cb41db1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -653,6 +653,8 @@ public class GarbageCollectorThread extends SafeRunnable {
         }
         LOG.info("Shutting down GarbageCollectorThread");
 
+        throttler.cancelledAcquire();
+        compactor.throttler.cancelledAcquire();
         while (!compacting.compareAndSet(false, true)) {
             // Wait till the thread stops compacting
             Thread.sleep(100);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 99fdabe23d..11e3745072 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -53,6 +53,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 
 import org.apache.bookkeeper.bookie.BookieException.EntryLogMetadataMapException;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
@@ -1390,6 +1391,61 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         storage.getEntry(1, 1); // entry should exist
     }
 
+    @Test
+    public void testCancelledCompactionWhenShuttingDown() throws Exception {
+        // prepare data
+        LedgerHandle[] lhs = prepareData(3, false);
+
+        // change compaction in low throughput
+        // restart bookies
+        restartBookies(c -> {
+            c.setIsThrottleByBytes(true);
+            c.setCompactionRateByBytes(ENTRY_SIZE / 1000);
+            c.setMinorCompactionThreshold(0.2f);
+            c.setMajorCompactionThreshold(0.5f);
+            return c;
+        });
+
+        // remove ledger2 and ledger3
+        // so entry log 1 and 2 would have ledger1 entries left
+        bkc.deleteLedger(lhs[1].getId());
+        bkc.deleteLedger(lhs[2].getId());
+        LOG.info("Finished deleting the ledgers contains most entries.");
+
+        getGCThread().triggerGC(true, false, false);
+        getGCThread().throttler.cancelledAcquire();
+        waitUntilTrue(() -> {
+            try {
+                return getGCThread().compacting.get();
+            } catch (Exception e) {
+                fail("Get GC thread failed");
+            }
+            return null;
+        }, () -> "Not attempting to complete", 10000, 200);
+
+        getGCThread().shutdown();
+        // after garbage collection shutdown, compaction should be cancelled when acquire permits
+        // and GC running flag should be false.
+        assertFalse(getGCThread().running);
+
+    }
+
+    private void waitUntilTrue(Supplier<Boolean> condition,
+                               Supplier<String> msg,
+                               long waitTime,
+                               long pause) throws InterruptedException {
+        long startTime = System.currentTimeMillis();
+        while (true) {
+            if (condition.get()) {
+                return;
+            }
+            if (System.currentTimeMillis() > startTime + waitTime) {
+                fail(msg.get());
+            }
+            Thread.sleep(Math.min(waitTime, pause));
+        }
+    }
+
     private LedgerManager getLedgerManager(final Set<Long> ledgers) {
         LedgerManager manager = new LedgerManager() {
                 @Override