You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by op...@apache.org on 2019/05/28 02:10:40 UTC

[hbase] branch HBASE-21879 updated: HBASE-22422 Retain an ByteBuff with refCnt=0 when getBlock from LRUCache (#242)

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch HBASE-21879
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/HBASE-21879 by this push:
     new b673000  HBASE-22422 Retain an ByteBuff with refCnt=0 when getBlock from LRUCache (#242)
b673000 is described below

commit b673000a5ca3387dd034657d7f4a24b60d3835d4
Author: openinx <op...@gmail.com>
AuthorDate: Tue May 28 10:10:34 2019 +0800

    HBASE-22422 Retain an ByteBuff with refCnt=0 when getBlock from LRUCache (#242)
---
 .../hadoop/hbase/io/hfile/BlockCacheUtil.java      |   3 +
 .../apache/hadoop/hbase/io/hfile/HFileBlock.java   |  40 ++++---
 .../hadoop/hbase/io/hfile/HFileBlockIndex.java     |  26 ++---
 .../hadoop/hbase/io/hfile/HFileReaderImpl.java     |  21 ++--
 .../hadoop/hbase/io/hfile/LruBlockCache.java       |  23 ++--
 .../hadoop/hbase/io/hfile/bucket/BucketCache.java  |  29 +++--
 .../hbase/io/hfile/TestCombinedBlockCache.java     |  17 +++
 .../hadoop/hbase/io/hfile/TestLruBlockCache.java   |  83 +++++++++++++-
 .../hadoop/hbase/io/hfile/bucket/TestRAMCache.java | 126 +++++++++++++++++++++
 9 files changed, 306 insertions(+), 62 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
index 46e8e24..2672992 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
@@ -230,6 +230,9 @@ public class BlockCacheUtil {
       BlockCacheKey cacheKey, Cacheable newBlock) {
     // NOTICE: The getBlock has retained the existingBlock inside.
     Cacheable existingBlock = blockCache.getBlock(cacheKey, false, false, false);
+    if (existingBlock == null) {
+      return true;
+    }
     try {
       int comparison = BlockCacheUtil.validateBlockAddition(existingBlock, newBlock, cacheKey);
       if (comparison < 0) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 846460f..079907e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
+
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
@@ -677,18 +679,24 @@ public class HFileBlock implements Cacheable {
 
     HFileBlock unpacked = new HFileBlock(this);
     unpacked.allocateBuffer(); // allocates space for the decompressed block
-
-    HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA
-        ? reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
-
-    ByteBuff dup = this.buf.duplicate();
-    dup.position(this.headerSize());
-    dup = dup.slice();
-
-    ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
-      unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(true), dup);
-
-    return unpacked;
+    boolean succ = false;
+    try {
+      HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA
+          ? reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
+      // Create a duplicated buffer without the header part.
+      ByteBuff dup = this.buf.duplicate();
+      dup.position(this.headerSize());
+      dup = dup.slice();
+      // Decode the dup into unpacked#buf
+      ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
+        unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(true), dup);
+      succ = true;
+      return unpacked;
+    } finally {
+      if (!succ) {
+        unpacked.release();
+      }
+    }
   }
 
   /**
@@ -709,7 +717,7 @@ public class HFileBlock implements Cacheable {
 
     buf = newBuf;
     // set limit to exclude next block's header
-    buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes);
+    buf.limit(capacityNeeded);
   }
 
   /**
@@ -1685,7 +1693,7 @@ public class HFileBlock implements Cacheable {
     }
 
     private ByteBuff allocate(int size, boolean intoHeap) {
-      return intoHeap ? ByteBuffAllocator.HEAP.allocate(size) : allocator.allocate(size);
+      return intoHeap ? HEAP.allocate(size) : allocator.allocate(size);
     }
 
     /**
@@ -1735,7 +1743,7 @@ public class HFileBlock implements Cacheable {
           if (LOG.isTraceEnabled()) {
             LOG.trace("Extra see to get block size!", new RuntimeException());
           }
-          headerBuf = new SingleByteBuff(ByteBuffer.allocate(hdrSize));
+          headerBuf = HEAP.allocate(hdrSize);
           readAtOffset(is, headerBuf, hdrSize, false, offset, pread);
           headerBuf.rewind();
         }
@@ -1778,7 +1786,7 @@ public class HFileBlock implements Cacheable {
         // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
         // contains the header of next block, so no need to set next block's header in it.
         HFileBlock hFileBlock = new HFileBlock(curBlock, checksumSupport, MemoryType.EXCLUSIVE,
-            offset, nextBlockOnDiskSize, fileContext, allocator);
+            offset, nextBlockOnDiskSize, fileContext, intoHeap ? HEAP: allocator);
         // Run check on uncompressed sizings.
         if (!fileContext.isCompressedOrEncrypted()) {
           hFileBlock.sanityCheckUncompressed();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
index ad61839..8396192 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
@@ -313,10 +313,13 @@ public class HFileBlockIndex {
       int index = -1;
 
       HFileBlock block = null;
-      boolean dataBlock = false;
       KeyOnlyKeyValue tmpNextIndexKV = new KeyValue.KeyOnlyKeyValue();
       while (true) {
         try {
+          // Must reset it to null here. Otherwise, if readBlock throws an exception, the finally
+          // block would release the block assigned in the previous iteration a second time.
+          // (See HBASE-22422)
+          block = null;
           if (currentBlock != null && currentBlock.getOffset() == currentOffset) {
             // Avoid reading the same block again, even with caching turned off.
             // This is crucial for compaction-type workload which might have
@@ -336,9 +339,8 @@ public class HFileBlockIndex {
               // this also accounts for ENCODED_DATA
               expectedBlockType = BlockType.DATA;
             }
-            block =
-                cachingBlockReader.readBlock(currentOffset, currentOnDiskSize, shouldCache, pread,
-                  isCompaction, true, expectedBlockType, expectedDataBlockEncoding);
+            block = cachingBlockReader.readBlock(currentOffset, currentOnDiskSize, shouldCache,
+              pread, isCompaction, true, expectedBlockType, expectedDataBlockEncoding);
           }
 
           if (block == null) {
@@ -348,7 +350,6 @@ public class HFileBlockIndex {
 
           // Found a data block, break the loop and check our level in the tree.
           if (block.getBlockType().isData()) {
-            dataBlock = true;
             break;
           }
 
@@ -381,7 +382,7 @@ public class HFileBlockIndex {
             nextIndexedKey = tmpNextIndexKV;
           }
         } finally {
-          if (!dataBlock && block != null) {
+          if (block != null && !block.getBlockType().isData()) {
             // Release the block immediately if it is not the data block
             block.release();
           }
@@ -389,7 +390,7 @@ public class HFileBlockIndex {
       }
 
       if (lookupLevel != searchTreeLevel) {
-        assert dataBlock == true;
+        assert block.getBlockType().isData();
         // Though we have retrieved a data block we have found an issue
         // in the retrieved data block. Hence returned the block so that
         // the ref count can be decremented
@@ -401,8 +402,7 @@ public class HFileBlockIndex {
       }
 
       // set the next indexed key for the current block.
-      BlockWithScanInfo blockWithScanInfo = new BlockWithScanInfo(block, nextIndexedKey);
-      return blockWithScanInfo;
+      return new BlockWithScanInfo(block, nextIndexedKey);
     }
 
     @Override
@@ -576,8 +576,7 @@ public class HFileBlockIndex {
         boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
         throws IOException {
       BlockWithScanInfo blockWithScanInfo = loadDataBlockWithScanInfo(key, currentBlock,
-          cacheBlocks,
-          pread, isCompaction, expectedDataBlockEncoding);
+        cacheBlocks, pread, isCompaction, expectedDataBlockEncoding);
       if (blockWithScanInfo == null) {
         return null;
       } else {
@@ -600,9 +599,8 @@ public class HFileBlockIndex {
      * @throws IOException
      */
     public abstract BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
-        boolean cacheBlocks,
-        boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
-        throws IOException;
+        boolean cacheBlocks, boolean pread, boolean isCompaction,
+        DataBlockEncoding expectedDataBlockEncoding) throws IOException;
 
     /**
      * An approximation to the {@link HFile}'s mid-key. Operates on block
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index 38631c0..701c8a4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -1128,15 +1128,13 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
       updateCurrentBlock(newBlock);
     }
 
-    protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey,
-        boolean rewind, Cell key, boolean seekBefore) throws IOException {
-      if (this.curBlock == null
-          || this.curBlock.getOffset() != seekToBlock.getOffset()) {
+    protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey, boolean rewind,
+        Cell key, boolean seekBefore) throws IOException {
+      if (this.curBlock == null || this.curBlock.getOffset() != seekToBlock.getOffset()) {
         updateCurrentBlock(seekToBlock);
       } else if (rewind) {
         blockBuffer.rewind();
       }
-
       // Update the nextIndexedKey
       this.nextIndexedKey = nextIndexedKey;
       return blockSeek(key, seekBefore);
@@ -1473,9 +1471,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
               // Validate encoding type for data blocks. We include encoding
               // type in the cache key, and we expect it to match on a cache hit.
               if (cachedBlock.getDataBlockEncoding() != dataBlockEncoder.getDataBlockEncoding()) {
+                // Remember to release the block on the exceptional path.
+                cachedBlock.release();
                 throw new IOException("Cached block under key " + cacheKey + " "
-                  + "has wrong encoding: " + cachedBlock.getDataBlockEncoding() + " (expected: "
-                  + dataBlockEncoder.getDataBlockEncoding() + ")");
+                    + "has wrong encoding: " + cachedBlock.getDataBlockEncoding() + " (expected: "
+                    + dataBlockEncoder.getDataBlockEncoding() + ")");
               }
             }
             // Cache-hit. Return!
@@ -1499,15 +1499,14 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
         BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();
 
         // Cache the block if necessary
-        AtomicBoolean cachedRaw = new AtomicBoolean(false);
         cacheConf.getBlockCache().ifPresent(cache -> {
           if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
-            cachedRaw.set(cacheConf.shouldCacheCompressed(category));
-            cache.cacheBlock(cacheKey, cachedRaw.get() ? hfileBlock : unpacked,
+            cache.cacheBlock(cacheKey,
+              cacheConf.shouldCacheCompressed(category) ? hfileBlock : unpacked,
               cacheConf.isInMemory());
           }
         });
-        if (unpacked != hfileBlock && !cachedRaw.get()) {
+        if (unpacked != hfileBlock) {
           // End of life here if hfileBlock is an independent block.
           hfileBlock.release();
         }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
index 724685a..ec6db31 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
@@ -504,7 +504,14 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
   @Override
   public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
       boolean updateCacheMetrics) {
-    LruCachedBlock cb = map.get(cacheKey);
+    LruCachedBlock cb = map.computeIfPresent(cacheKey, (key, val) -> {
+      // It will be referenced by the RPC path, so increase the refCnt here. NOTICE: the retain
+      // must happen inside this lambda, because if we retain outside map#computeIfPresent,
+      // evictBlock may remove and release the block first, and then we would be retaining a
+      // block with refCnt=0, which is disallowed. See HBASE-22422.
+      val.getBuffer().retain();
+      return val;
+    });
     if (cb == null) {
       if (!repeat && updateCacheMetrics) {
         stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
@@ -532,10 +539,10 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
       }
       return null;
     }
-    if (updateCacheMetrics) stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
+    if (updateCacheMetrics) {
+      stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
+    }
     cb.access(count.incrementAndGet());
-    // It will be referenced by RPC path, so increase here.
-    cb.getBuffer().retain();
     return cb.getBuffer();
   }
 
@@ -592,8 +599,6 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
     if (previous == null) {
       return 0;
     }
-    // Decrease the block's reference count, and if refCount is 0, then it'll auto-deallocate.
-    previous.getBuffer().release();
     updateSizeMetrics(block, true);
     long val = elements.decrementAndGet();
     if (LOG.isTraceEnabled()) {
@@ -601,7 +606,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
       assertCounterSanity(size, val);
     }
     if (block.getBuffer().getBlockType().isData()) {
-       dataBlockElements.decrement();
+      dataBlockElements.decrement();
     }
     if (evictedByEvictionProcess) {
       // When the eviction of the block happened because of invalidation of HFiles, no need to
@@ -611,6 +616,10 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
         victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
       }
     }
+    // Decrease the block's reference count; if the refCnt reaches 0, it will be deallocated
+    // automatically. DO NOT move this up, because the victimHandler might then access a buffer
+    // with refCnt = 0, which is disallowed.
+    previous.getBuffer().release();
     return block.heapSize();
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index a330565..6d990d5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -42,6 +42,7 @@ import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.Lock;
@@ -1533,21 +1534,28 @@ public class BucketCache implements BlockCache, HeapSize {
     }
 
     public RAMQueueEntry get(BlockCacheKey key) {
-      RAMQueueEntry re = delegate.get(key);
-      if (re != null) {
-        // It'll be referenced by RPC, so retain here.
+      return delegate.computeIfPresent(key, (k, re) -> {
+        // It'll be referenced by RPC, so retain atomically here. If the get and the retain were
+        // not atomic, another thread could remove and release the block in between, and this
+        // thread would then retain a block with refCnt=0, which is disallowed. (See HBASE-22422)
         re.getData().retain();
-      }
-      return re;
+        return re;
+      });
     }
 
+    /**
+     * Returns the value previously associated with the key, or null if there was none. It has
+     * the same semantics as {@link ConcurrentMap#putIfAbsent(Object, Object)}.
+     */
     public RAMQueueEntry putIfAbsent(BlockCacheKey key, RAMQueueEntry entry) {
-      RAMQueueEntry previous = delegate.putIfAbsent(key, entry);
-      if (previous == null) {
+      AtomicBoolean absent = new AtomicBoolean(false);
+      RAMQueueEntry re = delegate.computeIfAbsent(key, k -> {
         // The RAMCache reference to this entry, so reference count should be increment.
         entry.getData().retain();
-      }
-      return previous;
+        absent.set(true);
+        return entry;
+      });
+      return absent.get() ? null : re;
     }
 
     public boolean remove(BlockCacheKey key) {
@@ -1576,8 +1584,9 @@ public class BucketCache implements BlockCache, HeapSize {
     public void clear() {
       Iterator<Map.Entry<BlockCacheKey, RAMQueueEntry>> it = delegate.entrySet().iterator();
       while (it.hasNext()) {
-        it.next().getValue().getData().release();
+        RAMQueueEntry re = it.next().getValue();
         it.remove();
+        re.getData().release();
       }
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCombinedBlockCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCombinedBlockCache.java
index f4dc38a..a086a3b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCombinedBlockCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCombinedBlockCache.java
@@ -17,11 +17,16 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
+import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
 import static org.junit.Assert.assertEquals;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache.CombinedCacheStats;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -33,6 +38,8 @@ public class TestCombinedBlockCache {
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestCombinedBlockCache.class);
 
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
   @Test
   public void testCombinedCacheStats() {
     CacheStats lruCacheStats = new CacheStats("lruCacheStats", 2);
@@ -102,4 +109,14 @@ public class TestCombinedBlockCache {
     assertEquals(0.75, stats.getHitRatioPastNPeriods(), delta);
     assertEquals(0.8, stats.getHitCachingRatioPastNPeriods(), delta);
   }
+
+  @Test
+  public void testMultiThreadGetAndEvictBlock() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    conf.set(BUCKET_CACHE_IOENGINE_KEY, "offheap");
+    conf.setInt(BUCKET_CACHE_SIZE_KEY, 32);
+    BlockCache blockCache = BlockCacheFactory.createBlockCache(conf);
+    Assert.assertTrue(blockCache instanceof CombinedBlockCache);
+    TestLruBlockCache.testMultiThreadGetAndEvictBlockInternal(blockCache);
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java
index 3317a4d..a355ab0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -27,6 +28,7 @@ import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -34,15 +36,17 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
-import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.hfile.LruBlockCache.EvictionThread;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.ClassSize;
+import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Tests the concurrent LruBlockCache.<p>
@@ -58,6 +62,8 @@ public class TestLruBlockCache {
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestLruBlockCache.class);
 
+  private static final Logger LOG = LoggerFactory.getLogger(TestLruBlockCache.class);
+
   @Test
   public void testCacheEvictionThreadSafe() throws Exception {
     long maxSize = 100000;
@@ -814,11 +820,10 @@ public class TestLruBlockCache {
     byte[] byteArr = new byte[length];
     ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
     HFileContext meta = new HFileContextBuilder().build();
-    ByteBuffAllocator alloc = ByteBuffAllocator.HEAP;
     HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
-        HFileBlock.FILL_HEADER, -1, 52, -1, meta, alloc);
+        HFileBlock.FILL_HEADER, -1, 52, -1, meta, HEAP);
     HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
-        HFileBlock.FILL_HEADER, -1, -1, -1, meta, alloc);
+        HFileBlock.FILL_HEADER, -1, -1, -1, meta, HEAP);
 
     LruBlockCache cache = new LruBlockCache(maxSize, blockSize, false,
         (int)Math.ceil(1.2*maxSize/blockSize),
@@ -958,5 +963,75 @@ public class TestLruBlockCache {
 
   }
 
+  static void testMultiThreadGetAndEvictBlockInternal(BlockCache cache) throws Exception {
+    int size = 100;
+    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
+    byte[] byteArr = new byte[length];
+    HFileContext meta = new HFileContextBuilder().build();
+    BlockCacheKey key = new BlockCacheKey("key1", 0);
+    HFileBlock blk = new HFileBlock(BlockType.DATA, size, size, -1,
+        ByteBuffer.wrap(byteArr, 0, size), HFileBlock.FILL_HEADER, -1, 52, -1, meta, HEAP);
+    AtomicBoolean err1 = new AtomicBoolean(false);
+    Thread t1 = new Thread(() -> {
+      for (int i = 0; i < 10000 && !err1.get(); i++) {
+        try {
+          cache.getBlock(key, false, false, true);
+        } catch (Exception e) {
+          err1.set(true);
+          LOG.info("Cache block or get block failure: ", e);
+        }
+      }
+    });
+
+    AtomicBoolean err2 = new AtomicBoolean(false);
+    Thread t2 = new Thread(() -> {
+      for (int i = 0; i < 10000 && !err2.get(); i++) {
+        try {
+          cache.evictBlock(key);
+        } catch (Exception e) {
+          err2.set(true);
+          LOG.info("Evict block failure: ", e);
+        }
+      }
+    });
+
+    AtomicBoolean err3 = new AtomicBoolean(false);
+    Thread t3 = new Thread(() -> {
+      for (int i = 0; i < 10000 && !err3.get(); i++) {
+        try {
+          cache.cacheBlock(key, blk);
+        } catch (Exception e) {
+          err3.set(true);
+          LOG.info("Cache block failure: ", e);
+        }
+      }
+    });
+    t1.start();
+    t2.start();
+    t3.start();
+    t1.join();
+    t2.join();
+    t3.join();
+    Assert.assertFalse(err1.get());
+    Assert.assertFalse(err2.get());
+    Assert.assertFalse(err3.get());
+  }
+
+  @Test
+  public void testMultiThreadGetAndEvictBlock() throws Exception {
+    long maxSize = 100000;
+    long blockSize = calculateBlockSize(maxSize, 10);
+    LruBlockCache cache =
+        new LruBlockCache(maxSize, blockSize, false, (int) Math.ceil(1.2 * maxSize / blockSize),
+            LruBlockCache.DEFAULT_LOAD_FACTOR, LruBlockCache.DEFAULT_CONCURRENCY_LEVEL,
+            0.66f, // min
+            0.99f, // acceptable
+            0.33f, // single
+            0.33f, // multi
+            0.34f, // memory
+            1.2f, // limit
+            false, 1024);
+    testMultiThreadGetAndEvictBlockInternal(cache);
+  }
 }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java
new file mode 100644
index 0000000..5c5dda6
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ IOTests.class, MediumTests.class })
+public class TestRAMCache {
+  private static final Logger LOG = LoggerFactory.getLogger(TestRAMCache.class);
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestRAMCache.class);
+
+  // Define a mock HFileBlock.
+  private static class MockHFileBlock extends HFileBlock {
+
+    private volatile CountDownLatch latch;
+
+    MockHFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
+        int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer b, boolean fillHeader,
+        long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader,
+        HFileContext fileContext, ByteBuffAllocator allocator) {
+      super(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, b,
+          fillHeader, offset, nextBlockOnDiskSize, onDiskDataSizeWithHeader, fileContext,
+          allocator);
+    }
+
+    public void setLatch(CountDownLatch latch) {
+      this.latch = latch;
+    }
+
+    public MockHFileBlock retain() {
+      try {
+        if (latch != null) {
+          latch.await();
+        }
+      } catch (InterruptedException e) {
+        LOG.info("Interrupted exception error: ", e);
+      }
+      super.retain();
+      return this;
+    }
+  }
+
+  @Test
+  public void testAtomicRAMCache() throws Exception {
+    int size = 100;
+    int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
+    byte[] byteArr = new byte[length];
+
+    RAMCache cache = new RAMCache();
+    BlockCacheKey key = new BlockCacheKey("file-1", 1);
+    MockHFileBlock blk = new MockHFileBlock(BlockType.DATA, size, size, -1,
+        ByteBuffer.wrap(byteArr, 0, size), HFileBlock.FILL_HEADER, -1, 52, -1,
+        new HFileContextBuilder().build(), ByteBuffAllocator.HEAP);
+    RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false, ByteBuffAllocator.NONE);
+
+    Assert.assertNull(cache.putIfAbsent(key, re));
+    Assert.assertEquals(cache.putIfAbsent(key, re), re);
+
+    CountDownLatch latch = new CountDownLatch(1);
+    blk.setLatch(latch);
+
+    AtomicBoolean error = new AtomicBoolean(false);
+    Thread t1 = new Thread(() -> {
+      try {
+        cache.get(key);
+      } catch (Exception e) {
+        error.set(true);
+      }
+    });
+    t1.start();
+    Thread.sleep(200);
+
+    AtomicBoolean removed = new AtomicBoolean(false);
+    Thread t2 = new Thread(() -> {
+      cache.remove(key);
+      removed.set(true);
+    });
+    t2.start();
+    Thread.sleep(200);
+    Assert.assertFalse(removed.get());
+
+    latch.countDown();
+    Thread.sleep(200);
+    Assert.assertTrue(removed.get());
+    Assert.assertFalse(error.get());
+  }
+}