You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2021/10/11 13:35:11 UTC
[bookkeeper] branch master updated: Issue 2728: Entry Log GC may
get blocked when using entryLogPerLedgerEnabled option (#2779)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new e413c70 Issue 2728: Entry Log GC may get blocked when using entryLogPerLedgerEnabled option (#2779)
e413c70 is described below
commit e413c7094f5a4ba7cde3b2e399f8ebe4366b464c
Author: Raúl Gracia <ra...@emc.com>
AuthorDate: Mon Oct 11 15:35:02 2021 +0200
Issue 2728: Entry Log GC may get blocked when using entryLogPerLedgerEnabled option (#2779)
---
.../org/apache/bookkeeper/bookie/EntryLogger.java | 30 ++++++++++
.../bookkeeper/bookie/GarbageCollectorThread.java | 28 ++++++---
.../apache/bookkeeper/bookie/CompactionTest.java | 70 ++++++++++++++++++++++
.../java/org/apache/bookkeeper/util/TestUtils.java | 36 +++++++----
4 files changed, 147 insertions(+), 17 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index 504adfa..49a9ca4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -482,6 +482,27 @@ public class EntryLogger {
return recentlyCreatedEntryLogsStatus.getLeastUnflushedLogId();
}
+ /**
+ * Get the last log id created so far. If entryLogPerLedger is enabled, the Garbage Collector
+ * process needs to look beyond the least unflushed entry log file, as there may be entry logs
+ * ready to be garbage collected.
+ *
+ * @return last entry log id created.
+ */
+ long getLastLogId() {
+ return recentlyCreatedEntryLogsStatus.getLastLogId();
+ }
+
+ /**
+ * Returns whether the given entry log id exists and has been rotated already.
+ *
+ * @param entryLogId EntryLog id to check.
+ * @return Whether the given entryLogId exists and has been rotated.
+ */
+ boolean isFlushedEntryLog(Long entryLogId) {
+ return recentlyCreatedEntryLogsStatus.isFlushedEntryLog(entryLogId);
+ }
+
long getPreviousAllocatedEntryLogId() {
return entryLoggerAllocator.getPreallocatedLogId();
}
@@ -1249,5 +1270,14 @@ public class EntryLogger {
synchronized long getLeastUnflushedLogId() {
return leastUnflushedLogId;
}
+
+ synchronized long getLastLogId() {
+ return !entryLogsStatusMap.isEmpty() ? entryLogsStatusMap.lastKey() : 0;
+ }
+
+ synchronized boolean isFlushedEntryLog(Long entryLogId) {
+ return entryLogsStatusMap.containsKey(entryLogId) && entryLogsStatusMap.get(entryLogId)
+ || entryLogId < leastUnflushedLogId;
+ }
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index fb54890..cafbf53 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
import lombok.Getter;
import org.apache.bookkeeper.bookie.GarbageCollector.GarbageCleaner;
@@ -583,12 +584,15 @@ public class GarbageCollectorThread extends SafeRunnable {
* @throws IOException
*/
protected Map<Long, EntryLogMetadata> extractMetaFromEntryLogs(Map<Long, EntryLogMetadata> entryLogMetaMap) {
- // Extract it for every entry log except for the current one.
- // Entry Log ID's are just a long value that starts at 0 and increments
- // by 1 when the log fills up and we roll to a new one.
- long curLogId = entryLogger.getLeastUnflushedLogId();
+ // Entry Log ID's are just a long value that starts at 0 and increments by 1 when the log fills up and we roll
+ // to a new one. We scan entry logs as follows:
+ // - entryLogPerLedgerEnabled is false: Extract it for every entry log except for the current one (un-flushed).
+ // - entryLogPerLedgerEnabled is true: Scan all flushed entry logs up to the highest known id.
+ Supplier<Long> finalEntryLog = () -> conf.isEntryLogPerLedgerEnabled() ? entryLogger.getLastLogId() :
+ entryLogger.getLeastUnflushedLogId();
boolean hasExceptionWhenScan = false;
- for (long entryLogId = scannedLogId; entryLogId < curLogId; entryLogId++) {
+ boolean increaseScannedLogId = true;
+ for (long entryLogId = scannedLogId; entryLogId < finalEntryLog.get(); entryLogId++) {
// Comb the current entry log file if it has not already been extracted.
if (entryLogMetaMap.containsKey(entryLogId)) {
continue;
@@ -600,6 +604,15 @@ public class GarbageCollectorThread extends SafeRunnable {
continue;
}
+ // If entryLogPerLedgerEnabled is true, we only scan entry log files that are either below
+ // getLeastUnflushedLogId() or beyond it but explicitly rotated; any other entry log is skipped.
+ if (conf.isEntryLogPerLedgerEnabled() && !entryLogger.isFlushedEntryLog(entryLogId)) {
+ LOG.info("Entry log {} not flushed (entryLogPerLedgerEnabled). Starting next iteration at this point.",
+ entryLogId);
+ increaseScannedLogId = false;
+ continue;
+ }
+
LOG.info("Extracting entry log meta from entryLogId: {}", entryLogId);
try {
@@ -619,8 +632,9 @@ public class GarbageCollectorThread extends SafeRunnable {
// if scan failed on some entry log, we don't move 'scannedLogId' to next id
// if scan succeed, we don't need to scan it again during next gc run,
- // we move 'scannedLogId' to next id
- if (!hasExceptionWhenScan) {
+ // we move 'scannedLogId' to next id (unless entryLogPerLedgerEnabled is true
+ // and we have found an un-flushed entry log already).
+ if (!hasExceptionWhenScan && (!conf.isEntryLogPerLedgerEnabled() || increaseScannedLogId)) {
++scannedLogId;
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 408446d..ccf6fd4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -51,6 +51,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
@@ -545,6 +546,75 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
}
@Test
+ public void testMinorCompactionWithEntryLogPerLedgerEnabled() throws Exception {
+ // restart bookies
+ restartBookies(c-> {
+ c.setMajorCompactionThreshold(0.0f);
+ c.setGcWaitTime(60000);
+ c.setMinorCompactionInterval(120000);
+ c.setMajorCompactionInterval(240000);
+ c.setForceAllowCompaction(true);
+ c.setEntryLogPerLedgerEnabled(true);
+ return c;
+ });
+
+ // prepare data
+ LedgerHandle[] lhs = prepareData(3, false);
+
+ for (LedgerHandle lh : lhs) {
+ lh.close();
+ }
+
+ long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime;
+ long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime;
+ assertFalse(getGCThread().enableMajorCompaction);
+ assertTrue(getGCThread().enableMinorCompaction);
+
+ // remove ledgers 1 and 2
+ bkc.deleteLedger(lhs[1].getId());
+ bkc.deleteLedger(lhs[2].getId());
+
+ // Need to wait until entry log 3 gets flushed before initiating GC to satisfy assertions.
+ while (!getGCThread().entryLogger.isFlushedEntryLog(3L)) {
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+
+ LOG.info("Finished deleting the ledgers contains most entries.");
+ getGCThread().triggerGC(true, false, false).get();
+
+ assertEquals(lastMajorCompactionTime, getGCThread().lastMajorCompactionTime);
+ assertTrue(getGCThread().lastMinorCompactionTime > lastMinorCompactionTime);
+
+ // At this point, we have the following state of ledgers and entry logs:
+ // L0 (not deleted) -> E0 (un-flushed): Entry log should exist.
+ // L1 (deleted) -> E1 (un-flushed): Entry log should exist as un-flushed entry logs are not considered for GC.
+ // L2 (deleted) -> E2 (flushed): Entry log should have been garbage collected.
+ // E3 (flushed): Entry log should have been garbage collected.
+ // E4 (un-flushed): Entry log should exist as un-flushed entry logs are not considered for GC.
+ assertTrue("Not found entry log files [0, 1, 4].log that should not have been compacted in: "
+ + tmpDirs.get(0), TestUtils.hasAllLogFiles(tmpDirs.get(0), 0, 1, 4));
+ assertTrue("Found entry log files [2, 3].log that should have been compacted in ledgerDirectory: "
+ + tmpDirs.get(0), TestUtils.hasNoneLogFiles(tmpDirs.get(0), 2, 3));
+
+ // Now, let's mark E1 as flushed, as its ledger L1 has been deleted already. In this case, the GC algorithm
+ // should consider it for deletion.
+ getGCThread().entryLogger.recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(1L);
+ getGCThread().triggerGC(true, false, false).get();
+ assertTrue("Found entry log file 1.log that should have been compacted in ledgerDirectory: "
+ + tmpDirs.get(0), TestUtils.hasNoneLogFiles(tmpDirs.get(0), 1));
+
+ // Once ledger L0 has been removed, deleting E0 is fine (but only if it has been flushed).
+ bkc.deleteLedger(lhs[0].getId());
+ getGCThread().triggerGC(true, false, false).get();
+ assertTrue("Found entry log file 0.log that should not have been compacted in ledgerDirectory: "
+ + tmpDirs.get(0), TestUtils.hasAllLogFiles(tmpDirs.get(0), 0));
+ getGCThread().entryLogger.recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(0L);
+ getGCThread().triggerGC(true, false, false).get();
+ assertTrue("Found entry log file 0.log that should have been compacted in ledgerDirectory: "
+ + tmpDirs.get(0), TestUtils.hasNoneLogFiles(tmpDirs.get(0), 0));
+ }
+
+ @Test
public void testMinorCompactionWithNoWritableLedgerDirs() throws Exception {
// prepare data
LedgerHandle[] lhs = prepareData(3, false);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
index 462d472..27f1abb 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
@@ -22,6 +22,7 @@
package org.apache.bookkeeper.util;
import java.io.File;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
@@ -49,9 +50,31 @@ public final class TestUtils {
return bookieId.toString().replace('.', '_').replace('-', '_').replace(":", "_");
}
+ public static boolean hasAllLogFiles(File ledgerDirectory, Integer... logsId) {
+ Set<Integer> logs = findEntryLogFileIds(ledgerDirectory);
+ return logs.containsAll(Arrays.asList(logsId));
+ }
+
+ public static boolean hasNoneLogFiles(File ledgerDirectory, Integer... logsId) {
+ Set<Integer> logs = findEntryLogFileIds(ledgerDirectory);
+ return Arrays.stream(logsId).noneMatch(logs::contains);
+ }
+
public static boolean hasLogFiles(File ledgerDirectory, boolean partial, Integer... logsId) {
- boolean result = partial ? false : true;
- Set<Integer> logs = new HashSet<Integer>();
+ boolean result = !partial;
+ Set<Integer> logs = findEntryLogFileIds(ledgerDirectory);
+ for (Integer logId : logsId) {
+ boolean exist = logs.contains(logId);
+ if ((partial && exist)
+ || (!partial && !exist)) {
+ return !result;
+ }
+ }
+ return result;
+ }
+
+ private static Set<Integer> findEntryLogFileIds(File ledgerDirectory) {
+ Set<Integer> logs = new HashSet<>();
for (File file : BookieImpl.getCurrentDirectory(ledgerDirectory).listFiles()) {
if (file.isFile()) {
String name = file.getName();
@@ -61,14 +84,7 @@ public final class TestUtils {
logs.add(Integer.parseInt(name.split("\\.")[0], 16));
}
}
- for (Integer logId : logsId) {
- boolean exist = logs.contains(logId);
- if ((partial && exist)
- || (!partial && !exist)) {
- return !result;
- }
- }
- return result;
+ return logs;
}
public static void waitUntilLacUpdated(ReadHandle rh, long newLac) throws Exception {