You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by op...@apache.org on 2019/05/28 02:10:40 UTC
[hbase] branch HBASE-21879 updated: HBASE-22422 Retain an ByteBuff
with refCnt=0 when getBlock from LRUCache (#242)
This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch HBASE-21879
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-21879 by this push:
new b673000 HBASE-22422 Retain an ByteBuff with refCnt=0 when getBlock from LRUCache (#242)
b673000 is described below
commit b673000a5ca3387dd034657d7f4a24b60d3835d4
Author: openinx <op...@gmail.com>
AuthorDate: Tue May 28 10:10:34 2019 +0800
HBASE-22422 Retain an ByteBuff with refCnt=0 when getBlock from LRUCache (#242)
---
.../hadoop/hbase/io/hfile/BlockCacheUtil.java | 3 +
.../apache/hadoop/hbase/io/hfile/HFileBlock.java | 40 ++++---
.../hadoop/hbase/io/hfile/HFileBlockIndex.java | 26 ++---
.../hadoop/hbase/io/hfile/HFileReaderImpl.java | 21 ++--
.../hadoop/hbase/io/hfile/LruBlockCache.java | 23 ++--
.../hadoop/hbase/io/hfile/bucket/BucketCache.java | 29 +++--
.../hbase/io/hfile/TestCombinedBlockCache.java | 17 +++
.../hadoop/hbase/io/hfile/TestLruBlockCache.java | 83 +++++++++++++-
.../hadoop/hbase/io/hfile/bucket/TestRAMCache.java | 126 +++++++++++++++++++++
9 files changed, 306 insertions(+), 62 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
index 46e8e24..2672992 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
@@ -230,6 +230,9 @@ public class BlockCacheUtil {
BlockCacheKey cacheKey, Cacheable newBlock) {
// NOTICE: The getBlock has retained the existingBlock inside.
Cacheable existingBlock = blockCache.getBlock(cacheKey, false, false, false);
+ if (existingBlock == null) {
+ return true;
+ }
try {
int comparison = BlockCacheUtil.validateBlockAddition(existingBlock, newBlock, cacheKey);
if (comparison < 0) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 846460f..079907e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.io.hfile;
+import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
+
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
@@ -677,18 +679,24 @@ public class HFileBlock implements Cacheable {
HFileBlock unpacked = new HFileBlock(this);
unpacked.allocateBuffer(); // allocates space for the decompressed block
-
- HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA
- ? reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
-
- ByteBuff dup = this.buf.duplicate();
- dup.position(this.headerSize());
- dup = dup.slice();
-
- ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
- unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(true), dup);
-
- return unpacked;
+ boolean succ = false;
+ try {
+ HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA
+ ? reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
+ // Create a duplicated buffer without the header part.
+ ByteBuff dup = this.buf.duplicate();
+ dup.position(this.headerSize());
+ dup = dup.slice();
+ // Decode the dup into unpacked#buf
+ ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
+ unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(true), dup);
+ succ = true;
+ return unpacked;
+ } finally {
+ if (!succ) {
+ unpacked.release();
+ }
+ }
}
/**
@@ -709,7 +717,7 @@ public class HFileBlock implements Cacheable {
buf = newBuf;
// set limit to exclude next block's header
- buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes);
+ buf.limit(capacityNeeded);
}
/**
@@ -1685,7 +1693,7 @@ public class HFileBlock implements Cacheable {
}
private ByteBuff allocate(int size, boolean intoHeap) {
- return intoHeap ? ByteBuffAllocator.HEAP.allocate(size) : allocator.allocate(size);
+ return intoHeap ? HEAP.allocate(size) : allocator.allocate(size);
}
/**
@@ -1735,7 +1743,7 @@ public class HFileBlock implements Cacheable {
if (LOG.isTraceEnabled()) {
LOG.trace("Extra see to get block size!", new RuntimeException());
}
- headerBuf = new SingleByteBuff(ByteBuffer.allocate(hdrSize));
+ headerBuf = HEAP.allocate(hdrSize);
readAtOffset(is, headerBuf, hdrSize, false, offset, pread);
headerBuf.rewind();
}
@@ -1778,7 +1786,7 @@ public class HFileBlock implements Cacheable {
// If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
// contains the header of next block, so no need to set next block's header in it.
HFileBlock hFileBlock = new HFileBlock(curBlock, checksumSupport, MemoryType.EXCLUSIVE,
- offset, nextBlockOnDiskSize, fileContext, allocator);
+ offset, nextBlockOnDiskSize, fileContext, intoHeap ? HEAP: allocator);
// Run check on uncompressed sizings.
if (!fileContext.isCompressedOrEncrypted()) {
hFileBlock.sanityCheckUncompressed();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
index ad61839..8396192 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
@@ -313,10 +313,13 @@ public class HFileBlockIndex {
int index = -1;
HFileBlock block = null;
- boolean dataBlock = false;
KeyOnlyKeyValue tmpNextIndexKV = new KeyValue.KeyOnlyKeyValue();
while (true) {
try {
+ // Must reset it to null here: otherwise, if an exception happens in readBlock, we would
+ // release the previously assigned block a second time in the finally block.
+ // (See HBASE-22422)
+ block = null;
if (currentBlock != null && currentBlock.getOffset() == currentOffset) {
// Avoid reading the same block again, even with caching turned off.
// This is crucial for compaction-type workload which might have
@@ -336,9 +339,8 @@ public class HFileBlockIndex {
// this also accounts for ENCODED_DATA
expectedBlockType = BlockType.DATA;
}
- block =
- cachingBlockReader.readBlock(currentOffset, currentOnDiskSize, shouldCache, pread,
- isCompaction, true, expectedBlockType, expectedDataBlockEncoding);
+ block = cachingBlockReader.readBlock(currentOffset, currentOnDiskSize, shouldCache,
+ pread, isCompaction, true, expectedBlockType, expectedDataBlockEncoding);
}
if (block == null) {
@@ -348,7 +350,6 @@ public class HFileBlockIndex {
// Found a data block, break the loop and check our level in the tree.
if (block.getBlockType().isData()) {
- dataBlock = true;
break;
}
@@ -381,7 +382,7 @@ public class HFileBlockIndex {
nextIndexedKey = tmpNextIndexKV;
}
} finally {
- if (!dataBlock && block != null) {
+ if (block != null && !block.getBlockType().isData()) {
// Release the block immediately if it is not the data block
block.release();
}
@@ -389,7 +390,7 @@ public class HFileBlockIndex {
}
if (lookupLevel != searchTreeLevel) {
- assert dataBlock == true;
+ assert block.getBlockType().isData();
// Though we have retrieved a data block we have found an issue
// in the retrieved data block. Hence returned the block so that
// the ref count can be decremented
@@ -401,8 +402,7 @@ public class HFileBlockIndex {
}
// set the next indexed key for the current block.
- BlockWithScanInfo blockWithScanInfo = new BlockWithScanInfo(block, nextIndexedKey);
- return blockWithScanInfo;
+ return new BlockWithScanInfo(block, nextIndexedKey);
}
@Override
@@ -576,8 +576,7 @@ public class HFileBlockIndex {
boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
throws IOException {
BlockWithScanInfo blockWithScanInfo = loadDataBlockWithScanInfo(key, currentBlock,
- cacheBlocks,
- pread, isCompaction, expectedDataBlockEncoding);
+ cacheBlocks, pread, isCompaction, expectedDataBlockEncoding);
if (blockWithScanInfo == null) {
return null;
} else {
@@ -600,9 +599,8 @@ public class HFileBlockIndex {
* @throws IOException
*/
public abstract BlockWithScanInfo loadDataBlockWithScanInfo(Cell key, HFileBlock currentBlock,
- boolean cacheBlocks,
- boolean pread, boolean isCompaction, DataBlockEncoding expectedDataBlockEncoding)
- throws IOException;
+ boolean cacheBlocks, boolean pread, boolean isCompaction,
+ DataBlockEncoding expectedDataBlockEncoding) throws IOException;
/**
* An approximation to the {@link HFile}'s mid-key. Operates on block
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index 38631c0..701c8a4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -1128,15 +1128,13 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
updateCurrentBlock(newBlock);
}
- protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey,
- boolean rewind, Cell key, boolean seekBefore) throws IOException {
- if (this.curBlock == null
- || this.curBlock.getOffset() != seekToBlock.getOffset()) {
+ protected int loadBlockAndSeekToKey(HFileBlock seekToBlock, Cell nextIndexedKey, boolean rewind,
+ Cell key, boolean seekBefore) throws IOException {
+ if (this.curBlock == null || this.curBlock.getOffset() != seekToBlock.getOffset()) {
updateCurrentBlock(seekToBlock);
} else if (rewind) {
blockBuffer.rewind();
}
-
// Update the nextIndexedKey
this.nextIndexedKey = nextIndexedKey;
return blockSeek(key, seekBefore);
@@ -1473,9 +1471,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
// Validate encoding type for data blocks. We include encoding
// type in the cache key, and we expect it to match on a cache hit.
if (cachedBlock.getDataBlockEncoding() != dataBlockEncoder.getDataBlockEncoding()) {
+ // Remember to release the block on the exceptional path.
+ cachedBlock.release();
throw new IOException("Cached block under key " + cacheKey + " "
- + "has wrong encoding: " + cachedBlock.getDataBlockEncoding() + " (expected: "
- + dataBlockEncoder.getDataBlockEncoding() + ")");
+ + "has wrong encoding: " + cachedBlock.getDataBlockEncoding() + " (expected: "
+ + dataBlockEncoder.getDataBlockEncoding() + ")");
}
}
// Cache-hit. Return!
@@ -1499,15 +1499,14 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();
// Cache the block if necessary
- AtomicBoolean cachedRaw = new AtomicBoolean(false);
cacheConf.getBlockCache().ifPresent(cache -> {
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
- cachedRaw.set(cacheConf.shouldCacheCompressed(category));
- cache.cacheBlock(cacheKey, cachedRaw.get() ? hfileBlock : unpacked,
+ cache.cacheBlock(cacheKey,
+ cacheConf.shouldCacheCompressed(category) ? hfileBlock : unpacked,
cacheConf.isInMemory());
}
});
- if (unpacked != hfileBlock && !cachedRaw.get()) {
+ if (unpacked != hfileBlock) {
// End of life here if hfileBlock is an independent block.
hfileBlock.release();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
index 724685a..ec6db31 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
@@ -504,7 +504,14 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
@Override
public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
boolean updateCacheMetrics) {
- LruCachedBlock cb = map.get(cacheKey);
+ LruCachedBlock cb = map.computeIfPresent(cacheKey, (key, val) -> {
+ // It will be referenced by the RPC path, so increase the refCnt here. NOTICE: the retain
+ // must happen inside this block: if we retained outside map#computeIfPresent, evictBlock
+ // could remove and release the block first, and we would then be retaining a block with
+ // refCnt=0, which is disallowed. See HBASE-22422.
+ val.getBuffer().retain();
+ return val;
+ });
if (cb == null) {
if (!repeat && updateCacheMetrics) {
stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
@@ -532,10 +539,10 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
}
return null;
}
- if (updateCacheMetrics) stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
+ if (updateCacheMetrics) {
+ stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
+ }
cb.access(count.incrementAndGet());
- // It will be referenced by RPC path, so increase here.
- cb.getBuffer().retain();
return cb.getBuffer();
}
@@ -592,8 +599,6 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
if (previous == null) {
return 0;
}
- // Decrease the block's reference count, and if refCount is 0, then it'll auto-deallocate.
- previous.getBuffer().release();
updateSizeMetrics(block, true);
long val = elements.decrementAndGet();
if (LOG.isTraceEnabled()) {
@@ -601,7 +606,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
assertCounterSanity(size, val);
}
if (block.getBuffer().getBlockType().isData()) {
- dataBlockElements.decrement();
+ dataBlockElements.decrement();
}
if (evictedByEvictionProcess) {
// When the eviction of the block happened because of invalidation of HFiles, no need to
@@ -611,6 +616,10 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
}
}
+ // Decrease the block's reference count; if the refCount reaches 0, it'll auto-deallocate. DO
+ // NOT move this up: if we did, the victimHandler might access the buffer with refCnt = 0,
+ // which is disallowed.
+ previous.getBuffer().release();
return block.heapSize();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index a330565..6d990d5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -42,6 +42,7 @@ import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
@@ -1533,21 +1534,28 @@ public class BucketCache implements BlockCache, HeapSize {
}
public RAMQueueEntry get(BlockCacheKey key) {
- RAMQueueEntry re = delegate.get(key);
- if (re != null) {
- // It'll be referenced by RPC, so retain here.
+ return delegate.computeIfPresent(key, (k, re) -> {
+ // It'll be referenced by RPC, so retain atomically here. If the get and the retain were not
+ // atomic, another thread could remove and release the block in between, and this thread
+ // would then retain a block with refCnt=0, which is disallowed. (See HBASE-22422)
re.getData().retain();
- }
- return re;
+ return re;
+ });
}
+ /**
+ * Returns the previously associated value, or null if the key was absent. It has the same
+ * meaning as {@link ConcurrentMap#putIfAbsent(Object, Object)}.
+ */
public RAMQueueEntry putIfAbsent(BlockCacheKey key, RAMQueueEntry entry) {
- RAMQueueEntry previous = delegate.putIfAbsent(key, entry);
- if (previous == null) {
+ AtomicBoolean absent = new AtomicBoolean(false);
+ RAMQueueEntry re = delegate.computeIfAbsent(key, k -> {
// The RAMCache reference to this entry, so reference count should be increment.
entry.getData().retain();
- }
- return previous;
+ absent.set(true);
+ return entry;
+ });
+ return absent.get() ? null : re;
}
public boolean remove(BlockCacheKey key) {
@@ -1576,8 +1584,9 @@ public class BucketCache implements BlockCache, HeapSize {
public void clear() {
Iterator<Map.Entry<BlockCacheKey, RAMQueueEntry>> it = delegate.entrySet().iterator();
while (it.hasNext()) {
- it.next().getValue().getData().release();
+ RAMQueueEntry re = it.next().getValue();
it.remove();
+ re.getData().release();
}
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCombinedBlockCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCombinedBlockCache.java
index f4dc38a..a086a3b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCombinedBlockCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCombinedBlockCache.java
@@ -17,11 +17,16 @@
*/
package org.apache.hadoop.hbase.io.hfile;
+import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
+import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import static org.junit.Assert.assertEquals;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache.CombinedCacheStats;
import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -33,6 +38,8 @@ public class TestCombinedBlockCache {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCombinedBlockCache.class);
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
@Test
public void testCombinedCacheStats() {
CacheStats lruCacheStats = new CacheStats("lruCacheStats", 2);
@@ -102,4 +109,14 @@ public class TestCombinedBlockCache {
assertEquals(0.75, stats.getHitRatioPastNPeriods(), delta);
assertEquals(0.8, stats.getHitCachingRatioPastNPeriods(), delta);
}
+
+ @Test
+ public void testMultiThreadGetAndEvictBlock() throws Exception {
+ Configuration conf = UTIL.getConfiguration();
+ conf.set(BUCKET_CACHE_IOENGINE_KEY, "offheap");
+ conf.setInt(BUCKET_CACHE_SIZE_KEY, 32);
+ BlockCache blockCache = BlockCacheFactory.createBlockCache(conf);
+ Assert.assertTrue(blockCache instanceof CombinedBlockCache);
+ TestLruBlockCache.testMultiThreadGetAndEvictBlockInternal(blockCache);
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java
index 3317a4d..a355ab0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io.hfile;
+import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -27,6 +28,7 @@ import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -34,15 +36,17 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
-import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache.EvictionThread;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ClassSize;
+import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Tests the concurrent LruBlockCache.<p>
@@ -58,6 +62,8 @@ public class TestLruBlockCache {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestLruBlockCache.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TestLruBlockCache.class);
+
@Test
public void testCacheEvictionThreadSafe() throws Exception {
long maxSize = 100000;
@@ -814,11 +820,10 @@ public class TestLruBlockCache {
byte[] byteArr = new byte[length];
ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
HFileContext meta = new HFileContextBuilder().build();
- ByteBuffAllocator alloc = ByteBuffAllocator.HEAP;
HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
- HFileBlock.FILL_HEADER, -1, 52, -1, meta, alloc);
+ HFileBlock.FILL_HEADER, -1, 52, -1, meta, HEAP);
HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
- HFileBlock.FILL_HEADER, -1, -1, -1, meta, alloc);
+ HFileBlock.FILL_HEADER, -1, -1, -1, meta, HEAP);
LruBlockCache cache = new LruBlockCache(maxSize, blockSize, false,
(int)Math.ceil(1.2*maxSize/blockSize),
@@ -958,5 +963,75 @@ public class TestLruBlockCache {
}
+ static void testMultiThreadGetAndEvictBlockInternal(BlockCache cache) throws Exception {
+ int size = 100;
+ int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
+ byte[] byteArr = new byte[length];
+ HFileContext meta = new HFileContextBuilder().build();
+ BlockCacheKey key = new BlockCacheKey("key1", 0);
+ HFileBlock blk = new HFileBlock(BlockType.DATA, size, size, -1,
+ ByteBuffer.wrap(byteArr, 0, size), HFileBlock.FILL_HEADER, -1, 52, -1, meta, HEAP);
+ AtomicBoolean err1 = new AtomicBoolean(false);
+ Thread t1 = new Thread(() -> {
+ for (int i = 0; i < 10000 && !err1.get(); i++) {
+ try {
+ cache.getBlock(key, false, false, true);
+ } catch (Exception e) {
+ err1.set(true);
+ LOG.info("Cache block or get block failure: ", e);
+ }
+ }
+ });
+
+ AtomicBoolean err2 = new AtomicBoolean(false);
+ Thread t2 = new Thread(() -> {
+ for (int i = 0; i < 10000 && !err2.get(); i++) {
+ try {
+ cache.evictBlock(key);
+ } catch (Exception e) {
+ err2.set(true);
+ LOG.info("Evict block failure: ", e);
+ }
+ }
+ });
+
+ AtomicBoolean err3 = new AtomicBoolean(false);
+ Thread t3 = new Thread(() -> {
+ for (int i = 0; i < 10000 && !err3.get(); i++) {
+ try {
+ cache.cacheBlock(key, blk);
+ } catch (Exception e) {
+ err3.set(true);
+ LOG.info("Cache block failure: ", e);
+ }
+ }
+ });
+ t1.start();
+ t2.start();
+ t3.start();
+ t1.join();
+ t2.join();
+ t3.join();
+ Assert.assertFalse(err1.get());
+ Assert.assertFalse(err2.get());
+ Assert.assertFalse(err3.get());
+ }
+
+ @Test
+ public void testMultiThreadGetAndEvictBlock() throws Exception {
+ long maxSize = 100000;
+ long blockSize = calculateBlockSize(maxSize, 10);
+ LruBlockCache cache =
+ new LruBlockCache(maxSize, blockSize, false, (int) Math.ceil(1.2 * maxSize / blockSize),
+ LruBlockCache.DEFAULT_LOAD_FACTOR, LruBlockCache.DEFAULT_CONCURRENCY_LEVEL,
+ 0.66f, // min
+ 0.99f, // acceptable
+ 0.33f, // single
+ 0.33f, // multi
+ 0.34f, // memory
+ 1.2f, // limit
+ false, 1024);
+ testMultiThreadGetAndEvictBlockInternal(cache);
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java
new file mode 100644
index 0000000..5c5dda6
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ IOTests.class, MediumTests.class })
+public class TestRAMCache {
+ private static final Logger LOG = LoggerFactory.getLogger(TestRAMCache.class);
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestRAMCache.class);
+
+ // Define a mock HFileBlock.
+ private static class MockHFileBlock extends HFileBlock {
+
+ private volatile CountDownLatch latch;
+
+ MockHFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
+ int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer b, boolean fillHeader,
+ long offset, int nextBlockOnDiskSize, int onDiskDataSizeWithHeader,
+ HFileContext fileContext, ByteBuffAllocator allocator) {
+ super(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, b,
+ fillHeader, offset, nextBlockOnDiskSize, onDiskDataSizeWithHeader, fileContext,
+ allocator);
+ }
+
+ public void setLatch(CountDownLatch latch) {
+ this.latch = latch;
+ }
+
+ public MockHFileBlock retain() {
+ try {
+ if (latch != null) {
+ latch.await();
+ }
+ } catch (InterruptedException e) {
+ LOG.info("Interrupted exception error: ", e);
+ }
+ super.retain();
+ return this;
+ }
+ }
+
+ @Test
+ public void testAtomicRAMCache() throws Exception {
+ int size = 100;
+ int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
+ byte[] byteArr = new byte[length];
+
+ RAMCache cache = new RAMCache();
+ BlockCacheKey key = new BlockCacheKey("file-1", 1);
+ MockHFileBlock blk = new MockHFileBlock(BlockType.DATA, size, size, -1,
+ ByteBuffer.wrap(byteArr, 0, size), HFileBlock.FILL_HEADER, -1, 52, -1,
+ new HFileContextBuilder().build(), ByteBuffAllocator.HEAP);
+ RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false, ByteBuffAllocator.NONE);
+
+ Assert.assertNull(cache.putIfAbsent(key, re));
+ Assert.assertEquals(cache.putIfAbsent(key, re), re);
+
+ CountDownLatch latch = new CountDownLatch(1);
+ blk.setLatch(latch);
+
+ AtomicBoolean error = new AtomicBoolean(false);
+ Thread t1 = new Thread(() -> {
+ try {
+ cache.get(key);
+ } catch (Exception e) {
+ error.set(true);
+ }
+ });
+ t1.start();
+ Thread.sleep(200);
+
+ AtomicBoolean removed = new AtomicBoolean(false);
+ Thread t2 = new Thread(() -> {
+ cache.remove(key);
+ removed.set(true);
+ });
+ t2.start();
+ Thread.sleep(200);
+ Assert.assertFalse(removed.get());
+
+ latch.countDown();
+ Thread.sleep(200);
+ Assert.assertTrue(removed.get());
+ Assert.assertFalse(error.get());
+ }
+}