You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by yo...@apache.org on 2022/08/02 06:26:08 UTC

[bookkeeper] 15/22: [Issue 3389] Prioritize compaction of entry logs with the lowest amount of remaining usable data (#3390)

This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.15
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 063cc8b7cb0bb19fd69238a029868a54972010b1
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Fri Jul 22 16:51:01 2022 -0700

    [Issue 3389] Prioritize compaction of entry logs with the lowest amount of remaining usable data (#3390)
    
    Descriptions of the changes in this PR:
    
    ### Motivation
    
    Prioritize compaction to free up more space faster.
    
    ### Changes
    
    doCompactEntryLogs() iterates over entry logs in whatever natural order they happen to be in, picks the first one whose usage is below the threshold, and starts compacting it.
    
    Added a priority queue of entry logs (implemented as per-usage-bucket lists that are drained from the lowest-usage bucket upward) so that the logs with the most compactable space are picked first. This also helps when the time for compaction is limited (via majorCompactionMaxTimeMillis / minorCompactionMaxTimeMillis): instead of spending time on rewriting files with more data, we pick the files with the least amount of data first.
    
    Master Issue: #3389
    
    (cherry picked from commit 1825677b1ebacef113423b4afc463a0dcdc8988e)
---
 .../bookkeeper/bookie/EntryLogMetadataMap.java     |  8 +++
 .../bookkeeper/bookie/GarbageCollectorThread.java  | 64 ++++++++++++++++++----
 .../bookie/InMemoryEntryLogMetadataMap.java        |  6 ++
 .../storage/ldb/PersistentEntryLogMetadataMap.java | 61 ++++++++++++++++-----
 .../ldb/PersistentEntryLogMetadataMapTest.java     | 14 +++++
 5 files changed, 129 insertions(+), 24 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadataMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadataMap.java
index 88f6ce5398..afd5c7f4d0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadataMap.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogMetadataMap.java
@@ -59,6 +59,14 @@ public interface EntryLogMetadataMap extends Closeable {
      */
     void forEach(BiConsumer<Long, EntryLogMetadata> action) throws EntryLogMetadataMapException;
 
+    /**
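+     * Performs the given action for the metadata stored under the given entry log id.
+     *
+     * @param action callback that receives {@code entryLogId} and its metadata (may be null if absent)
+     * @throws EntryLogMetadataMapException if the metadata cannot be read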
+     */
+    void forKey(long entryLogId, BiConsumer<Long, EntryLogMetadata> action) throws EntryLogMetadataMapException;
+
     /**
      * Removes entryLogMetadata record from the map.
      *
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index 6e3d00c3d4..c88fa0bf4d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -28,6 +28,8 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
@@ -545,6 +547,11 @@ public class GarbageCollectorThread extends SafeRunnable {
         int[] entryLogUsageBuckets = new int[numBuckets];
         int[] compactedBuckets = new int[numBuckets];
 
+        ArrayList<LinkedList<Long>> compactableBuckets = new ArrayList<>(numBuckets);
+        for (int i = 0; i < numBuckets; i++) {
+            compactableBuckets.add(new LinkedList<>());
+        }
+
         long start = System.currentTimeMillis();
         MutableLong end = new MutableLong(start);
         MutableLong timeDiff = new MutableLong(0);
@@ -559,25 +566,62 @@ public class GarbageCollectorThread extends SafeRunnable {
             }
             if (meta.getUsage() >= threshold || (maxTimeMillis > 0 && timeDiff.getValue() >= maxTimeMillis)
                     || !running) {
-                // We allow the usage limit calculation to continue so that we get a accurate
+                // We allow the usage limit calculation to continue so that we get an accurate
                 // report of where the usage was prior to running compaction.
                 return;
             }
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Compacting entry log {} with usage {} below threshold {}",
-                        meta.getEntryLogId(), meta.getUsage(), threshold);
-            }
 
-            long priorRemainingSize = meta.getRemainingSize();
-            compactEntryLog(meta);
-            gcStats.getReclaimedSpaceViaCompaction().add(meta.getTotalSize() - priorRemainingSize);
-            compactedBuckets[bucketIndex]++;
+            compactableBuckets.get(bucketIndex).add(meta.getEntryLogId());
         });
+
+        LOG.info(
+                "Compaction: entry log usage buckets before compaction [10% 20% 30% 40% 50% 60% 70% 80% 90% 100%] = {}",
+                entryLogUsageBuckets);
+
+        final int maxBucket = calculateUsageIndex(numBuckets, threshold);
+        stopCompaction:
+        for (int currBucket = 0; currBucket <= maxBucket; currBucket++) {
+            LinkedList<Long> entryLogIds = compactableBuckets.get(currBucket);
+            while (!entryLogIds.isEmpty()) {
+                if (timeDiff.getValue() < maxTimeMillis) {
+                    end.setValue(System.currentTimeMillis());
+                    timeDiff.setValue(end.getValue() - start);
+                }
+
+                if ((maxTimeMillis > 0 && timeDiff.getValue() >= maxTimeMillis) || !running) {
+                    // The time budget is exhausted or the GC thread is no longer running:
+                    // stop here and skip the remaining compactable entry logs.
+                    break stopCompaction;
+                }
+
+                final int bucketIndex = currBucket;
+                final long logId = entryLogIds.remove();
+
+                entryLogMetaMap.forKey(logId, (entryLogId, meta) -> {
+                    if (meta == null) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Metadata for entry log {} already deleted", logId);
+                        }
+                        return;
+                    }
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Compacting entry log {} with usage {} below threshold {}",
+                                meta.getEntryLogId(), meta.getUsage(), threshold);
+                    }
+
+                    long priorRemainingSize = meta.getRemainingSize();
+                    compactEntryLog(meta);
+                    gcStats.getReclaimedSpaceViaCompaction().add(meta.getTotalSize() - priorRemainingSize);
+                    compactedBuckets[bucketIndex]++;
+                });
+            }
+        }
+
         if (LOG.isDebugEnabled()) {
             if (!running) {
                 LOG.debug("Compaction exited due to gc not running");
             }
-            if (timeDiff.getValue() > maxTimeMillis) {
+            if (maxTimeMillis > 0 && timeDiff.getValue() > maxTimeMillis) {
                 LOG.debug("Compaction ran for {}ms but was limited by {}ms", timeDiff, maxTimeMillis);
             }
         }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InMemoryEntryLogMetadataMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InMemoryEntryLogMetadataMap.java
index 106a382f54..3fec798a0d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InMemoryEntryLogMetadataMap.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InMemoryEntryLogMetadataMap.java
@@ -47,6 +47,12 @@ public class InMemoryEntryLogMetadataMap implements EntryLogMetadataMap {
         entryLogMetaMap.forEach(action);
     }
 
+    @Override
+    public void forKey(long entryLogId, BiConsumer<Long, EntryLogMetadata> action)
+            throws BookieException.EntryLogMetadataMapException {
+        action.accept(entryLogId, entryLogMetaMap.get(entryLogId));
+    }
+
     @Override
     public void remove(long entryLogId) {
         entryLogMetaMap.remove(entryLogId);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMap.java
index ec289fd16c..f3f99be571 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMap.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMap.java
@@ -139,20 +139,7 @@ public class PersistentEntryLogMetadataMap implements EntryLogMetadataMap {
                 }
                 Entry<byte[], byte[]> entry = iterator.next();
                 long entryLogId = ArrayUtil.getLong(entry.getKey(), 0);
-                ByteArrayInputStream localBais = bais.get();
-                DataInputStream localDatais = datais.get();
-                if (localBais.available() < entry.getValue().length) {
-                    localBais.close();
-                    localDatais.close();
-                    ByteArrayInputStream newBais = new ByteArrayInputStream(entry.getValue());
-                    bais.set(newBais);
-                    datais.set(new DataInputStream(newBais));
-                } else {
-                    localBais.read(entry.getValue(), 0, entry.getValue().length);
-                }
-                localBais.reset();
-                localDatais.reset();
-                EntryLogMetadataRecyclable metadata = EntryLogMetadata.deserialize(datais.get());
+                EntryLogMetadataRecyclable metadata = getEntryLogMetadataRecyclable(entry.getValue());
                 try {
                     action.accept(entryLogId, metadata);
                 } finally {
@@ -171,6 +158,52 @@ public class PersistentEntryLogMetadataMap implements EntryLogMetadataMap {
         }
     }
 
+    /**
+     * {@link EntryLogMetadata} life-cycle in supplied action will be transient
+     * and it will be recycled as soon as supplied action is completed.
+     */
+    @Override
+    public void forKey(long entryLogId, BiConsumer<Long, EntryLogMetadata> action) throws EntryLogMetadataMapException {
+        throwIfClosed();
+        LongWrapper key = LongWrapper.get(entryLogId);
+        try {
+            byte[] value = metadataMapDB.get(key.array);
+            if (value == null || value.length == 0) {
+                action.accept(entryLogId, null);
+                return;
+            }
+            EntryLogMetadataRecyclable metadata = getEntryLogMetadataRecyclable(value);
+            try {
+                action.accept(entryLogId, metadata);
+            } finally {
+                metadata.recycle();
+            }
+        } catch (IOException e) {
+            log.error("Failed to get metadata for entryLogId {}: {}", entryLogId, e.getMessage(), e);
+            throw new EntryLogMetadataMapException(e);
+        } finally {
+            key.recycle();
+        }
+    }
+
+    private EntryLogMetadataRecyclable getEntryLogMetadataRecyclable(byte[] value) throws IOException {
+        ByteArrayInputStream localBais = bais.get();
+        DataInputStream localDatais = datais.get();
+        if (localBais.available() < value.length) {
+            localBais.close();
+            localDatais.close();
+            ByteArrayInputStream newBais = new ByteArrayInputStream(value);
+            bais.set(newBais);
+            datais.set(new DataInputStream(newBais));
+        } else {
+            localBais.read(value, 0, value.length);
+        }
+        localBais.reset();
+        localDatais.reset();
+        EntryLogMetadataRecyclable metadata = EntryLogMetadata.deserialize(datais.get());
+        return metadata;
+    }
+
     @Override
     public void remove(long entryLogId) throws EntryLogMetadataMapException {
         throwIfClosed();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMapTest.java
index 243fe692ec..205c347add 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMapTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/PersistentEntryLogMetadataMapTest.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Lists;
 import java.io.File;
 import java.util.List;
 
+import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.EntryLogMetadata;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.junit.Rule;
@@ -80,6 +81,19 @@ public class PersistentEntryLogMetadataMapTest {
             }
         });
 
+        metadatas.forEach(meta -> {
+            long logId = meta.getEntryLogId();
+            try {
+                entryMetadataMap.forKey(logId, (entryLogId, persistedMeta) -> {
+                    assertEquals(meta.getEntryLogId(), persistedMeta.getEntryLogId());
+                    assertEquals(meta.getTotalSize(), persistedMeta.getTotalSize());
+                    assertEquals(logId, (long) entryLogId);
+                });
+            } catch (BookieException.EntryLogMetadataMapException e) {
+                throw new RuntimeException(e);
+            }
+        });
+
         // remove entry-log entry
         for (int i = 1; i <= totalMetadata; i++) {
             entryMetadataMap.remove(i);