You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/03/16 16:38:35 UTC

[GitHub] merlimat closed pull request #1264: Refactored LAC cache in DbLedgerStorage

merlimat closed pull request #1264: Refactored LAC cache in DbLedgerStorage
URL: https://github.com/apache/bookkeeper/pull/1264
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this is a foreign pull request (from a fork), GitHub will not display
its diff once it has been merged, so the diff is supplied below:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
index 4beec22f2..ff9cd316a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java
@@ -25,28 +25,27 @@
 import static org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification.WATCHER_RECYCLER;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.util.concurrent.DefaultThreadFactory;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.SortedMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
 import org.apache.bookkeeper.bookie.BookieException;
@@ -74,6 +73,7 @@
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.DiskChecker;
 import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -83,6 +83,8 @@
  */
 public class DbLedgerStorage implements CompactableLedgerStorage {
 
+    private static final long NOT_ASSIGNED_LAC = Long.MIN_VALUE;
+
     /**
      * This class borrows the logic from FileInfo.
      *
@@ -93,7 +95,7 @@
             implements AutoCloseable {
 
         // lac
-        private Long lac = null;
+        private volatile long lac = NOT_ASSIGNED_LAC;
         // request from explicit lac requests
         private ByteBuffer explicitLac = null;
         // is the ledger info closed?
@@ -103,6 +105,8 @@
         // reference to LedgerMetadataIndex
         private final LedgerMetadataIndex ledgerIndex;
 
+        private long lastAccessed;
+
         /**
          * Construct an Watchable with zero watchers.
          */
@@ -110,19 +114,21 @@ public TransientLedgerInfo(long ledgerId, LedgerMetadataIndex ledgerIndex) {
             super(WATCHER_RECYCLER);
             this.ledgerId = ledgerId;
             this.ledgerIndex = ledgerIndex;
+            this.lastAccessed = System.currentTimeMillis();
         }
 
-        synchronized Long getLastAddConfirmed() {
+        long getLastAddConfirmed() {
             return lac;
         }
 
-        Long setLastAddConfirmed(long lac) {
+        long setLastAddConfirmed(long lac) {
             long lacToReturn;
             boolean changed = false;
             synchronized (this) {
-                if (null == this.lac || this.lac < lac) {
+                if (this.lac == NOT_ASSIGNED_LAC || this.lac < lac) {
                     this.lac = lac;
                     changed = true;
+                    lastAccessed = System.currentTimeMillis();
                 }
                 lacToReturn = this.lac;
             }
@@ -135,8 +141,8 @@ Long setLastAddConfirmed(long lac) {
         synchronized boolean waitForLastAddConfirmedUpdate(long previousLAC,
                                                            Watcher<LastAddConfirmedUpdateNotification> watcher)
                 throws IOException {
-            if ((null != lac && lac > previousLAC)
-                    || isClosed || ledgerIndex.get(ledgerId).getFenced()) {
+            lastAccessed = System.currentTimeMillis();
+            if ((lac != NOT_ASSIGNED_LAC && lac > previousLAC) || isClosed || ledgerIndex.get(ledgerId).getFenced()) {
                 return false;
             }
 
@@ -171,10 +177,17 @@ public void setExplicitLac(ByteBuf lac) {
                 explicitLac.getLong();
                 explicitLacValue = explicitLac.getLong();
                 explicitLac.rewind();
+
+                lastAccessed = System.currentTimeMillis();
             }
             setLastAddConfirmed(explicitLacValue);
         }
 
+        boolean isStale() {
+            return (lastAccessed + TimeUnit.MINUTES.toMillis(LEDGER_INFO_CACHING_TIME_MINUTES)) < System
+                    .currentTimeMillis();
+        }
+
         void notifyWatchers(long lastAddConfirmed) {
             notifyWatchers(LastAddConfirmedUpdateNotification.FUNC, lastAddConfirmed);
         }
@@ -197,7 +210,9 @@ public void close() {
 
     private LedgerMetadataIndex ledgerIndex;
     private EntryLocationIndex entryLocationIndex;
-    private LoadingCache<Long, TransientLedgerInfo> transientLedgerInfoCache;
+
+    private static final long LEDGER_INFO_CACHING_TIME_MINUTES = 10;
+    private ConcurrentLongHashMap<TransientLedgerInfo> transientLedgerInfoCache;
 
     private GarbageCollectorThread gcThread;
 
@@ -221,8 +236,8 @@ public void close() {
     private final ExecutorService executor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("db-storage"));
 
     // Executor used to for db index cleanup
-    private final ExecutorService cleanupExecutor = Executors
-            .newSingleThreadExecutor(new DefaultThreadFactory("db-storage-cleanup"));
+    private final ScheduledExecutorService cleanupExecutor = Executors
+            .newSingleThreadScheduledExecutor(new DefaultThreadFactory("db-storage-cleanup"));
 
     static final String WRITE_CACHE_MAX_SIZE_MB = "dbStorage_writeCacheMaxSizeMb";
     static final String READ_AHEAD_CACHE_BATCH_SIZE = "dbStorage_readAheadCacheBatchSize";
@@ -287,23 +302,10 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le
         ledgerIndex = new LedgerMetadataIndex(conf, KeyValueStorageRocksDB.factory, baseDir, stats);
         entryLocationIndex = new EntryLocationIndex(conf, KeyValueStorageRocksDB.factory, baseDir, stats);
 
-        // build the ledger info cache
-        int concurrencyLevel = Math.max(1, Math.max(conf.getNumAddWorkerThreads(), conf.getNumReadWorkerThreads()));
-        RemovalListener<Long, TransientLedgerInfo> ledgerInfoRemovalListener = this::handleLedgerEviction;
-        CacheBuilder<Long, TransientLedgerInfo> builder = CacheBuilder.newBuilder()
-            .initialCapacity(conf.getFileInfoCacheInitialCapacity())
-            .maximumSize(conf.getOpenFileLimit())
-            .concurrencyLevel(concurrencyLevel)
-            .removalListener(ledgerInfoRemovalListener);
-        if (conf.getFileInfoMaxIdleTime() > 0) {
-            builder.expireAfterAccess(conf.getFileInfoMaxIdleTime(), TimeUnit.SECONDS);
-        }
-        transientLedgerInfoCache = builder.build(new CacheLoader<Long, TransientLedgerInfo>() {
-            @Override
-            public TransientLedgerInfo load(Long key) throws Exception {
-                return new TransientLedgerInfo(key, ledgerIndex);
-            }
-        });
+        transientLedgerInfoCache = new ConcurrentLongHashMap<>(16 * 1024,
+                Runtime.getRuntime().availableProcessors() * 2);
+        cleanupExecutor.scheduleAtFixedRate(this::cleanupStaleTransientLedgerInfo, LEDGER_INFO_CACHING_TIME_MINUTES,
+                LEDGER_INFO_CACHING_TIME_MINUTES, TimeUnit.MINUTES);
 
         entryLogger = new EntryLogger(conf, ledgerDirsManager);
         gcThread = new GarbageCollectorThread(conf, ledgerManager, this, statsLogger);
@@ -312,14 +314,17 @@ public TransientLedgerInfo load(Long key) throws Exception {
     }
 
     /**
-     * When a ledger is evicted from transient ledger info cache, we can just simply discard the object.
+     * Evict all the ledger info objects that have not been used recently.
      */
-    private void handleLedgerEviction(RemovalNotification<Long, TransientLedgerInfo> notification) {
-        TransientLedgerInfo ledgerInfo = notification.getValue();
-        if (null == ledgerInfo || null == notification.getKey()) {
-            return;
-        }
-        ledgerInfo.close();
+    private void cleanupStaleTransientLedgerInfo() {
+        transientLedgerInfoCache.removeIf((ledgerId, ledgerInfo) -> {
+            boolean isStale = ledgerInfo.isStale();
+            if (isStale) {
+                ledgerInfo.close();
+            }
+
+            return isStale;
+        });
     }
 
     public void registerStats() {
@@ -437,7 +442,7 @@ public boolean setFenced(long ledgerId) throws IOException {
         boolean changed = ledgerIndex.setFenced(ledgerId);
         if (changed) {
             // notify all the watchers if a ledger is fenced
-            TransientLedgerInfo ledgerInfo = transientLedgerInfoCache.getIfPresent(ledgerId);
+            TransientLedgerInfo ledgerInfo = transientLedgerInfoCache.get(ledgerId);
             if (null != ledgerInfo) {
                 ledgerInfo.notifyWatchers(Long.MAX_VALUE);
             }
@@ -487,8 +492,7 @@ public long addEntry(ByteBuf entry) throws IOException {
         }
 
         // after successfully insert the entry, update LAC and notify the watchers
-        transientLedgerInfoCache.getUnchecked(ledgerId)
-            .setLastAddConfirmed(lac);
+        updateCachedLacIfNeeded(ledgerId, lac);
 
         recordSuccessfulEvent(addEntryStats, startTime);
         return entryId;
@@ -854,6 +858,11 @@ public void deleteLedger(long ledgerId) throws IOException {
             LedgerDeletionListener listener = ledgerDeletionListeners.get(i);
             listener.ledgerDeleted(ledgerId);
         }
+
+        TransientLedgerInfo tli = transientLedgerInfoCache.remove(ledgerId);
+        if (tli != null) {
+            tli.close();
+        }
     }
 
     @Override
@@ -876,14 +885,14 @@ public EntryLogger getEntryLogger() {
 
     @Override
     public long getLastAddConfirmed(long ledgerId) throws IOException {
-        TransientLedgerInfo ledgerInfo = transientLedgerInfoCache.getIfPresent(ledgerId);
-        Long lac = null != ledgerInfo ? ledgerInfo.getLastAddConfirmed() : null;
-        if (null == lac) {
+        TransientLedgerInfo ledgerInfo = transientLedgerInfoCache.get(ledgerId);
+        long lac = null != ledgerInfo ? ledgerInfo.getLastAddConfirmed() : NOT_ASSIGNED_LAC;
+        if (lac == NOT_ASSIGNED_LAC) {
             ByteBuf bb = getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
             try {
                 bb.skipBytes(2 * Long.BYTES); // skip ledger id and entry id
                 lac = bb.readLong();
-                lac = transientLedgerInfoCache.getUnchecked(ledgerId).setLastAddConfirmed(lac);
+                lac = getOrAddLedgerInfo(ledgerId).setLastAddConfirmed(lac);
             } finally {
                 bb.release();
             }
@@ -894,19 +903,17 @@ public long getLastAddConfirmed(long ledgerId) throws IOException {
     @Override
     public boolean waitForLastAddConfirmedUpdate(long ledgerId, long previousLAC,
             Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
-        return transientLedgerInfoCache.getUnchecked(ledgerId)
-            .waitForLastAddConfirmedUpdate(previousLAC, watcher);
+        return getOrAddLedgerInfo(ledgerId).waitForLastAddConfirmedUpdate(previousLAC, watcher);
     }
 
     @Override
     public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException {
-        transientLedgerInfoCache.getUnchecked(ledgerId)
-            .setExplicitLac(lac);
+        getOrAddLedgerInfo(ledgerId).setExplicitLac(lac);
     }
 
     @Override
     public ByteBuf getExplicitLac(long ledgerId) {
-        TransientLedgerInfo ledgerInfo = transientLedgerInfoCache.getIfPresent(ledgerId);
+        TransientLedgerInfo ledgerInfo = transientLedgerInfoCache.get(ledgerId);
         if (null == ledgerInfo) {
             return null;
         } else {
@@ -914,6 +921,30 @@ public ByteBuf getExplicitLac(long ledgerId) {
         }
     }
 
+    private TransientLedgerInfo getOrAddLedgerInfo(long ledgerId) {
+        TransientLedgerInfo tli = transientLedgerInfoCache.get(ledgerId);
+        if (tli != null) {
+            return tli;
+        } else {
+            TransientLedgerInfo newTli = new TransientLedgerInfo(ledgerId, ledgerIndex);
+            tli = transientLedgerInfoCache.putIfAbsent(ledgerId, newTli);
+            if (tli != null) {
+                newTli.close();
+                return tli;
+            } else {
+                return newTli;
+            }
+        }
+    }
+
+    private void updateCachedLacIfNeeded(long ledgerId, long lac) {
+        TransientLedgerInfo tli = transientLedgerInfoCache.get(ledgerId);
+        if (tli != null) {
+            tli.setLastAddConfirmed(lac);
+        }
+    }
+
+
     @Override
     public void flushEntriesLocationsIndex() throws IOException {
         // No-op. Location index is already flushed in updateEntriesLocations() call
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
index 011a6667f..bdfaf033a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
@@ -52,6 +52,15 @@
     private static final int DefaultExpectedItems = 256;
     private static final int DefaultConcurrencyLevel = 16;
 
+    /**
+     * Predicate specialization for (long, V) types.
+     *
+     * @param <V> type of the value passed to the predicate
+     */
+    public interface LongObjectPredicate<V> {
+        boolean test(long key, V value);
+    }
+
     private final Section<V>[] sections;
 
     public ConcurrentLongHashMap() {
@@ -149,6 +158,17 @@ public boolean remove(long key, Object value) {
         return getSection(h).remove(key, value, (int) h) != null;
     }
 
+    public int removeIf(LongObjectPredicate<V> predicate) {
+        checkNotNull(predicate);
+
+        int removedCount = 0;
+        for (Section<V> s : sections) {
+            removedCount += s.removeIf(predicate);
+        }
+
+        return removedCount;
+    }
+
     private Section<V> getSection(long hash) {
         // Use 32 msb out of long to get the section
         final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
@@ -375,6 +395,40 @@ private V remove(long key, Object value, int keyHash) {
             }
         }
 
+        int removeIf(LongObjectPredicate<V> filter) {
+            long stamp = writeLock();
+
+            int removedCount = 0;
+            try {
+                // Go through all the buckets for this section
+                int capacity = this.capacity;
+                for (int bucket = 0; bucket < capacity; bucket++) {
+                    long storedKey = keys[bucket];
+                    V storedValue = values[bucket];
+
+                    if (storedValue != EmptyValue && storedValue != DeletedValue) {
+                        if (filter.test(storedKey, storedValue)) {
+                            // Removing item
+                            --size;
+                            ++removedCount;
+
+                            V nextValueInArray = values[signSafeMod(bucket + 1, capacity)];
+                            if (nextValueInArray == EmptyValue) {
+                                values[bucket] = (V) EmptyValue;
+                                --usedBuckets;
+                            } else {
+                                values[bucket] = (V) DeletedValue;
+                            }
+                        }
+                    }
+                }
+
+                return removedCount;
+            } finally {
+                unlockWrite(stamp);
+            }
+        }
+
         void clear() {
             long stamp = writeLock();
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
index 702e24ca6..06c76673f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
@@ -116,6 +116,23 @@ public void testRemove() {
         assertTrue(map.isEmpty());
     }
 
+    @Test
+    public void testRemoveIf() {
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16, 1);
+
+        map.put(1L, "one");
+        map.put(2L, "two");
+        map.put(3L, "three");
+        map.put(4L, "four");
+
+        map.removeIf((k, v) -> k < 3);
+        assertFalse(map.containsKey(1L));
+        assertFalse(map.containsKey(2L));
+        assertTrue(map.containsKey(3L));
+        assertTrue(map.containsKey(4L));
+        assertEquals(2, map.size());
+    }
+
     @Test
     public void testNegativeUsedBucketCount() {
         ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(16, 1);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services