You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/08/23 17:19:41 UTC

[bookkeeper] branch branch-4.7 updated: Avoid usage of RocksDB deleteRange() in DbLedgerStorage

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.7 by this push:
     new 6698ab8  Avoid usage of RocksDB deleteRange() in DbLedgerStorage
6698ab8 is described below

commit 6698ab8bc1f2f491e0f8ecd6826dfccddb7f32cb
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Aug 23 10:19:02 2018 -0700

    Avoid usage of RocksDB deleteRange() in DbLedgerStorage
    
    ### Motivation
    
    There are a few issues that can be traced back to a performance degradation in RocksDB when using the deleteRange() feature (e.g. https://github.com/apache/incubator-pulsar/issues/1737 and others).
    
    There is an ongoing discussion in the RocksDB project about addressing this issue: https://github.com/facebook/rocksdb/issues/3959
    
    In the meantime, we should roll back the change and not use deleteRange() until these issues are resolved.
    
    ### Changes
    
    This PR essentially reverts the commit https://github.com/yahoo/bookkeeper/commit/4b849904bcd65b49cf963e6508dc7fb745f56294 from the Yahoo branch (which was squashed when merging back to Apache).
    The only addition here is to use `DELETE_ENTRIES_BATCH_SIZE` to amortize the cost of `batch.flush()` when there are many ledgers with few entries.
    
    Author: Matteo Merli <mm...@apache.org>
    
    Reviewers: Ivan Kelly <iv...@apache.org>, Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
    
    This closes #1620 from merlimat/rollback-delete-range
    
    (cherry picked from commit dca471dfddbdbdea3dac029ddd76958340fa6629)
    Signed-off-by: Sijie Guo <si...@apache.org>
---
 .../bookie/storage/ldb/EntryLocationIndex.java     | 65 ++++++++++++++++++----
 1 file changed, 55 insertions(+), 10 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
index 53e37a2..21b87e2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.nio.file.FileSystems;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.EntryLocation;
@@ -188,6 +189,8 @@ public class EntryLocationIndex implements Closeable {
         deletedLedgers.add(ledgerId);
     }
 
+    private static final int DELETE_ENTRIES_BATCH_SIZE = 100000;
+
     public void removeOffsetFromDeletedLedgers() throws IOException {
         LongPairWrapper firstKeyWrapper = LongPairWrapper.get(-1, -1);
         LongPairWrapper lastKeyWrapper = LongPairWrapper.get(-1, -1);
@@ -200,6 +203,10 @@ public class EntryLocationIndex implements Closeable {
         }
 
         log.info("Deleting indexes for ledgers: {}", ledgersToDelete);
+        long startTime = System.nanoTime();
+        long deletedEntries = 0;
+        long deletedEntriesInBatch = 0;
+
         Batch batch = locationsDb.newBatch();
 
         try {
@@ -211,20 +218,58 @@ public class EntryLocationIndex implements Closeable {
                 firstKeyWrapper.set(ledgerId, 0);
                 lastKeyWrapper.set(ledgerId, Long.MAX_VALUE);
 
-                batch.deleteRange(firstKeyWrapper.array, lastKeyWrapper.array);
-            }
+                Entry<byte[], byte[]> firstKeyRes = locationsDb.getCeil(firstKeyWrapper.array);
+                if (firstKeyRes == null || ArrayUtil.getLong(firstKeyRes.getKey(), 0) != ledgerId) {
+                    // No entries found for ledger
+                    if (log.isDebugEnabled()) {
+                        log.debug("No entries found for ledger {}", ledgerId);
+                    }
+                    continue;
+                }
 
-            batch.flush();
+                long firstEntryId = ArrayUtil.getLong(firstKeyRes.getKey(), 8);
+                long lastEntryId = getLastEntryInLedgerInternal(ledgerId);
+                if (log.isDebugEnabled()) {
+                    log.debug("Deleting index for ledger {} entries ({} -> {})",
+                            ledgerId, firstEntryId, lastEntryId);
+                }
 
-            // Removed from pending set
-            for (long ledgerId : ledgersToDelete) {
-                deletedLedgers.remove(ledgerId);
+                // Iterate over all the keys and remove each of them
+                for (long entryId = firstEntryId; entryId <= lastEntryId; entryId++) {
+                    keyToDelete.set(ledgerId, entryId);
+                    if (log.isDebugEnabled()) {
+                        log.debug("Deleting index for ({}, {})", keyToDelete.getFirst(), keyToDelete.getSecond());
+                    }
+                    batch.remove(keyToDelete.array);
+                    ++deletedEntriesInBatch;
+                    ++deletedEntries;
+                }
+
+                if (deletedEntriesInBatch > DELETE_ENTRIES_BATCH_SIZE) {
+                    batch.flush();
+                    batch.clear();
+                    deletedEntriesInBatch = 0;
+                }
             }
         } finally {
-            firstKeyWrapper.recycle();
-            lastKeyWrapper.recycle();
-            keyToDelete.recycle();
-            batch.close();
+            try {
+                batch.flush();
+                batch.clear();
+            } finally {
+
+                firstKeyWrapper.recycle();
+                lastKeyWrapper.recycle();
+                keyToDelete.recycle();
+                batch.close();
+            }
+        }
+
+        log.info("Deleted indexes for {} entries from {} ledgers in {} seconds", deletedEntries, ledgersToDelete.size(),
+                TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) / 1000.0);
+
+        // Removed from pending set
+        for (long ledgerId : ledgersToDelete) {
+            deletedLedgers.remove(ledgerId);
         }
     }