You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/01/09 01:28:04 UTC

[bookkeeper] 02/03: Refactor FileInfo locking and refcounting out IndexPersistenceMgr

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.6
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 43837f0d560f1918d724d8fd3a72eb4ed1663e26
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Mon Jan 8 13:18:39 2018 -0800

    Refactor FileInfo locking and refcounting out IndexPersistenceMgr
    
    There are a number of bugs in index persistence mgr related to how
    refcounts are handled and how FileInfos are freed, due to the locking
    mechanism. These can result in the fencing bit being lost in ledgers,
    which is a serious issue. They also cause flakes in
    IndexPersistenceMgrTest#testEvictFileInfoWhenUnderlyingFileExists.
    
    For details see the discussion at:
    https://github.com/apache/bookkeeper/pull/513/files/8075b0#r156676238
    
    There are two key problems.
    
    1. FileInfos are flushed asynchronously on eviction. If another thread
    tries to read the FileInfo after the flush has been scheduled, but
    before it runs, it will read stale data (and we'll possibly lose the
    fence bit).
    2. A FileInfo can be closed while it is still referenced in the
    IndexPersistenceMgr. This means that it can be fenced, but the fence bit
    will never be flushed to disk because the FileInfo is already closed.
    
    The patch solves this by moving FileInfo locking and refcount into a
    separate class, FileInfoBackingCache. When either the write cache or
    read cache load a file info, it tries to load it from this backing
    cache. All writes to the backing cache are done under a write
    lock. This class also takes care of reference counting. It hands out
    CachedFileInfo objects, which will be flushed to disk when all
    references are released.
    
    Author: Ivan Kelly <iv...@apache.org>
    
    Reviewers: Yiming Zang <yz...@gmail.com>, Sijie Guo <si...@apache.org>
    
    This closes #913 from ivankelly/index-persist-lock
---
 .../org/apache/bookkeeper/bookie/FileInfo.java     |  69 ++---
 .../bookkeeper/bookie/FileInfoBackingCache.java    | 168 ++++++++++++
 .../bookkeeper/bookie/IndexPersistenceMgr.java     | 190 +++++---------
 .../bookkeeper/bookie/IndexPersistenceMgrTest.java |  73 ++++--
 .../apache/bookkeeper/bookie/LedgerCacheTest.java  |  15 +-
 .../bookie/TestFileInfoBackingCache.java           | 288 +++++++++++++++++++++
 6 files changed, 603 insertions(+), 200 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
index 38edbda..296acb9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
@@ -33,7 +33,6 @@ import java.io.RandomAccessFile;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.bookkeeper.common.util.Watchable;
 import org.apache.bookkeeper.common.util.Watcher;
 import org.slf4j.Logger;
