You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2021/10/11 13:35:11 UTC

[bookkeeper] branch master updated: Issue 2728: Entry Log GC may get blocked when using entryLogPerLedgerEnabled option (#2779)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new e413c70  Issue 2728: Entry Log GC may get blocked when using entryLogPerLedgerEnabled option (#2779)
e413c70 is described below

commit e413c7094f5a4ba7cde3b2e399f8ebe4366b464c
Author: Raúl Gracia <ra...@emc.com>
AuthorDate: Mon Oct 11 15:35:02 2021 +0200

    Issue 2728: Entry Log GC may get blocked when using entryLogPerLedgerEnabled option (#2779)
---
 .../org/apache/bookkeeper/bookie/EntryLogger.java  | 30 ++++++++++
 .../bookkeeper/bookie/GarbageCollectorThread.java  | 28 ++++++---
 .../apache/bookkeeper/bookie/CompactionTest.java   | 70 ++++++++++++++++++++++
 .../java/org/apache/bookkeeper/util/TestUtils.java | 36 +++++++----
 4 files changed, 147 insertions(+), 17 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index 504adfa..49a9ca4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -482,6 +482,27 @@ public class EntryLogger {
         return recentlyCreatedEntryLogsStatus.getLeastUnflushedLogId();
     }
 
+    /**
+     * Get the last log id created so far. If entryLogPerLedger is enabled, the Garbage Collector
+     * process needs to look beyond the least unflushed entry log file, as there may be entry logs
+     * ready to be garbage collected.
+     *
+     * @return last entry log id created.
+     */
+    long getLastLogId() {
+        return recentlyCreatedEntryLogsStatus.getLastLogId();
+    }
+
+    /**
+     * Returns whether the given log id exists and has been rotated already.
+     *
+     * @param entryLogId EntryLog id to check.
+     * @return Whether the given entryLogId exists and has been rotated.
+     */
+    boolean isFlushedEntryLog(Long entryLogId) {
+        return recentlyCreatedEntryLogsStatus.isFlushedEntryLog(entryLogId);
+    }
+
     long getPreviousAllocatedEntryLogId() {
         return entryLoggerAllocator.getPreallocatedLogId();
     }
@@ -1249,5 +1270,14 @@ public class EntryLogger {
         synchronized long getLeastUnflushedLogId() {
             return leastUnflushedLogId;
         }
+
+        synchronized long getLastLogId() {
+            return !entryLogsStatusMap.isEmpty() ? entryLogsStatusMap.lastKey() : 0;
+        }
+
+        synchronized boolean isFlushedEntryLog(Long entryLogId) {
+            return entryLogsStatusMap.containsKey(entryLogId) && entryLogsStatusMap.get(entryLogId)
+                    || entryLogId < leastUnflushedLogId;
+        }
     }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index fb54890..cafbf53 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 
 import lombok.Getter;
 import org.apache.bookkeeper.bookie.GarbageCollector.GarbageCleaner;
@@ -583,12 +584,15 @@ public class GarbageCollectorThread extends SafeRunnable {
      * @throws IOException
      */
     protected Map<Long, EntryLogMetadata> extractMetaFromEntryLogs(Map<Long, EntryLogMetadata> entryLogMetaMap) {
-        // Extract it for every entry log except for the current one.
-        // Entry Log ID's are just a long value that starts at 0 and increments
-        // by 1 when the log fills up and we roll to a new one.
-        long curLogId = entryLogger.getLeastUnflushedLogId();
+        // Entry Log ID's are just a long value that starts at 0 and increments by 1 when the log fills up and we roll
+        // to a new one. We scan entry logs as follows:
+        // - entryLogPerLedgerEnabled is false: Extract it for every entry log except for the current one (un-flushed).
+        // - entryLogPerLedgerEnabled is true: Scan all flushed entry logs up to the highest known id.
+        Supplier<Long> finalEntryLog = () -> conf.isEntryLogPerLedgerEnabled() ? entryLogger.getLastLogId() :
+                entryLogger.getLeastUnflushedLogId();
         boolean hasExceptionWhenScan = false;
-        for (long entryLogId = scannedLogId; entryLogId < curLogId; entryLogId++) {
+        boolean increaseScannedLogId = true;
+        for (long entryLogId = scannedLogId; entryLogId < finalEntryLog.get(); entryLogId++) {
             // Comb the current entry log file if it has not already been extracted.
             if (entryLogMetaMap.containsKey(entryLogId)) {
                 continue;
@@ -600,6 +604,15 @@ public class GarbageCollectorThread extends SafeRunnable {
                 continue;
             }
 
+            // If entryLogPerLedgerEnabled is true, only process entry logs that are flushed: those below
+            // getLeastUnflushedLogId(), or those at or beyond it that have been explicitly rotated.
+            if (conf.isEntryLogPerLedgerEnabled() && !entryLogger.isFlushedEntryLog(entryLogId)) {
+                LOG.info("Entry log {} not flushed (entryLogPerLedgerEnabled). Starting next iteration at this point.",
+                        entryLogId);
+                increaseScannedLogId = false;
+                continue;
+            }
+
             LOG.info("Extracting entry log meta from entryLogId: {}", entryLogId);
 
             try {
@@ -619,8 +632,9 @@ public class GarbageCollectorThread extends SafeRunnable {
 
             // if scan failed on some entry log, we don't move 'scannedLogId' to next id
             // if scan succeed, we don't need to scan it again during next gc run,
-            // we move 'scannedLogId' to next id
-            if (!hasExceptionWhenScan) {
+            // we move 'scannedLogId' to next id (unless entryLogPerLedgerEnabled is true
+            // and we have found an un-flushed entry log already).
+            if (!hasExceptionWhenScan && (!conf.isEntryLogPerLedgerEnabled() || increaseScannedLogId)) {
                 ++scannedLogId;
             }
         }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 408446d..ccf6fd4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -51,6 +51,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
@@ -545,6 +546,75 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
     }
 
     @Test
+    public void testMinorCompactionWithEntryLogPerLedgerEnabled() throws Exception {
+        // restart bookies
+        restartBookies(c-> {
+            c.setMajorCompactionThreshold(0.0f);
+            c.setGcWaitTime(60000);
+            c.setMinorCompactionInterval(120000);
+            c.setMajorCompactionInterval(240000);
+            c.setForceAllowCompaction(true);
+            c.setEntryLogPerLedgerEnabled(true);
+            return c;
+        });
+
+        // prepare data
+        LedgerHandle[] lhs = prepareData(3, false);
+
+        for (LedgerHandle lh : lhs) {
+            lh.close();
+        }
+
+        long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime;
+        long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime;
+        assertFalse(getGCThread().enableMajorCompaction);
+        assertTrue(getGCThread().enableMinorCompaction);
+
+        // remove ledgers 1 and 2
+        bkc.deleteLedger(lhs[1].getId());
+        bkc.deleteLedger(lhs[2].getId());
+
+        // Need to wait until entry log 3 gets flushed before initiating GC to satisfy assertions.
+        while (!getGCThread().entryLogger.isFlushedEntryLog(3L)) {
+            TimeUnit.MILLISECONDS.sleep(100);
+        }
+
+        LOG.info("Finished deleting the ledgers contains most entries.");
+        getGCThread().triggerGC(true, false, false).get();
+
+        assertEquals(lastMajorCompactionTime, getGCThread().lastMajorCompactionTime);
+        assertTrue(getGCThread().lastMinorCompactionTime > lastMinorCompactionTime);
+
+        // At this point, we have the following state of ledgers and entry logs:
+        // L0 (not deleted) -> E0 (un-flushed): Entry log should exist.
+        // L1 (deleted) -> E1 (un-flushed): Entry log should exist as un-flushed entry logs are not considered for GC.
+        // L2 (deleted) -> E2 (flushed): Entry log should have been garbage collected.
+        //                 E3 (flushed): Entry log should have been garbage collected.
+        //                 E4 (un-flushed): Entry log should exist as un-flushed entry logs are not considered for GC.
+        assertTrue("Not found entry log files [0, 1, 4].log that should not have been compacted in: "
+                + tmpDirs.get(0), TestUtils.hasAllLogFiles(tmpDirs.get(0), 0, 1, 4));
+        assertTrue("Found entry log files [2, 3].log that should have been compacted in ledgerDirectory: "
+                + tmpDirs.get(0), TestUtils.hasNoneLogFiles(tmpDirs.get(0), 2, 3));
+
+        // Now, let's mark E1 as flushed, as its ledger L1 has been deleted already. In this case, the GC algorithm
+        // should consider it for deletion.
+        getGCThread().entryLogger.recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(1L);
+        getGCThread().triggerGC(true, false, false).get();
+        assertTrue("Found entry log file 1.log that should have been compacted in ledgerDirectory: "
+                + tmpDirs.get(0), TestUtils.hasNoneLogFiles(tmpDirs.get(0), 1));
+
+        // Once ledger L0 has been removed, deleting E0 is fine (only if it has been flushed).
+        bkc.deleteLedger(lhs[0].getId());
+        getGCThread().triggerGC(true, false, false).get();
+        assertTrue("Found entry log file 0.log that should not have been compacted in ledgerDirectory: "
+                + tmpDirs.get(0), TestUtils.hasAllLogFiles(tmpDirs.get(0), 0));
+        getGCThread().entryLogger.recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(0L);
+        getGCThread().triggerGC(true, false, false).get();
+        assertTrue("Found entry log file 0.log that should have been compacted in ledgerDirectory: "
+                + tmpDirs.get(0), TestUtils.hasNoneLogFiles(tmpDirs.get(0), 0));
+    }
+
+    @Test
     public void testMinorCompactionWithNoWritableLedgerDirs() throws Exception {
         // prepare data
         LedgerHandle[] lhs = prepareData(3, false);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
index 462d472..27f1abb 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
@@ -22,6 +22,7 @@
 package org.apache.bookkeeper.util;
 
 import java.io.File;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
@@ -49,9 +50,31 @@ public final class TestUtils {
         return bookieId.toString().replace('.', '_').replace('-', '_').replace(":", "_");
     }
 
+    public static boolean hasAllLogFiles(File ledgerDirectory, Integer... logsId) {
+        Set<Integer> logs = findEntryLogFileIds(ledgerDirectory);
+        return logs.containsAll(Arrays.asList(logsId));
+    }
+
+    public static boolean hasNoneLogFiles(File ledgerDirectory, Integer... logsId) {
+        Set<Integer> logs = findEntryLogFileIds(ledgerDirectory);
+        return Arrays.stream(logsId).noneMatch(logs::contains);
+    }
+
     public static boolean hasLogFiles(File ledgerDirectory, boolean partial, Integer... logsId) {
-        boolean result = partial ? false : true;
-        Set<Integer> logs = new HashSet<Integer>();
+        boolean result = !partial;
+        Set<Integer> logs = findEntryLogFileIds(ledgerDirectory);
+        for (Integer logId : logsId) {
+            boolean exist = logs.contains(logId);
+            if ((partial && exist)
+                    || (!partial && !exist)) {
+                return !result;
+            }
+        }
+        return result;
+    }
+
+    private static Set<Integer> findEntryLogFileIds(File ledgerDirectory) {
+        Set<Integer> logs = new HashSet<>();
         for (File file : BookieImpl.getCurrentDirectory(ledgerDirectory).listFiles()) {
             if (file.isFile()) {
                 String name = file.getName();
@@ -61,14 +84,7 @@ public final class TestUtils {
                 logs.add(Integer.parseInt(name.split("\\.")[0], 16));
             }
         }
-        for (Integer logId : logsId) {
-            boolean exist = logs.contains(logId);
-            if ((partial && exist)
-                    || (!partial && !exist)) {
-                return !result;
-            }
-        }
-        return result;
+        return logs;
     }
 
     public static void waitUntilLacUpdated(ReadHandle rh, long newLac) throws Exception {