You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by yo...@apache.org on 2022/08/02 06:26:08 UTC
[bookkeeper] 15/22: [Issue 3389] Prioritize compaction of entry logs with the lowest amount of remaining usable data (#3390)
This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch branch-4.15
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 063cc8b7cb0bb19fd69238a029868a54972010b1
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Fri Jul 22 16:51:01 2022 -0700
[Issue 3389] Prioritize compaction of entry logs with the lowest amount of remaining usable data (#3390)
Descriptions of the changes in this PR:
### Motivation
Prioritize compaction to free up more space faster.
### Changes
Previously, doCompactEntryLogs() iterated over the entry logs in whatever order the metadata map returned them and compacted each log whose usage was below the threshold as soon as it was encountered.
Added a priority queue of entry logs (implemented as per-usage-bucket lists that are drained from the lowest-usage bucket upward) so that the logs with the most compactable space are picked first. This also helps when the time for compaction is limited (via majorCompactionMaxTimeMillis / minorCompactionMaxTimeMillis): instead of spending that time rewriting files with more data, we pick the files with the least amount of remaining data first.
Master Issue: #3389
(cherry picked from commit 1825677b1ebacef113423b4afc463a0dcdc8988e)
---
.../bookkeeper/bookie/EntryLogMetadataMap.java | 8 +++
.../bookkeeper/bookie/GarbageCollectorThread.java | 64 ++++++++++++++++++----
.../bookie/InMemoryEntryLogMetadataMap.java | 6 ++
.../storage/ldb/PersistentEntryLogMetadataMap.java | 61 ++++++++++++++++-----
.../ldb/PersistentEntryLogMetadataMapTest.java | 14 +++++
5 files changed, 129 insertions(+), 24 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadataMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadataMap.java
index 88f6ce5398..afd5c7f4d0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadataMap.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadataMap.java
@@ -59,6 +59,14 @@ public interface EntryLogMetadataMap extends Closeable {
*/
void forEach(BiConsumer<Long, EntryLogMetadata> action) throws EntryLogMetadataMapException;
+ /**
+ * Performs the given action on the metadata stored for the given entry log id.
+ *
+ * @param action
+ * @throws EntryLogMetadataMapException
+ */
+ void forKey(long entryLogId, BiConsumer<Long, EntryLogMetadata> action) throws EntryLogMetadataMapException;
+
/**
* Removes entryLogMetadata record from the map.
*
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index 6e3d00c3d4..c88fa0bf4d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -28,6 +28,8 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
@@ -545,6 +547,11 @@ public class GarbageCollectorThread extends SafeRunnable {
int[] entryLogUsageBuckets = new int[numBuckets];
int[] compactedBuckets = new int[numBuckets];
+ ArrayList<LinkedList<Long>> compactableBuckets = new ArrayList<>(numBuckets);
+ for (int i = 0; i < numBuckets; i++) {
+ compactableBuckets.add(new LinkedList<>());
+ }
+
long start = System.currentTimeMillis();
MutableLong end = new MutableLong(start);
MutableLong timeDiff = new MutableLong(0);
@@ -559,25 +566,62 @@ public class GarbageCollectorThread extends SafeRunnable {
}
if (meta.getUsage() >= threshold || (maxTimeMillis > 0 && timeDiff.getValue() >= maxTimeMillis)
|| !running) {
- // We allow the usage limit calculation to continue so that we get a accurate
+ // We allow the usage limit calculation to continue so that we get an accurate
// report of where the usage was prior to running compaction.
return;
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Compacting entry log {} with usage {} below threshold {}",
- meta.getEntryLogId(), meta.getUsage(), threshold);
- }
- long priorRemainingSize = meta.getRemainingSize();
- compactEntryLog(meta);
- gcStats.getReclaimedSpaceViaCompaction().add(meta.getTotalSize() - priorRemainingSize);
- compactedBuckets[bucketIndex]++;
+ compactableBuckets.get(bucketIndex).add(meta.getEntryLogId());
});
+
+ LOG.info(
+ "Compaction: entry log usage buckets before compaction [10% 20% 30% 40% 50% 60% 70% 80% 90% 100%] = {}",
+ entryLogUsageBuckets);
+
+ final int maxBucket = calculateUsageIndex(numBuckets, threshold);
+ stopCompaction:
+ for (int currBucket = 0; currBucket <= maxBucket; currBucket++) {
+ LinkedList<Long> entryLogIds = compactableBuckets.get(currBucket);
+ while (!entryLogIds.isEmpty()) {
+ if (timeDiff.getValue() < maxTimeMillis) {
+ end.setValue(System.currentTimeMillis());
+ timeDiff.setValue(end.getValue() - start);
+ }
+
+ if ((maxTimeMillis > 0 && timeDiff.getValue() >= maxTimeMillis) || !running) {
+ // Stop compacting: the time budget is exhausted or the gc is no longer
+ // running. The usage buckets were already gathered by the scan above.
+ break stopCompaction;
+ }
+
+ final int bucketIndex = currBucket;
+ final long logId = entryLogIds.remove();
+
+ entryLogMetaMap.forKey(logId, (entryLogId, meta) -> {
+ if (meta == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Metadata for entry log {} already deleted", logId);
+ }
+ return;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Compacting entry log {} with usage {} below threshold {}",
+ meta.getEntryLogId(), meta.getUsage(), threshold);
+ }
+
+ long priorRemainingSize = meta.getRemainingSize();
+ compactEntryLog(meta);
+ gcStats.getReclaimedSpaceViaCompaction().add(meta.getTotalSize() - priorRemainingSize);
+ compactedBuckets[bucketIndex]++;
+ });
+ }
+ }
+
if (LOG.isDebugEnabled()) {
if (!running) {
LOG.debug("Compaction exited due to gc not running");
}
- if (timeDiff.getValue() > maxTimeMillis) {
+ if (maxTimeMillis > 0 && timeDiff.getValue() > maxTimeMillis) {
LOG.debug("Compaction ran for {}ms but was limited by {}ms", timeDiff, maxTimeMillis);
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InMemoryEntryLogMetadataMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InMemoryEntryLogMetadataMap.java
index 106a382f54..3fec798a0d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InMemoryEntryLogMetadataMap.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InMemoryEntryLogMetadataMap.java
@@ -47,6 +47,12 @@ public class InMemoryEntryLogMetadataMap implements EntryLogMetadataMap {
entryLogMetaMap.forEach(action);
}
+ @Override
+ public void forKey(long entryLogId, BiConsumer<Long, EntryLogMetadata> action)
+ throws BookieException.EntryLogMetadataMapException {
+ action.accept(entryLogId, entryLogMetaMap.get(entryLogId));
+ }
+
@Override
public void remove(long entryLogId) {
entryLogMetaMap.remove(entryLogId);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMap.java
index ec289fd16c..f3f99be571 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMap.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMap.java
@@ -139,20 +139,7 @@ public class PersistentEntryLogMetadataMap implements EntryLogMetadataMap {
}
Entry<byte[], byte[]> entry = iterator.next();
long entryLogId = ArrayUtil.getLong(entry.getKey(), 0);
- ByteArrayInputStream localBais = bais.get();
- DataInputStream localDatais = datais.get();
- if (localBais.available() < entry.getValue().length) {
- localBais.close();
- localDatais.close();
- ByteArrayInputStream newBais = new ByteArrayInputStream(entry.getValue());
- bais.set(newBais);
- datais.set(new DataInputStream(newBais));
- } else {
- localBais.read(entry.getValue(), 0, entry.getValue().length);
- }
- localBais.reset();
- localDatais.reset();
- EntryLogMetadataRecyclable metadata = EntryLogMetadata.deserialize(datais.get());
+ EntryLogMetadataRecyclable metadata = getEntryLogMetadataRecyclable(entry.getValue());
try {
action.accept(entryLogId, metadata);
} finally {
@@ -171,6 +158,52 @@ public class PersistentEntryLogMetadataMap implements EntryLogMetadataMap {
}
}
+ /**
+ * The {@link EntryLogMetadata} passed to the supplied action is transient:
+ * it is recycled as soon as the supplied action has completed.
+ */
+ @Override
+ public void forKey(long entryLogId, BiConsumer<Long, EntryLogMetadata> action) throws EntryLogMetadataMapException {
+ throwIfClosed();
+ LongWrapper key = LongWrapper.get(entryLogId);
+ try {
+ byte[] value = metadataMapDB.get(key.array);
+ if (value == null || value.length == 0) {
+ action.accept(entryLogId, null);
+ return;
+ }
+ EntryLogMetadataRecyclable metadata = getEntryLogMetadataRecyclable(value);
+ try {
+ action.accept(entryLogId, metadata);
+ } finally {
+ metadata.recycle();
+ }
+ } catch (IOException e) {
+ log.error("Failed to get metadata for entryLogId {}: {}", entryLogId, e.getMessage(), e);
+ throw new EntryLogMetadataMapException(e);
+ } finally {
+ key.recycle();
+ }
+ }
+
+ private EntryLogMetadataRecyclable getEntryLogMetadataRecyclable(byte[] value) throws IOException {
+ ByteArrayInputStream localBais = bais.get();
+ DataInputStream localDatais = datais.get();
+ if (localBais.available() < value.length) {
+ localBais.close();
+ localDatais.close();
+ ByteArrayInputStream newBais = new ByteArrayInputStream(value);
+ bais.set(newBais);
+ datais.set(new DataInputStream(newBais));
+ } else {
+ localBais.read(value, 0, value.length);
+ }
+ localBais.reset();
+ localDatais.reset();
+ EntryLogMetadataRecyclable metadata = EntryLogMetadata.deserialize(datais.get());
+ return metadata;
+ }
+
@Override
public void remove(long entryLogId) throws EntryLogMetadataMapException {
throwIfClosed();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMapTest.java
index 243fe692ec..205c347add 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMapTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMapTest.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Lists;
import java.io.File;
import java.util.List;
+import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.EntryLogMetadata;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.junit.Rule;
@@ -80,6 +81,19 @@ public class PersistentEntryLogMetadataMapTest {
}
});
+ metadatas.forEach(meta -> {
+ long logId = meta.getEntryLogId();
+ try {
+ entryMetadataMap.forKey(logId, (entryLogId, persistedMeta) -> {
+ assertEquals(meta.getEntryLogId(), persistedMeta.getEntryLogId());
+ assertEquals(meta.getTotalSize(), persistedMeta.getTotalSize());
+ assertEquals(logId, (long) entryLogId);
+ });
+ } catch (BookieException.EntryLogMetadataMapException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
// remove entry-log entry
for (int i = 1; i <= totalMetadata; i++) {
entryMetadataMap.remove(i);