@@ -79,7 +78,6 @@ class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> {
 
     static final long START_OF_DATA = 1024;
     private long size;
-    private AtomicInteger useCount = new AtomicInteger(0);
     private boolean isClosed;
     private long sizeSinceLastwrite;
 
@@ -139,6 +137,10 @@ class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> {
         return true;
     }
 
+    public boolean isClosed() {
+        return isClosed;
+    }
+
     public synchronized File getLf() {
         return lf;
     }
@@ -387,35 +389,26 @@ class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> {
      *          if set to false, the index is not forced to create.
      */
     public void close(boolean force) throws IOException {
-        boolean closing = false;
-        try {
-            boolean changed = false;
-            synchronized (this) {
-                if (isClosed) {
-                    return;
-                }
-                isClosed = true;
-                closing = true;
-                checkOpen(force, true);
-                // Any time when we force close a file, we should try to flush header.
-                // otherwise, we might lose fence bit.
-                if (force) {
-                    flushHeader();
-                }
-                changed = true;
-                if (useCount.get() == 0 && fc != null) {
-                    fc.close();
-                    fc = null;
-                }
+        boolean changed = false;
+        synchronized (this) {
+            if (isClosed) {
+                return;
             }
-            if (changed) {
-                notifyWatchers(LastAddConfirmedUpdateNotification.FUNC, Long.MAX_VALUE);
+            isClosed = true;
+            checkOpen(force, true);
+            // Any time when we force close a file, we should try to flush header.
+            // otherwise, we might lose fence bit.
+            if (force) {
+                flushHeader();
             }
-        } finally {
-            if (closing) {
-                // recycle this watchable after the FileInfo is closed.
-                recycle();
+            changed = true;
+            if (fc != null) {
+                fc.close();
             }
+            fc = null;
+        }
+        if (changed) {
+            notifyWatchers(LastAddConfirmedUpdateNotification.FUNC, Long.MAX_VALUE);
         }
     }
 
@@ -502,26 +495,6 @@ class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> {
         return masterKey;
     }
 
-    public void use() {
-        useCount.incrementAndGet();
-    }
-
-    @VisibleForTesting
-    int getUseCount() {
-        return useCount.get();
-    }
-
-    public synchronized void release() {
-        int count = useCount.decrementAndGet();
-        if (isClosed && (count == 0) && fc != null) {
-            try {
-                fc.close();
-            } catch (IOException e) {
-                LOG.error("Error closing file channel", e);
-            }
-        }
-    }
-
     public synchronized boolean delete() {
         return lf.delete();
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java
new file mode 100644
index 0000000..8cdadc7
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfoBackingCache.java
@@ -0,0 +1,168 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+class FileInfoBackingCache {
+    static final int DEAD_REF = -0xdead;
+
+    final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    final ConcurrentHashMap<Long, CachedFileInfo> fileInfos = new ConcurrentHashMap<>();
+    final FileLoader fileLoader;
+
+    FileInfoBackingCache(FileLoader fileLoader) {
+        this.fileLoader = fileLoader;
+    }
+
+    CachedFileInfo loadFileInfo(long ledgerId, byte[] masterKey) throws IOException {
+        lock.readLock().lock();
+        try {
+            CachedFileInfo fi = fileInfos.get(ledgerId);
+            if (fi != null) {
+                // tryRetain only fails if #markDead() has been called
+                // on fi. This is only called from within the write lock,
+                // and if it is called (and succeeds) the fi will have been
+                // removed from fileInfos at the same time, so we should not
+                // have been able to get a reference to it here.
+                // The caller of loadFileInfo owns the reference, and is
+                // responsible for calling the corresponding #release().
+                assert(fi.tryRetain());
+                return fi;
+            }
+        } finally {
+            lock.readLock().unlock();
+        }
+
+        // else FileInfo not found, create it under write lock
+        lock.writeLock().lock();
+        try {
+            File backingFile = fileLoader.load(ledgerId, masterKey != null);
+            CachedFileInfo fi = new CachedFileInfo(ledgerId, backingFile, masterKey);
+            fileInfos.put(ledgerId, fi);
+
+            // see comment above for why we assert
+            assert(fi.tryRetain());
+            return fi;
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private void releaseFileInfo(long ledgerId, CachedFileInfo fileInfo) {
+        lock.writeLock().lock();
+        try {
+            if (fileInfo.markDead()) {
+                fileInfo.close(true);
+                fileInfos.remove(ledgerId, fileInfo);
+            }
+        } catch (IOException ioe) {
+            log.error("Error evicting file info({}) for ledger {} from backing cache",
+                      fileInfo, ledgerId, ioe);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    void closeAllWithoutFlushing() throws IOException {
+        for (Map.Entry<Long, CachedFileInfo> entry : fileInfos.entrySet()) {
+            entry.getValue().close(false);
+        }
+    }
+
+    class CachedFileInfo extends FileInfo {
+        final long ledgerId;
+        final AtomicInteger refCount;
+
+        CachedFileInfo(long ledgerId, File lf, byte[] masterKey) throws IOException {
+            super(lf, masterKey);
+            this.ledgerId = ledgerId;
+            this.refCount = new AtomicInteger(0);
+        }
+
+        /**
+         * Mark this fileinfo as dead. We can only mark a fileinfo as
+         * dead if no one currently holds a reference to it.
+         *
+         * @return true if we marked as dead, false otherwise
+         */
+        private boolean markDead() {
+            return refCount.compareAndSet(0, DEAD_REF);
+        }
+
+        /**
+         * Attempt to retain the file info.
+         * A client may obtain a fileinfo from a container object,
+         * but that container object may release the fileinfo before
+         * the client has a chance to call retain. In this case, the
+         * file info could be released and then destroyed before we ever
+         * get a chance to use it.
+         *
+         * <p>tryRetain avoids this problem, by doing a compare-and-swap on
+         * the reference count. If the refCount is negative, it means that
+         * the fileinfo is being cleaned up, and this fileinfo object should
+         * not be used. This works in tandem with #markDead, which will only
+         * set the refCount to negative if no one currently has it retained
+         * (i.e. the refCount is 0).
+         *
+         * @return true if we managed to increment the refcount, false otherwise
+         */
+        boolean tryRetain() {
+            while (true) {
+                int count = refCount.get();
+                if (count < 0) {
+                    return false;
+                } else if (refCount.compareAndSet(count, count + 1)) {
+                    return true;
+                }
+            }
+        }
+
+        int getRefCount() {
+            return refCount.get();
+        }
+
+        void release() {
+            if (refCount.decrementAndGet() == 0) {
+                releaseFileInfo(ledgerId, this);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "CachedFileInfo(ledger=" + ledgerId
+                + ",refCount=" + refCount.get()
+                + ",closed=" + isClosed()
+                + ",id=" + System.identityHashCode(this) + ")";
+        }
+    }
+
+    interface FileLoader {
+        File load(long ledgerId, boolean createIfMissing) throws IOException;
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
index 8598bc4..0945506 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
@@ -38,13 +38,10 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.bookkeeper.bookie.FileInfoBackingCache.CachedFileInfo;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.common.util.Watcher;
@@ -80,15 +77,13 @@ public class IndexPersistenceMgr {
     }
 
     // use two separate cache for write and read
-    final Cache<Long, FileInfo> writeFileInfoCache;
-    final Cache<Long, FileInfo> readFileInfoCache;
+    final Cache<Long, CachedFileInfo> writeFileInfoCache;
+    final Cache<Long, CachedFileInfo> readFileInfoCache;
+    final FileInfoBackingCache fileInfoBackingCache;
+
     final int openFileLimit;
     final int pageSize;
     final int entriesPerPage;
-    // Lock
-    final ReentrantReadWriteLock fileInfoLock = new ReentrantReadWriteLock();
-    // ThreadPool
-    final ScheduledExecutorService evictionThreadPool = Executors.newSingleThreadScheduledExecutor();
 
     // Manage all active ledgers in LedgerManager
     // so LedgerManager has knowledge to garbage collect inactive/deleted ledgers
@@ -117,7 +112,8 @@ public class IndexPersistenceMgr {
 
         // build the file info cache
         int concurrencyLevel = Math.max(1, Math.max(conf.getNumAddWorkerThreads(), conf.getNumReadWorkerThreads()));
-        RemovalListener<Long, FileInfo> fileInfoEvictionListener = this::handleLedgerEviction;
+        fileInfoBackingCache = new FileInfoBackingCache(this::createFileInfoBackingFile);
+        RemovalListener<Long, CachedFileInfo> fileInfoEvictionListener = this::handleLedgerEviction;
         writeFileInfoCache = buildCache(
             concurrencyLevel,
             conf.getFileInfoCacheInitialCapacity(),
@@ -158,12 +154,12 @@ public class IndexPersistenceMgr {
         });
     }
 
-    private static Cache<Long, FileInfo> buildCache(int concurrencyLevel,
-                                            int initialCapacity,
-                                            int maximumSize,
-                                            long expireAfterAccessSeconds,
-                                            RemovalListener<Long, FileInfo> removalListener) {
-        CacheBuilder<Long, FileInfo> builder = CacheBuilder.newBuilder()
+    private static Cache<Long, CachedFileInfo> buildCache(int concurrencyLevel,
+                                                          int initialCapacity,
+                                                          int maximumSize,
+                                                          long expireAfterAccessSeconds,
+                                                          RemovalListener<Long, CachedFileInfo> removalListener) {
+        CacheBuilder<Long, CachedFileInfo> builder = CacheBuilder.newBuilder()
             .concurrencyLevel(concurrencyLevel)
             .initialCapacity(initialCapacity)
             .maximumSize(maximumSize)
@@ -174,67 +170,34 @@ public class IndexPersistenceMgr {
         return builder.build();
     }
 
+    private File createFileInfoBackingFile(long ledger, boolean createIfMissing) throws IOException {
+        File lf = findIndexFile(ledger);
+        if (null == lf) {
+            if (!createIfMissing) {
+                throw new Bookie.NoLedgerException(ledger);
+            }
+            // We don't have a ledger index file on disk or in cache, so create it.
+            lf = getNewLedgerIndexFile(ledger, null);
+        }
+        return lf;
+    }
+
     /**
      * When a ledger is evicted, we need to make sure there's no other thread
      * trying to get FileInfo for that ledger at the same time when we close
      * the FileInfo.
      */
-    private void handleLedgerEviction(RemovalNotification<Long, FileInfo> notification) {
-        FileInfo fileInfo = notification.getValue();
-        Long ledgerId = notification.getKey();
+    private void handleLedgerEviction(RemovalNotification<Long, CachedFileInfo> notification) {
+        CachedFileInfo fileInfo = notification.getValue();
         if (null == fileInfo || null == notification.getKey()) {
             return;
         }
         if (notification.wasEvicted()) {
             evictedLedgersCounter.inc();
-            // we need to acquire the write lock in another thread,
-            // otherwise there could be dead lock happening.
-            evictionThreadPool.execute(() -> {
-                fileInfoLock.writeLock().lock();
-                try {
-                    // We only close the fileInfo when we evict the FileInfo from both cache
-                    if (!readFileInfoCache.asMap().containsKey(ledgerId)
-                            && !writeFileInfoCache.asMap().containsKey(ledgerId)) {
-                        fileInfo.close(true);
-                    }
-                } catch (IOException e) {
-                    LOG.error("Exception closing file info when ledger {} is evicted from file info cache.",
-                        ledgerId, e);
-                } finally {
-                    fileInfoLock.writeLock().unlock();
-                }
-            });
         }
         fileInfo.release();
     }
 
-    class FileInfoLoader implements Callable<FileInfo> {
-
-        final long ledger;
-        final byte[] masterKey;
-
-        FileInfoLoader(long ledger, byte[] masterKey) {
-            this.ledger = ledger;
-            this.masterKey = masterKey;
-        }
-
-        @Override
-        public FileInfo call() throws IOException {
-            File lf = findIndexFile(ledger);
-            if (null == lf) {
-                if (null == masterKey) {
-                    throw new Bookie.NoLedgerException(ledger);
-                }
-                // We don't have a ledger index file on disk or in cache, so create it.
-                lf = getNewLedgerIndexFile(ledger, null);
-                activeLedgers.put(ledger, true);
-            }
-            FileInfo fi = new FileInfo(lf, masterKey);
-            fi.use();
-            return fi;
-        }
-    }
-
     /**
      * Get the FileInfo and increase reference count.
      * When we get FileInfo from cache, we need to make sure it is synchronized
@@ -242,22 +205,23 @@ public class IndexPersistenceMgr {
      * the FileInfo from cache, that FileInfo is then evicted and closed before we
      * could even increase the reference counter.
      */
-    FileInfo getFileInfo(final Long ledger, final byte masterKey[]) throws IOException {
+    CachedFileInfo getFileInfo(final Long ledger, final byte masterKey[]) throws IOException {
         try {
-            FileInfo fi;
+            CachedFileInfo fi;
             pendingGetFileInfoCounter.inc();
-            fileInfoLock.readLock().lock();
-            if (null != masterKey) {
-                fi = writeFileInfoCache.get(ledger,
-                    new FileInfoLoader(ledger, masterKey));
-                if (null == readFileInfoCache.asMap().putIfAbsent(ledger, fi)) {
-                    fi.use();
+            Callable<CachedFileInfo> loader = () -> {
+                CachedFileInfo fileInfo = fileInfoBackingCache.loadFileInfo(ledger, masterKey);
+                activeLedgers.put(ledger, true);
+                return fileInfo;
+            };
+            do {
+                if (null != masterKey) {
+                    fi = writeFileInfoCache.get(ledger, loader);
+                } else {
+                    fi = readFileInfoCache.get(ledger, loader);
                 }
-            } else {
-                fi = readFileInfoCache.get(ledger,
-                    new FileInfoLoader(ledger, null));
-            }
-            fi.use();
+            } while (!fi.tryRetain());
+
             return fi;
         } catch (ExecutionException | UncheckedExecutionException ee) {
             if (ee.getCause() instanceof IOException) {
@@ -267,7 +231,6 @@ public class IndexPersistenceMgr {
             }
         } finally {
             pendingGetFileInfoCounter.dec();
-            fileInfoLock.readLock().unlock();
         }
     }
 
@@ -354,8 +317,7 @@ public class IndexPersistenceMgr {
      */
     void removeLedger(Long ledgerId) throws IOException {
         // Delete the ledger's index file and close the FileInfo
-        FileInfo fi = null;
-        fileInfoLock.writeLock().lock();
+        CachedFileInfo fi = null;
         try {
             fi = getFileInfo(ledgerId, null);
             // Don't force flush. There's no need since we're deleting the ledger
@@ -364,18 +326,14 @@ public class IndexPersistenceMgr {
             fi.close(false);
             fi.delete();
         } finally {
-            try {
-                if (fi != null) {
-                    // should release use count
-                    fi.release();
-                    // Remove it from the active ledger manager
-                    activeLedgers.remove(ledgerId);
-                    // Now remove it from cache
-                    writeFileInfoCache.invalidate(ledgerId);
-                    readFileInfoCache.invalidate(ledgerId);
-                }
-            } finally {
-                fileInfoLock.writeLock().unlock();
+            if (fi != null) {
+                // should release use count
+                fi.release();
+                // Remove it from the active ledger manager
+                activeLedgers.remove(ledgerId);
+                // Now remove it from cache
+                writeFileInfoCache.invalidate(ledgerId);
+                readFileInfoCache.invalidate(ledgerId);
             }
         }
     }
@@ -399,29 +357,13 @@ public class IndexPersistenceMgr {
         // Don't force create the file. We may have many dirty ledgers and file create/flush
         // can be quite expensive as a result. We can use this optimization in this case
         // because metadata will be recovered from the journal when we restart anyway.
-        try {
-            fileInfoLock.writeLock().lock();
-            for (Map.Entry<Long, FileInfo> entry : writeFileInfoCache.asMap().entrySet()) {
-                entry.getValue().close(false);
-            }
-            for (Map.Entry<Long, FileInfo> entry : readFileInfoCache.asMap().entrySet()) {
-                entry.getValue().close(false);
-            }
-            writeFileInfoCache.invalidateAll();
-            readFileInfoCache.invalidateAll();
-        } finally {
-            fileInfoLock.writeLock().unlock();
-        }
-        evictionThreadPool.shutdown();
-        try {
-            evictionThreadPool.awaitTermination(5, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-            //ignore
-        }
+        fileInfoBackingCache.closeAllWithoutFlushing();
+        writeFileInfoCache.invalidateAll();
+        readFileInfoCache.invalidateAll();
     }
 
     Long getLastAddConfirmed(long ledgerId) throws IOException {
-        FileInfo fi = null;
+        CachedFileInfo fi = null;
         try {
             fi = getFileInfo(ledgerId, null);
             return fi.getLastAddConfirmed();
@@ -435,7 +377,7 @@ public class IndexPersistenceMgr {
     boolean waitForLastAddConfirmedUpdate(long ledgerId,
                                           long previousLAC,
                                           Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
-        FileInfo fi = null;
+        CachedFileInfo fi = null;
         try {
             fi = getFileInfo(ledgerId, null);
             return fi.waitForLastAddConfirmedUpdate(previousLAC, watcher);
@@ -447,7 +389,7 @@ public class IndexPersistenceMgr {
     }
 
     long updateLastAddConfirmed(long ledgerId, long lac) throws IOException {
-        FileInfo fi = null;
+        CachedFileInfo fi = null;
         try {
             fi = getFileInfo(ledgerId, null);
             return fi.setLastAddConfirmed(lac);
@@ -459,7 +401,7 @@ public class IndexPersistenceMgr {
     }
 
     byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
-        FileInfo fi = null;
+        CachedFileInfo fi = null;
         try {
             fi = getFileInfo(ledgerId, null);
             return fi.getMasterKey();
@@ -471,7 +413,7 @@ public class IndexPersistenceMgr {
     }
 
     void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
-        FileInfo fi = null;
+        CachedFileInfo fi = null;
         try {
             fi = getFileInfo(ledgerId, masterKey);
         } finally {
@@ -482,7 +424,7 @@ public class IndexPersistenceMgr {
     }
 
     boolean setFenced(long ledgerId) throws IOException {
-        FileInfo fi = null;
+        CachedFileInfo fi = null;
         try {
             fi = getFileInfo(ledgerId, null);
             return fi.setFenced();
@@ -494,7 +436,7 @@ public class IndexPersistenceMgr {
     }
 
     boolean isFenced(long ledgerId) throws IOException {
-        FileInfo fi = null;
+        CachedFileInfo fi = null;
         try {
             fi = getFileInfo(ledgerId, null);
             return fi.isFenced();
@@ -506,7 +448,7 @@ public class IndexPersistenceMgr {
     }
 
     void setExplicitLac(long ledgerId, ByteBuf lac) throws IOException {
-        FileInfo fi = null;
+        CachedFileInfo fi = null;
         try {
             fi = getFileInfo(ledgerId, null);
             fi.setExplicitLac(lac);
@@ -519,7 +461,7 @@ public class IndexPersistenceMgr {
     }
 
     public ByteBuf getExplicitLac(long ledgerId) {
-        FileInfo fi = null;
+        CachedFileInfo fi = null;
         try {
             fi = getFileInfo(ledgerId, null);
             return fi.getExplicitLac();
@@ -600,7 +542,7 @@ public class IndexPersistenceMgr {
     }
 
     void flushLedgerHeader(long ledger) throws IOException {
-        FileInfo fi = null;
+        CachedFileInfo fi = null;
         try {
             fi = getFileInfo(ledger, null);
             relocateIndexFileAndFlushHeader(ledger, fi);
@@ -616,7 +558,7 @@ public class IndexPersistenceMgr {
     }
 
     void flushLedgerEntries(long l, List<LedgerEntryPage> entries) throws IOException {
-        FileInfo fi = null;
+        CachedFileInfo fi = null;
         try {
             Collections.sort(entries, new Comparator<LedgerEntryPage>() {
                 @Override
@@ -711,7 +653,7 @@ public class IndexPersistenceMgr {
         if (!lep.isClean()) {
             throw new IOException("Trying to update a dirty page");
         }
-        FileInfo fi = null;
+        CachedFileInfo fi = null;
         try {
             fi = getFileInfo(lep.getLedger(), null);
             long pos = lep.getFirstEntryPosition();
@@ -730,7 +672,7 @@ public class IndexPersistenceMgr {
     }
 
     long getPersistEntryBeyondInMem(long ledgerId, long lastEntryInMem) throws IOException {
-        FileInfo fi = null;
+        CachedFileInfo fi = null;
         long lastEntry = lastEntryInMem;
         try {
             fi = getFileInfo(ledgerId, null);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
index a179745..86c3830 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
@@ -28,6 +28,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.File;
+import org.apache.bookkeeper.bookie.FileInfoBackingCache.CachedFileInfo;
 import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -151,12 +152,12 @@ public class IndexPersistenceMgrTest {
             assertEquals(0, indexPersistenceMgr.writeFileInfoCache.size());
             assertEquals(0, indexPersistenceMgr.readFileInfoCache.size());
 
-            FileInfo writeFileInfo = indexPersistenceMgr.getFileInfo(lid, masterKey);
-            assertEquals(3, writeFileInfo.getUseCount());
+            CachedFileInfo writeFileInfo = indexPersistenceMgr.getFileInfo(lid, masterKey);
+            assertEquals(2, writeFileInfo.getRefCount());
             assertEquals(1, indexPersistenceMgr.writeFileInfoCache.size());
-            assertEquals(1, indexPersistenceMgr.readFileInfoCache.size());
+            assertEquals(0, indexPersistenceMgr.readFileInfoCache.size());
             writeFileInfo.release();
-            assertEquals(2, writeFileInfo.getUseCount());
+            assertEquals(1, writeFileInfo.getRefCount());
         } finally {
             if (null != indexPersistenceMgr) {
                 indexPersistenceMgr.close();
@@ -170,19 +171,19 @@ public class IndexPersistenceMgrTest {
         try {
             indexPersistenceMgr = createIndexPersistenceManager(1);
 
-            FileInfo writeFileInfo = indexPersistenceMgr.getFileInfo(lid, masterKey);
-            assertEquals(3, writeFileInfo.getUseCount());
+            CachedFileInfo writeFileInfo = indexPersistenceMgr.getFileInfo(lid, masterKey);
+            assertEquals(2, writeFileInfo.getRefCount());
             assertEquals(1, indexPersistenceMgr.writeFileInfoCache.size());
-            assertEquals(1, indexPersistenceMgr.readFileInfoCache.size());
+            assertEquals(0, indexPersistenceMgr.readFileInfoCache.size());
             writeFileInfo.release();
 
-            FileInfo readFileInfo = indexPersistenceMgr.getFileInfo(lid, null);
-            assertEquals(3, readFileInfo.getUseCount());
+            CachedFileInfo readFileInfo = indexPersistenceMgr.getFileInfo(lid, null);
+            assertEquals(3, readFileInfo.getRefCount());
             assertEquals(1, indexPersistenceMgr.writeFileInfoCache.size());
             assertEquals(1, indexPersistenceMgr.readFileInfoCache.size());
             readFileInfo.release();
-            assertEquals(2, writeFileInfo.getUseCount());
-            assertEquals(2, readFileInfo.getUseCount());
+            assertEquals(2, writeFileInfo.getRefCount());
+            assertEquals(2, readFileInfo.getRefCount());
         } finally {
             if (null != indexPersistenceMgr) {
                 indexPersistenceMgr.close();
@@ -196,9 +197,13 @@ public class IndexPersistenceMgrTest {
         try {
             indexPersistenceMgr = createIndexPersistenceManager(1);
             for (int i = 0; i < 3; i++) {
-                FileInfo fileInfo = indexPersistenceMgr.getFileInfo(lid+i, masterKey);
+                CachedFileInfo fileInfo = indexPersistenceMgr.getFileInfo(lid + i, masterKey);
                 // We need to make sure index file is created, otherwise the test case can be flaky
                 fileInfo.checkOpen(true);
+                fileInfo.release();
+
+                // load into read cache also
+                indexPersistenceMgr.getFileInfo(lid + i, null).release();
             }
 
             indexPersistenceMgr.getFileInfo(lid, masterKey);
@@ -212,10 +217,10 @@ public class IndexPersistenceMgrTest {
             assertEquals(1, indexPersistenceMgr.writeFileInfoCache.size());
             assertEquals(2, indexPersistenceMgr.readFileInfoCache.size());
 
-            FileInfo fileInfo = indexPersistenceMgr.writeFileInfoCache.asMap().get(lid);
+            CachedFileInfo fileInfo = indexPersistenceMgr.writeFileInfoCache.asMap().get(lid);
             assertNotNull(fileInfo);
-            assertEquals(2, fileInfo.getUseCount());
-            fileInfo = indexPersistenceMgr.writeFileInfoCache.asMap().get(lid+1);
+            assertEquals(2, fileInfo.getRefCount());
+            fileInfo = indexPersistenceMgr.writeFileInfoCache.asMap().get(lid + 1);
             assertNull(fileInfo);
             fileInfo = indexPersistenceMgr.writeFileInfoCache.asMap().get(lid+2);
             assertNull(fileInfo);
@@ -223,10 +228,10 @@ public class IndexPersistenceMgrTest {
             assertNull(fileInfo);
             fileInfo = indexPersistenceMgr.readFileInfoCache.asMap().get(lid+1);
             assertNotNull(fileInfo);
-            assertEquals(2, fileInfo.getUseCount());
-            fileInfo = indexPersistenceMgr.readFileInfoCache.asMap().get(lid+2);
+            assertEquals(2, fileInfo.getRefCount());
+            fileInfo = indexPersistenceMgr.readFileInfoCache.asMap().get(lid + 2);
             assertNotNull(fileInfo);
-            assertEquals(2, fileInfo.getUseCount());
+            assertEquals(2, fileInfo.getRefCount());
         } finally {
             if (null != indexPersistenceMgr) {
                 indexPersistenceMgr.close();
@@ -264,4 +269,36 @@ public class IndexPersistenceMgrTest {
             }
         }
     }
+
+    /**
+     * Obtains a handle for ledger 1, issues further getFileInfo calls for other
+     * ledgers, then fences and releases the handle and checks that the manager
+     * still reports ledger 1 as fenced.
+     */
+    @Test
+    public void testEvictBeforeReleaseRace() throws Exception {
+        IndexPersistenceMgr indexPersistenceMgr = null;
+        // NOTE(review): watcher is never referenced again in this method.
+        Watcher<LastAddConfirmedUpdateNotification> watcher = notification -> notification.recycle();
+        try {
+            indexPersistenceMgr = createIndexPersistenceManager(1);
+
+            // The handles returned by these four calls are neither kept nor released here.
+            indexPersistenceMgr.getFileInfo(1L, masterKey);
+            indexPersistenceMgr.getFileInfo(2L, masterKey);
+            indexPersistenceMgr.getFileInfo(3L, masterKey);
+            indexPersistenceMgr.getFileInfo(4L, masterKey);
+
+            // Second request for ledger 1; this is the handle that gets fenced below.
+            CachedFileInfo fi = indexPersistenceMgr.getFileInfo(1L, masterKey);
+
+            // trigger eviction
+            // (presumably the manager created above keeps too few entries to hold all
+            // of these ledgers, so ledger 1 is pushed out of its caches -- confirm)
+            indexPersistenceMgr.getFileInfo(2L, masterKey);
+            indexPersistenceMgr.getFileInfo(3L, null);
+            indexPersistenceMgr.getFileInfo(4L, null);
+
+            // NOTE(review): presumably gives asynchronous eviction work time to run -- confirm.
+            Thread.sleep(1000);
+
+            // Fence through the handle obtained before the calls above, then drop it.
+            fi.setFenced();
+            fi.release();
+
+            // The fence bit must be visible through the manager after the release.
+            assertTrue(indexPersistenceMgr.isFenced(1));
+        } finally {
+            if (null != indexPersistenceMgr) {
+                indexPersistenceMgr.close();
+            }
+        }
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index 8de2bfc..43b0a86 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
+import org.apache.bookkeeper.bookie.FileInfoBackingCache.CachedFileInfo;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
@@ -279,12 +280,8 @@ public class LedgerCacheTest {
         // Create ledger index file
         ledgerStorage.setMasterKey(1, "key".getBytes());
 
-        FileInfo fileInfo = ledgerCache.getIndexPersistenceManager().getFileInfo(Long.valueOf(1), null);
+        CachedFileInfo fileInfo = ledgerCache.getIndexPersistenceManager().getFileInfo(Long.valueOf(1), null);
 
-        // Simulate the flush failure
-        FileInfo newFileInfo = new FileInfo(fileInfo.getLf(), fileInfo.getMasterKey());
-        ledgerCache.getIndexPersistenceManager().writeFileInfoCache.put(Long.valueOf(1), newFileInfo);
-        ledgerCache.getIndexPersistenceManager().readFileInfoCache.put(Long.valueOf(1), newFileInfo);
         // Add entries
         ledgerStorage.addEntry(generateEntry(1, 1));
         ledgerStorage.addEntry(generateEntry(1, 2));
@@ -293,13 +290,11 @@ public class LedgerCacheTest {
         ledgerStorage.addEntry(generateEntry(1, 3));
         // add the dir to failed dirs
         bookie.getIndexDirsManager().addToFilledDirs(
-                newFileInfo.getLf().getParentFile().getParentFile().getParentFile());
-        File before = newFileInfo.getLf();
+                fileInfo.getLf().getParentFile().getParentFile().getParentFile());
+        File before = fileInfo.getLf();
         // flush after disk is added as failed.
         ledgerStorage.flush();
-        File after = newFileInfo.getLf();
-
-        assertEquals("Reference counting for the file info should be zero.", 0, newFileInfo.getUseCount());
+        File after = fileInfo.getLf();
 
         assertFalse("After flush index file should be changed", before.equals(after));
         // Verify written entries
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestFileInfoBackingCache.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestFileInfoBackingCache.java
new file mode 100644
index 0000000..489db74
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestFileInfoBackingCache.java
@@ -0,0 +1,288 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.FileInfoBackingCache.CachedFileInfo;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for FileInfoBackingCache.
+ */
+@Slf4j
+public class TestFileInfoBackingCache {
+    // Empty master key handed to every loadFileInfo call that supplies a key.
+    final byte[] masterKey = new byte[0];
+    // Scratch location under which the loader lambdas place one file per ledger id.
+    final File baseDir;
+    // Produces daemon threads named backing-cache-test-N for the worker pool.
+    final ThreadFactory threadFactory = new ThreadFactoryBuilder()
+        .setNameFormat("backing-cache-test-%d").setDaemon(true).build();
+    // Worker pool used by the concurrent tests; created in setup().
+    ExecutorService executor;
+
+    /**
+     * Creates a uniquely named temporary file; its path is what {@code baseDir}
+     * refers to for the lifetime of this test instance.
+     */
+    public TestFileInfoBackingCache() throws Exception {
+        this.baseDir = File.createTempFile("foo", "bar");
+    }
+
+    /**
+     * Replaces whatever exists at {@code baseDir} with a directory of the same
+     * name and starts a fresh executor for the test.
+     */
+    @Before
+    public void setup() throws Exception {
+        boolean previousEntryRemoved = baseDir.delete();
+        Assert.assertTrue(previousEntryRemoved);
+        boolean directoryCreated = baseDir.mkdirs();
+        Assert.assertTrue(directoryCreated);
+        baseDir.deleteOnExit();
+
+        executor = Executors.newCachedThreadPool(threadFactory);
+    }
+
+    /**
+     * Requests shutdown of the executor, if one was created.
+     */
+    @After
+    public void tearDown() throws Exception {
+        if (executor == null) {
+            return;
+        }
+        executor.shutdown();
+    }
+
+    /**
+     * Checks reference counting on a single thread: loading the same ledger twice
+     * yields equal handles with a shared count, and once every reference has been
+     * released the old handle reports DEAD_REF while the next load returns a
+     * different object backed by the same file.
+     */
+    @Test
+    public void basicTest() throws Exception {
+        FileInfoBackingCache cache = new FileInfoBackingCache(
+                (ledgerId, createIfNotFound) -> {
+                    File f = new File(baseDir, String.valueOf(ledgerId));
+                    f.deleteOnExit();
+                    return f;
+                });
+        // JUnit's contract is assertEquals(expected, actual); keeping that order
+        // makes failure messages report the right value as "expected".
+        CachedFileInfo fi = cache.loadFileInfo(1, masterKey);
+        Assert.assertEquals(1, fi.getRefCount());
+        CachedFileInfo fi2 = cache.loadFileInfo(2, masterKey);
+        Assert.assertEquals(1, fi2.getRefCount());
+        CachedFileInfo fi3 = cache.loadFileInfo(1, null);
+        Assert.assertEquals(fi, fi3);
+        Assert.assertEquals(2, fi3.getRefCount());
+
+        // check that it expires correctly
+        fi.release();
+        fi3.release();
+
+        Assert.assertEquals(FileInfoBackingCache.DEAD_REF, fi.getRefCount());
+        CachedFileInfo fi4 = cache.loadFileInfo(1, null);
+        // a fully released handle must not be handed out again
+        Assert.assertNotSame(fi, fi4);
+        Assert.assertEquals(FileInfoBackingCache.DEAD_REF, fi.getRefCount());
+        Assert.assertEquals(1, fi4.getRefCount());
+        Assert.assertEquals(fi.getLf(), fi4.getLf());
+    }
+
+    /**
+     * Loads ledger 1 with a null master key through a loader that always fails;
+     * the test passes only if an IOException reaches the caller.
+     * NOTE(review): presumably Bookie.NoLedgerException is an IOException -- confirm.
+     */
+    @Test(expected = IOException.class)
+    public void testNoKey() throws Exception {
+        FileInfoBackingCache cache = new FileInfoBackingCache(
+                (ledgerId, createIfNotFound) -> {
+                    // with no key supplied the cache must not ask for the file to be created
+                    Assert.assertFalse(createIfNotFound);
+                    throw new Bookie.NoLedgerException(ledgerId);
+                });
+        cache.loadFileInfo(1, null);
+    }
+
+    /**
+     * Stress test: several threads concurrently take and release references on a
+     * small set of ledgers for ten seconds, then every handle they saw is checked.
+     * Of course this can't prove deadlocks don't exist, but
+     * try to shake them out nonetheless.
+     */
+    @Test
+    public void testForDeadlocks() throws Exception {
+        int numRunners = 20;
+        int maxLedgerId = 10;
+        AtomicBoolean done = new AtomicBoolean(false);
+
+        FileInfoBackingCache cache = new FileInfoBackingCache(
+                (ledgerId, createIfNotFound) -> {
+                    File f = new File(baseDir, String.valueOf(ledgerId));
+                    f.deleteOnExit();
+                    return f;
+                });
+        Iterable<Future<Set<CachedFileInfo>>> futures =
+            IntStream.range(0, numRunners).mapToObj(
+                    (i) -> {
+                        Callable<Set<CachedFileInfo>> c = () -> {
+                            Random r = new Random();
+                            // fileInfos: references this runner currently holds (fewer than 5 before each take)
+                            List<CachedFileInfo> fileInfos = new ArrayList<>();
+                            // allFileInfos: every handle this runner ever obtained, returned for the final checks
+                            Set<CachedFileInfo> allFileInfos = new HashSet<>();
+                            while (!done.get()) {
+                                if (r.nextBoolean() && fileInfos.size() < 5) { // take a reference
+                                    CachedFileInfo fi = cache.loadFileInfo(r.nextInt(maxLedgerId), masterKey);
+                                    Assert.assertFalse(fi.isClosed());
+                                    allFileInfos.add(fi);
+                                    fileInfos.add(fi);
+                                } else { // release a reference
+                                    // shuffle so the reference dropped is a random one
+                                    Collections.shuffle(fileInfos);
+                                    if (!fileInfos.isEmpty()) {
+                                        fileInfos.remove(0).release();
+                                    }
+                                }
+                            }
+                            // drop whatever is still held once the run is stopped
+                            for (CachedFileInfo fi : fileInfos) {
+                                Assert.assertFalse(fi.isClosed());
+                                fi.release();
+                            }
+                            return allFileInfos;
+                        };
+                        return executor.submit(c);
+                    }).collect(Collectors.toList());
+        // let the runners churn for ten seconds before asking them to stop
+        Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+        done.set(true);
+
+        // ensure all threads are finished operating on cache, before checking any
+        for (Future<Set<CachedFileInfo>> f : futures) {
+            f.get();
+        }
+
+        // every reference has been released, so each handle ever seen must be closed and dead
+        for (Future<Set<CachedFileInfo>> f : futures) {
+            for (CachedFileInfo fi : f.get()) {
+                Assert.assertTrue(fi.isClosed());
+                Assert.assertEquals(FileInfoBackingCache.DEAD_REF, fi.getRefCount());
+            }
+        }
+
+        // try to load all ledgers again.
+        // They should be loaded fresh (i.e. this load should be only reference)
+        for (int i = 0; i < maxLedgerId; i++) {
+            Assert.assertEquals(1, cache.loadFileInfo(i, masterKey).getRefCount());
+        }
+    }
+
+    /**
+     * Two workers repeatedly load and immediately release ledger 1 for ten
+     * seconds; afterwards every handle either worker saw must be closed and
+     * carry the dead ref count.
+     */
+    @Test
+    public void testRefCountRace() throws Exception {
+        AtomicBoolean done = new AtomicBoolean(false);
+        FileInfoBackingCache cache = new FileInfoBackingCache(
+                (ledgerId, createIfNotFound) -> {
+                    File ledgerFile = new File(baseDir, String.valueOf(ledgerId));
+                    ledgerFile.deleteOnExit();
+                    return ledgerFile;
+                });
+
+        // Each invocation keeps its own set of the handles it has seen.
+        Callable<Set<CachedFileInfo>> loadAndRelease = () -> {
+            Set<CachedFileInfo> seen = new HashSet<>();
+            while (!done.get()) {
+                CachedFileInfo handle = cache.loadFileInfo(1, masterKey);
+                Assert.assertFalse(handle.isClosed());
+                seen.add(handle);
+                handle.release();
+            }
+            return seen;
+        };
+
+        List<Future<Set<CachedFileInfo>>> futures = new ArrayList<>();
+        for (int worker = 0; worker < 2; worker++) {
+            futures.add(executor.submit(loadAndRelease));
+        }
+        Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+        done.set(true);
+
+        // wait for every worker to finish with the cache before checking any handle
+        List<Set<CachedFileInfo>> results = new ArrayList<>();
+        for (Future<Set<CachedFileInfo>> future : futures) {
+            results.add(future.get());
+        }
+
+        for (Set<CachedFileInfo> seen : results) {
+            for (CachedFileInfo handle : seen) {
+                Assert.assertTrue(handle.isClosed());
+                Assert.assertEquals(FileInfoBackingCache.DEAD_REF, handle.getRefCount());
+            }
+        }
+    }
+
+    /**
+     * Removal callback for the Guava cache: releases the handle carried by the
+     * removal notification.
+     */
+    private void guavaEvictionListener(RemovalNotification<Long, CachedFileInfo> notification) {
+        CachedFileInfo removed = notification.getValue();
+        removed.release();
+    }
+
+    /**
+     * Two workers, each using its own key, share a Guava cache limited to one
+     * entry whose removal listener releases the removed handle. Each worker keeps
+     * fetching its handle until tryRetain() succeeds and then expects the handle
+     * to be open. After ten seconds the cache is emptied and every handle seen
+     * must be closed with the dead ref count.
+     */
+    @Test
+    public void testRaceGuavaEvictAndReleaseBeforeRetain() throws Exception {
+        AtomicBoolean done = new AtomicBoolean(false);
+        FileInfoBackingCache cache = new FileInfoBackingCache(
+                (ledgerId, createIfNotFound) -> {
+                    File f = new File(baseDir, String.valueOf(ledgerId));
+                    f.deleteOnExit();
+                    return f;
+                });
+
+        // one slot for two keys, so inserting one worker's entry removes the other's
+        Cache<Long, CachedFileInfo> guavaCache = CacheBuilder.newBuilder()
+            .maximumSize(1)
+            .removalListener(this::guavaEvictionListener)
+            .build();
+
+        Iterable<Future<Set<CachedFileInfo>>> futures =
+            LongStream.range(0L, 2L).mapToObj(
+                    (i) -> {
+                        Callable<Set<CachedFileInfo>> c = () -> {
+                            Set<CachedFileInfo> allFileInfos = new HashSet<>();
+                            while (!done.get()) {
+                                CachedFileInfo fi = null;
+
+                                // presumably the handle can be removed and released between
+                                // get() and tryRetain(); retry until a retain succeeds -- confirm
+                                do {
+                                    fi = guavaCache.get(
+                                            i, () -> cache.loadFileInfo(i, masterKey));
+                                    allFileInfos.add(fi);
+                                    Thread.sleep(100);
+                                } while (!fi.tryRetain());
+
+                                // a successfully retained handle must still be open
+                                Assert.assertFalse(fi.isClosed());
+                                fi.release();
+                            }
+                            return allFileInfos;
+                        };
+                        return executor.submit(c);
+                    }).collect(Collectors.toList());
+        Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+        done.set(true);
+
+        // ensure all threads are finished operating on cache, before checking any
+        for (Future<Set<CachedFileInfo>> f : futures) {
+            f.get();
+        }
+        // remove the remaining entries so the removal listener releases them as well
+        guavaCache.invalidateAll();
+
+        for (Future<Set<CachedFileInfo>> f : futures) {
+            for (CachedFileInfo fi : f.get()) {
+                Assert.assertTrue(fi.isClosed());
+                Assert.assertEquals(FileInfoBackingCache.DEAD_REF, fi.getRefCount());
+            }
+        }
+
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
"commits@bookkeeper.apache.org" <co...@bookkeeper.apache.org>.