You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by op...@apache.org on 2019/06/18 12:32:08 UTC
[hbase] 06/22: HBASE-22127 Ensure that the block cached in the
LRUBlockCache offheap is allocated from heap
This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch HBASE-21879
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 982cb7f804c47681ecdfdfd741bcb4354d8dfae9
Author: huzheng <op...@gmail.com>
AuthorDate: Mon Apr 1 22:23:24 2019 +0800
HBASE-22127 Ensure that the block cached in the LRUBlockCache offheap is allocated from heap
---
.../apache/hadoop/hbase/io/ByteBuffAllocator.java | 20 ++-
.../apache/hadoop/hbase/io/hfile/CacheConfig.java | 4 +
.../apache/hadoop/hbase/io/hfile/HFileBlock.java | 86 +++++++-----
.../hadoop/hbase/io/hfile/HFileReaderImpl.java | 49 ++++---
.../hadoop/hbase/io/hfile/LruBlockCache.java | 32 ++++-
.../hadoop/hbase/io/hfile/bucket/BucketCache.java | 76 ++++++++---
.../apache/hadoop/hbase/io/hfile/TestChecksum.java | 24 ++--
.../apache/hadoop/hbase/io/hfile/TestHFile.java | 124 ++++++++++++++++--
.../hadoop/hbase/io/hfile/TestHFileBlock.java | 145 +++++++++++++++------
.../hadoop/hbase/io/hfile/TestHFileBlockIndex.java | 2 +-
.../hadoop/hbase/io/hfile/TestHFileEncryption.java | 2 +-
.../hadoop/hbase/io/hfile/TestHFileWriterV3.java | 8 +-
.../io/hfile/TestLazyDataBlockDecompression.java | 2 +-
.../io/hfile/bucket/TestBucketWriterThread.java | 6 +-
.../hadoop/hbase/master/AbstractTestDLS.java | 29 ++---
15 files changed, 453 insertions(+), 156 deletions(-)
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
index 0020e23..984d46d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
+import sun.nio.ch.DirectBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
@@ -34,7 +35,6 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
@@ -191,7 +191,7 @@ public class ByteBuffAllocator {
}
// If disabled the reservoir, just allocate it from on-heap.
if (!isReservoirEnabled() || size == 0) {
- return new SingleByteBuff(NONE, ByteBuffer.allocate(size));
+ return allocateOnHeap(size);
}
int reminder = size % bufSize;
int len = size / bufSize + (reminder > 0 ? 1 : 0);
@@ -222,6 +222,22 @@ public class ByteBuffAllocator {
return bb;
}
+ /**
+ * Free all direct buffers if allocated, mainly used for testing.
+ */
+ @VisibleForTesting
+ public void clean() {
+ while (!buffers.isEmpty()) {
+ ByteBuffer b = buffers.poll();
+ if (b instanceof DirectBuffer) {
+ DirectBuffer db = (DirectBuffer) b;
+ if (db.cleaner() != null) {
+ db.cleaner().clean();
+ }
+ }
+ }
+ }
+
public static ByteBuff wrap(ByteBuffer[] buffers, Recycler recycler) {
if (buffers == null || buffers.length == 0) {
throw new IllegalArgumentException("buffers shouldn't be null or empty");
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index 53c216f..bb57fbe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -367,6 +367,10 @@ public class CacheConfig {
return Optional.ofNullable(this.blockCache);
}
+ public boolean isCombinedBlockCache() {
+ return blockCache instanceof CombinedBlockCache;
+ }
+
public ByteBuffAllocator getByteBuffAllocator() {
return this.byteBuffAllocator;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 22a8295..2fe9255 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -763,6 +763,13 @@ public class HFileBlock implements Cacheable {
}
/**
+ * @return true to indicate the block is allocated from JVM heap, otherwise from off-heap.
+ */
+ boolean isOnHeap() {
+ return buf.hasArray();
+ }
+
+ /**
* Unified version 2 {@link HFile} block writer. The intended usage pattern
* is as follows:
* <ol>
@@ -1300,16 +1307,29 @@ public class HFileBlock implements Cacheable {
/** An HFile block reader with iteration ability. */
interface FSReader {
/**
- * Reads the block at the given offset in the file with the given on-disk
- * size and uncompressed size.
- *
- * @param offset
- * @param onDiskSize the on-disk size of the entire block, including all
- * applicable headers, or -1 if unknown
+ * Reads the block at the given offset in the file with the given on-disk size and uncompressed
+ * size.
+ * @param offset of the file to read
+ * @param onDiskSize the on-disk size of the entire block, including all applicable headers, or
+ * -1 if unknown
+ * @param pread true to use pread, otherwise use the stream read.
+ * @param updateMetrics update the metrics or not.
+ * @param intoHeap true to allocate the block's ByteBuff from JVM heap, false to allocate it by
+ * {@link ByteBuffAllocator}. For LRUBlockCache, we must ensure that the block to cache is
+ * a heap one, because the memory occupation is based on heap now, also for
+ * {@link CombinedBlockCache}, we use the heap LRUBlockCache as L1 cache to cache small
+ * blocks such as IndexBlock or MetaBlock for faster access. So introduce a flag here to
+ * decide whether to allocate from JVM heap or not so that we can avoid an extra off-heap
+ * to heap memory copy when using LRUBlockCache. For most cases, we know what's the
+ * expected block type we'll read, while for some special cases (Example:
+ * HFileReaderImpl#readNextDataBlock()), we cannot pre-decide what's the expected block
+ * type, then we can only allocate block's ByteBuff from {@link ByteBuffAllocator} firstly,
+ * and then when caching it in {@link LruBlockCache} we'll check whether the ByteBuff is
+ * from heap or not, if not then we'll clone it to a heap one and cache it.
* @return the newly read block
*/
- HFileBlock readBlockData(long offset, long onDiskSize, boolean pread, boolean updateMetrics)
- throws IOException;
+ HFileBlock readBlockData(long offset, long onDiskSize, boolean pread, boolean updateMetrics,
+ boolean intoHeap) throws IOException;
/**
* Creates a block iterator over the given portion of the {@link HFile}.
@@ -1444,7 +1464,7 @@ public class HFileBlock implements Cacheable {
if (offset >= endOffset) {
return null;
}
- HFileBlock b = readBlockData(offset, length, false, false);
+ HFileBlock b = readBlockData(offset, length, false, false, true);
offset += b.getOnDiskSizeWithHeader();
length = b.getNextBlockOnDiskSize();
HFileBlock uncompressed = b.unpack(fileContext, owner);
@@ -1526,16 +1546,18 @@ public class HFileBlock implements Cacheable {
/**
* Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as
* little memory allocation as possible, using the provided on-disk size.
- *
* @param offset the offset in the stream to read at
- * @param onDiskSizeWithHeaderL the on-disk size of the block, including
- * the header, or -1 if unknown; i.e. when iterating over blocks reading
- * in the file metadata info.
+ * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header, or -1 if
+ * unknown; i.e. when iterating over blocks reading in the file metadata info.
* @param pread whether to use a positional read
+ * @param updateMetrics whether to update the metrics
+ * @param intoHeap allocate ByteBuff of block from heap or off-heap.
+ * @see FSReader#readBlockData(long, long, boolean, boolean, boolean) for more details about the
+ * intoHeap parameter.
*/
@Override
public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, boolean pread,
- boolean updateMetrics) throws IOException {
+ boolean updateMetrics, boolean intoHeap) throws IOException {
// Get a copy of the current state of whether to validate
// hbase checksums or not for this read call. This is not
// thread-safe but the one constaint is that if we decide
@@ -1544,9 +1566,8 @@ public class HFileBlock implements Cacheable {
boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum();
FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum);
- HFileBlock blk = readBlockDataInternal(is, offset,
- onDiskSizeWithHeaderL, pread,
- doVerificationThruHBaseChecksum, updateMetrics);
+ HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
+ doVerificationThruHBaseChecksum, updateMetrics, intoHeap);
if (blk == null) {
HFile.LOG.warn("HBase checksum verification failed for file " +
pathName + " at offset " +
@@ -1573,7 +1594,7 @@ public class HFileBlock implements Cacheable {
is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
doVerificationThruHBaseChecksum = false;
blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
- doVerificationThruHBaseChecksum, updateMetrics);
+ doVerificationThruHBaseChecksum, updateMetrics, intoHeap);
if (blk != null) {
HFile.LOG.warn("HDFS checksum verification succeeded for file " +
pathName + " at offset " +
@@ -1669,24 +1690,29 @@ public class HFileBlock implements Cacheable {
return nextBlockOnDiskSize;
}
+ private ByteBuff allocate(int size, boolean intoHeap) {
+ return intoHeap ? ByteBuffAllocator.HEAP.allocate(size) : allocator.allocate(size);
+ }
+
/**
* Reads a version 2 block.
- *
* @param offset the offset in the stream to read at.
- * @param onDiskSizeWithHeaderL the on-disk size of the block, including
- * the header and checksums if present or -1 if unknown (as a long). Can be -1
- * if we are doing raw iteration of blocks as when loading up file metadata; i.e.
- * the first read of a new file. Usually non-null gotten from the file index.
+ * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header and
+ * checksums if present or -1 if unknown (as a long). Can be -1 if we are doing raw
+ * iteration of blocks as when loading up file metadata; i.e. the first read of a new
+ * file. Usually non-null gotten from the file index.
* @param pread whether to use a positional read
- * @param verifyChecksum Whether to use HBase checksums.
- * If HBase checksum is switched off, then use HDFS checksum. Can also flip on/off
- * reading same file if we hit a troublesome patch in an hfile.
+ * @param verifyChecksum Whether to use HBase checksums. If HBase checksum is switched off, then
+ * use HDFS checksum. Can also flip on/off reading same file if we hit a troublesome
+ * patch in an hfile.
+ * @param updateMetrics whether need to update the metrics.
+ * @param intoHeap allocate the ByteBuff of block from heap or off-heap.
* @return the HFileBlock or null if there is a HBase checksum mismatch
*/
@VisibleForTesting
protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
- long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics)
- throws IOException {
+ long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics,
+ boolean intoHeap) throws IOException {
if (offset < 0) {
throw new IOException("Invalid offset=" + offset + " trying to read "
+ "block (onDiskSize=" + onDiskSizeWithHeaderL + ")");
@@ -1728,7 +1754,7 @@ public class HFileBlock implements Cacheable {
// says where to start reading. If we have the header cached, then we don't need to read
// it again and we can likely read from last place we left off w/o need to backup and reread
// the header we read last time through here.
- ByteBuff onDiskBlock = allocator.allocate(onDiskSizeWithHeader + hdrSize);
+ ByteBuff onDiskBlock = this.allocate(onDiskSizeWithHeader + hdrSize, intoHeap);
boolean initHFileBlockSuccess = false;
try {
if (headerBuf != null) {
@@ -2072,7 +2098,7 @@ public class HFileBlock implements Cacheable {
" onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
}
- public HFileBlock deepClone() {
+ public HFileBlock deepCloneOnHeap() {
return new HFileBlock(this, true);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index 5fdb66f..1137961 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -272,8 +272,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
if (LOG.isTraceEnabled()) {
LOG.trace("Prefetch start " + getPathOffsetEndStr(path, offset, end));
}
- // TODO: Could we use block iterator in here? Would that get stuff into the cache?
- HFileBlock prevBlock = null;
+ // Don't use BlockIterator here, because it's designed to read load-on-open section.
+ long onDiskSizeOfNextBlock = -1;
while (offset < end) {
if (Thread.interrupted()) {
break;
@@ -282,16 +282,17 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
// the internal-to-hfileblock thread local which holds the overread that gets the
// next header, will not have happened...so, pass in the onDiskSize gotten from the
// cached block. This 'optimization' triggers extremely rarely I'd say.
- long onDiskSize = prevBlock != null? prevBlock.getNextBlockOnDiskSize(): -1;
- HFileBlock block = readBlock(offset, onDiskSize, /*cacheBlock=*/true,
- /*pread=*/true, false, false, null, null);
- // Need not update the current block. Ideally here the readBlock won't find the
- // block in cache. We call this readBlock so that block data is read from FS and
- // cached in BC. So there is no reference count increment that happens here.
- // The return will ideally be a noop because the block is not of MemoryType SHARED.
- returnBlock(block);
- prevBlock = block;
- offset += block.getOnDiskSizeWithHeader();
+ HFileBlock block = readBlock(offset, onDiskSizeOfNextBlock, /* cacheBlock= */true,
+ /* pread= */true, false, false, null, null);
+ try {
+ onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
+ offset += block.getOnDiskSizeWithHeader();
+ } finally {
+ // Ideally here the readBlock won't find the block in cache. We call this
+ // readBlock so that block data is read from FS and cached in BC. We must call
+ // returnBlock here to decrease the reference count of the block.
+ returnBlock(block);
+ }
}
} catch (IOException e) {
// IOExceptions are probably due to region closes (relocation, etc.)
@@ -1419,7 +1420,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
// Cache Miss, please load.
HFileBlock compressedBlock =
- fsBlockReader.readBlockData(metaBlockOffset, blockSize, true, false);
+ fsBlockReader.readBlockData(metaBlockOffset, blockSize, true, false, true);
HFileBlock uncompressedBlock = compressedBlock.unpack(hfileContext, fsBlockReader);
if (compressedBlock != uncompressedBlock) {
compressedBlock.release();
@@ -1434,6 +1435,24 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
}
}
+ /**
+ * If expected block is data block, we'll allocate the ByteBuff of block from
+ * {@link org.apache.hadoop.hbase.io.ByteBuffAllocator} and it's usually an off-heap one,
+ * otherwise it will allocate from heap.
+ * @see org.apache.hadoop.hbase.io.hfile.HFileBlock.FSReader#readBlockData(long, long, boolean,
+ * boolean, boolean)
+ */
+ private boolean shouldUseHeap(BlockType expectedBlockType) {
+ if (cacheConf.getBlockCache() == null) {
+ return false;
+ } else if (!cacheConf.isCombinedBlockCache()) {
+ // Block to cache in LruBlockCache must be a heap one. So just allocate block memory from
+ // heap to save an extra off-heap to heap copy.
+ return true;
+ }
+ return expectedBlockType != null && !expectedBlockType.isData();
+ }
+
@Override
public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize,
final boolean cacheBlock, boolean pread, final boolean isCompaction,
@@ -1505,8 +1524,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
TraceUtil.addTimelineAnnotation("blockCacheMiss");
// Load block from filesystem.
- HFileBlock hfileBlock =
- fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread, !isCompaction);
+ HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread,
+ !isCompaction, shouldUseHeap(expectedBlockType));
validateBlockType(hfileBlock, expectedBlockType);
HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
index c2f07cd..b01d014 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
@@ -354,6 +354,32 @@ public class LruBlockCache implements FirstLevelBlockCache {
}
}
+ /**
+ * The block cached in LRUBlockCache will always be a heap block: on the one hand, heap access
+ * is faster than off-heap access, so the small index blocks or meta blocks cached in
+ * CombinedBlockCache will benefit a lot. On the other hand, the LRUBlockCache size is always
+ * calculated based on the total heap size, so if caching an off-heap block in LRUBlockCache, the
+ * heap size will be messed up. Here we will clone the block into a heap block if it's an
+ * off-heap block, otherwise just use the original block. The key point is to maintain the refCnt
+ * of the block (HBASE-22127): <br>
+ * 1. if we cache the cloned heap block, its refCnt is a totally new one, it's easy to handle; <br>
+ * 2. if we cache the original heap block, we're sure that it won't be tracked in
+ * ByteBuffAllocator's reservoir, if both RPC and LRUBlockCache release the block, then it can be
+ * garbage collected by JVM, so we need a retain here.
+ * @param buf the original block
+ * @return a block with a heap memory backend.
+ */
+ private Cacheable asReferencedHeapBlock(Cacheable buf) {
+ if (buf instanceof HFileBlock) {
+ HFileBlock blk = ((HFileBlock) buf);
+ if (!blk.isOnHeap()) {
+ return blk.deepCloneOnHeap();
+ }
+ }
+ // The block will be referenced by this LRUBlockCache, so should increase its refCnt here.
+ return buf.retain();
+ }
+
// BlockCache implementation
/**
@@ -402,8 +428,8 @@ public class LruBlockCache implements FirstLevelBlockCache {
}
return;
}
- // The block will be referenced by the LRUBlockCache, so should increase the refCnt here.
- buf.retain();
+ // Ensure that the block is a heap one.
+ buf = asReferencedHeapBlock(buf);
cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
long newSize = updateSizeMetrics(cb, false);
map.put(cacheKey, cb);
@@ -503,7 +529,7 @@ public class LruBlockCache implements FirstLevelBlockCache {
if (caching) {
if (result instanceof HFileBlock && ((HFileBlock) result).usesSharedMemory()) {
Cacheable original = result;
- result = ((HFileBlock) original).deepClone();
+ result = ((HFileBlock) original).deepCloneOnHeap();
// deepClone an new one, so need to put the original one back to free it.
victimHandler.returnBlock(cacheKey, original);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 0f3446e..91f3986 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -50,6 +50,8 @@ import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.HeapSize;
@@ -557,23 +559,54 @@ public class BucketCache implements BlockCache, HeapSize {
return evictBlock(cacheKey, true);
}
- private RAMQueueEntry checkRamCache(BlockCacheKey cacheKey) {
- RAMQueueEntry removedBlock = ramCache.remove(cacheKey);
- if (removedBlock != null) {
- this.blockNumber.decrement();
- this.heapSize.add(-1 * removedBlock.getData().heapSize());
+ // does not check for the ref count. Just tries to evict it if found in the
+ // bucket map
+ private boolean forceEvict(BlockCacheKey cacheKey) {
+ if (!cacheEnabled) {
+ return false;
}
- return removedBlock;
+ boolean existed = removeFromRamCache(cacheKey);
+ BucketEntry bucketEntry = backingMap.get(cacheKey);
+ if (bucketEntry == null) {
+ if (existed) {
+ cacheStats.evicted(0, cacheKey.isPrimary());
+ return true;
+ } else {
+ return false;
+ }
+ }
+ ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
+ try {
+ lock.writeLock().lock();
+ if (backingMap.remove(cacheKey, bucketEntry)) {
+ blockEvicted(cacheKey, bucketEntry, !existed);
+ } else {
+ return false;
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
+ return true;
+ }
+
+ private boolean removeFromRamCache(BlockCacheKey cacheKey) {
+ return ramCache.remove(cacheKey, re -> {
+ if (re != null) {
+ this.blockNumber.decrement();
+ this.heapSize.add(-1 * re.getData().heapSize());
+ }
+ });
}
public boolean evictBlock(BlockCacheKey cacheKey, boolean deletedBlock) {
if (!cacheEnabled) {
return false;
}
- RAMQueueEntry removedBlock = checkRamCache(cacheKey);
+ boolean existed = removeFromRamCache(cacheKey);
BucketEntry bucketEntry = backingMap.get(cacheKey);
if (bucketEntry == null) {
- if (removedBlock != null) {
+ if (existed) {
cacheStats.evicted(0, cacheKey.isPrimary());
return true;
} else {
@@ -586,7 +619,7 @@ public class BucketCache implements BlockCache, HeapSize {
int refCount = bucketEntry.getRefCount();
if (refCount == 0) {
if (backingMap.remove(cacheKey, bucketEntry)) {
- blockEvicted(cacheKey, bucketEntry, removedBlock == null);
+ blockEvicted(cacheKey, bucketEntry, !existed);
} else {
return false;
}
@@ -1009,10 +1042,12 @@ public class BucketCache implements BlockCache, HeapSize {
putIntoBackingMap(key, bucketEntries[i]);
}
// Always remove from ramCache even if we failed adding it to the block cache above.
- RAMQueueEntry ramCacheEntry = ramCache.remove(key);
- if (ramCacheEntry != null) {
- heapSize.add(-1 * entries.get(i).getData().heapSize());
- } else if (bucketEntries[i] != null){
+ boolean existed = ramCache.remove(key, re -> {
+ if (re != null) {
+ heapSize.add(-1 * re.getData().heapSize());
+ }
+ });
+ if (!existed && bucketEntries[i] != null) {
// Block should have already been evicted. Remove it and free space.
ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntries[i].offset());
try {
@@ -1737,12 +1772,23 @@ public class BucketCache implements BlockCache, HeapSize {
return previous;
}
- public RAMQueueEntry remove(BlockCacheKey key) {
+ public boolean remove(BlockCacheKey key) {
+ return remove(key, re->{});
+ }
+
+ /**
+ * Defines a {@link Consumer} here, because once the removed entry releases its reference count,
+ * its ByteBuffers may be recycled and accessing them outside this method will throw an
+ * exception. The consumer will access the entry to remove before its reference count is released.
+ * Notice, don't change its reference count in the {@link Consumer}
+ */
+ public boolean remove(BlockCacheKey key, Consumer<RAMQueueEntry> action) {
RAMQueueEntry previous = delegate.remove(key);
+ action.accept(previous);
if (previous != null) {
previous.getData().release();
}
- return previous;
+ return previous != null;
}
public boolean isEmpty() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
index c432fa9..2aebc8c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
@@ -100,7 +100,8 @@ public class TestChecksum {
meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, (HFileSystem) fs, path,
meta, ByteBuffAllocator.HEAP);
- HFileBlock b = hbr.readBlockData(0, -1, false, false);
+ HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
+ assertTrue(b.isOnHeap());
assertEquals(b.getChecksumType(), ChecksumType.getDefaultChecksumType().getCode());
}
@@ -146,7 +147,8 @@ public class TestChecksum {
meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, (HFileSystem) fs, path,
meta, ByteBuffAllocator.HEAP);
- HFileBlock b = hbr.readBlockData(0, -1, false, false);
+ HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
+ assertTrue(b.isOnHeap());
// verify SingleByteBuff checksum.
verifySBBCheckSum(b.getBufferReadOnly());
@@ -215,7 +217,7 @@ public class TestChecksum {
.withHBaseCheckSum(true)
.build();
HFileBlock.FSReader hbr = new CorruptedFSReaderImpl(is, totalSize, fs, path, meta);
- HFileBlock b = hbr.readBlockData(0, -1, pread, false);
+ HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
b.sanityCheck();
assertEquals(4936, b.getUncompressedSizeWithoutHeader());
assertEquals(algo == GZ ? 2173 : 4936,
@@ -236,19 +238,19 @@ public class TestChecksum {
// requests. Verify that this is correct.
for (int i = 0; i <
HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) {
- b = hbr.readBlockData(0, -1, pread, false);
+ b = hbr.readBlockData(0, -1, pread, false, true);
assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
}
// The next read should have hbase checksum verification reanabled,
// we verify this by assertng that there was a hbase-checksum failure.
- b = hbr.readBlockData(0, -1, pread, false);
+ b = hbr.readBlockData(0, -1, pread, false, true);
assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
assertEquals(1, HFile.getAndResetChecksumFailuresCount());
// Since the above encountered a checksum failure, we switch
// back to not checking hbase checksums.
- b = hbr.readBlockData(0, -1, pread, false);
+ b = hbr.readBlockData(0, -1, pread, false, true);
assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
is.close();
@@ -260,7 +262,7 @@ public class TestChecksum {
assertEquals(false, newfs.useHBaseChecksum());
is = new FSDataInputStreamWrapper(newfs, path);
hbr = new CorruptedFSReaderImpl(is, totalSize, newfs, path, meta);
- b = hbr.readBlockData(0, -1, pread, false);
+ b = hbr.readBlockData(0, -1, pread, false, true);
is.close();
b.sanityCheck();
b = b.unpack(meta, hbr);
@@ -343,7 +345,7 @@ public class TestChecksum {
HFileBlock.FSReader hbr =
new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(is, nochecksum), totalSize,
hfs, path, meta, ByteBuffAllocator.HEAP);
- HFileBlock b = hbr.readBlockData(0, -1, pread, false);
+ HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
is.close();
b.sanityCheck();
@@ -389,13 +391,13 @@ public class TestChecksum {
@Override
protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
- long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics)
- throws IOException {
+ long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics,
+ boolean useHeap) throws IOException {
if (verifyChecksum) {
corruptDataStream = true;
}
HFileBlock b = super.readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
- verifyChecksum, updateMetrics);
+ verifyChecksum, updateMetrics, useHeap);
corruptDataStream = false;
return b;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
index f58fe3e..0ed933b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
@@ -17,8 +17,12 @@
*/
package org.apache.hadoop.hbase.io.hfile;
+import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
+import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
+import static org.apache.hadoop.hbase.io.ByteBuffAllocator.BUFFER_SIZE_KEY;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MAX_BUFFER_COUNT_KEY;
import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY;
+import static org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -104,33 +108,129 @@ public class TestHFile {
fs = TEST_UTIL.getTestFileSystem();
}
- @Test
- public void testReaderWithoutBlockCache() throws Exception {
- int bufCount = 32;
+ private ByteBuffAllocator initAllocator(boolean reservoirEnabled, int bufSize, int bufCount,
+ int minAllocSize) {
Configuration that = HBaseConfiguration.create(conf);
+ that.setInt(BUFFER_SIZE_KEY, bufSize);
that.setInt(MAX_BUFFER_COUNT_KEY, bufCount);
- // AllByteBuffers will be allocated from the buffers.
- that.setInt(MIN_ALLOCATE_SIZE_KEY, 0);
- ByteBuffAllocator alloc = ByteBuffAllocator.create(that, true);
- List<ByteBuff> buffs = new ArrayList<>();
+ // All ByteBuffers will be allocated from the buffers.
+ that.setInt(MIN_ALLOCATE_SIZE_KEY, minAllocSize);
+ return ByteBuffAllocator.create(that, reservoirEnabled);
+ }
+
+ private void fillByteBuffAllocator(ByteBuffAllocator alloc, int bufCount) {
// Fill the allocator with bufCount ByteBuffer
+ List<ByteBuff> buffs = new ArrayList<>();
for (int i = 0; i < bufCount; i++) {
buffs.add(alloc.allocateOneBuffer());
+ Assert.assertEquals(alloc.getQueueSize(), 0);
}
- Assert.assertEquals(alloc.getQueueSize(), 0);
- for (ByteBuff buf : buffs) {
- buf.release();
- }
+ buffs.forEach(ByteBuff::release);
Assert.assertEquals(alloc.getQueueSize(), bufCount);
+ }
+
+ @Test
+ public void testReaderWithoutBlockCache() throws Exception {
+ int bufCount = 32;
+ // All ByteBuffers will be allocated from the buffers.
+ ByteBuffAllocator alloc = initAllocator(true, 64 * 1024, bufCount, 0);
+ fillByteBuffAllocator(alloc, bufCount);
// start write to store file.
Path path = writeStoreFile();
try {
- readStoreFile(path, that, alloc);
+ readStoreFile(path, conf, alloc);
} catch (Exception e) {
// fail test
assertTrue(false);
}
Assert.assertEquals(bufCount, alloc.getQueueSize());
+ alloc.clean();
+ }
+
+  /**
+   * Test case for HBASE-22127 in LruBlockCache: every block found in the LRU cache must be an
+   * on-heap block, and the allocator's pooled buffer count must stay unchanged while reading.
+   */
+  @Test
+  public void testReaderWithLRUBlockCache() throws Exception {
+    int bufCount = 1024, blockSize = 64 * 1024;
+    // initAllocator takes (reservoirEnabled, bufSize, bufCount, minAllocSize): pool bufCount
+    // buffers of blockSize bytes each.
+    ByteBuffAllocator alloc = initAllocator(true, blockSize, bufCount, 0);
+    fillByteBuffAllocator(alloc, bufCount);
+    Path storeFilePath = writeStoreFile();
+    // Open the file reader with LRUBlockCache
+    BlockCache lru = new LruBlockCache(1024 * 1024 * 32, blockSize, true, conf);
+    CacheConfig cacheConfig = new CacheConfig(conf, null, lru, alloc);
+    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, conf);
+    long offset = 0;
+    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
+      BlockCacheKey key = new BlockCacheKey(storeFilePath.getName(), offset);
+      HFileBlock block = reader.readBlock(offset, -1, true, true, false, true, null, null);
+      offset += block.getOnDiskSizeWithHeader();
+      // Ensure the cached block is an on-heap one.
+      Cacheable cachedBlock = lru.getBlock(key, false, false, true);
+      Assert.assertNotNull(cachedBlock);
+      Assert.assertTrue(cachedBlock instanceof HFileBlock);
+      Assert.assertTrue(((HFileBlock) cachedBlock).isOnHeap());
+      // The LRU path must never take an off-heap buffer from the allocator.
+      Assert.assertEquals(bufCount, alloc.getQueueSize());
+      block.release(); // return the ByteBuffer back to the allocator.
+    }
+    reader.close();
+    Assert.assertEquals(bufCount, alloc.getQueueSize());
+    alloc.clean();
+    lru.shutdown();
+  }
+
+
+ private BlockCache initCombinedBlockCache() {
+ Configuration that = HBaseConfiguration.create(conf);
+ that.setFloat(BUCKET_CACHE_SIZE_KEY, 32); // 32MB for bucket cache.
+ that.set(BUCKET_CACHE_IOENGINE_KEY, "offheap");
+ BlockCache bc = BlockCacheFactory.createBlockCache(that);
+ Assert.assertNotNull(bc);
+ Assert.assertTrue(bc instanceof CombinedBlockCache);
+ return bc;
+ }
+
+  /**
+   * Test case for HBASE-22127 in CombinedBlockCache: data blocks read back from the cache must
+   * be off-heap blocks, all other blocks must be on-heap, and every pooled buffer must be back
+   * in the allocator once the reader and the cache are closed.
+   */
+  @Test
+  public void testReaderWithCombinedBlockCache() throws Exception {
+    int bufCount = 1024, blockSize = 64 * 1024;
+    // initAllocator takes (reservoirEnabled, bufSize, bufCount, minAllocSize): pool bufCount
+    // buffers of blockSize bytes each.
+    ByteBuffAllocator alloc = initAllocator(true, blockSize, bufCount, 0);
+    fillByteBuffAllocator(alloc, bufCount);
+    Path storeFilePath = writeStoreFile();
+    // Open the file reader with CombinedBlockCache
+    BlockCache combined = initCombinedBlockCache();
+    // Enable evict-on-close on a private copy of the configuration, as the helper methods do,
+    // so that the setting does not stay on the conf object used by the rest of this class.
+    Configuration that = HBaseConfiguration.create(conf);
+    that.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, true);
+    CacheConfig cacheConfig = new CacheConfig(that, null, combined, alloc);
+    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, that);
+    long offset = 0;
+    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
+      BlockCacheKey key = new BlockCacheKey(storeFilePath.getName(), offset);
+      HFileBlock block = reader.readBlock(offset, -1, true, true, false, true, null, null);
+      offset += block.getOnDiskSizeWithHeader();
+      // Read the cached block.
+      Cacheable cachedBlock = combined.getBlock(key, false, false, true);
+      try {
+        Assert.assertNotNull(cachedBlock);
+        Assert.assertTrue(cachedBlock instanceof HFileBlock);
+        HFileBlock hfb = (HFileBlock) cachedBlock;
+        // Data block will be cached in BucketCache, so it should be an off-heap block.
+        if (hfb.getBlockType().isData()) {
+          Assert.assertFalse(hfb.isOnHeap());
+        } else {
+          // Non-data block will be cached in LRUBlockCache, so it must be an on-heap block.
+          Assert.assertTrue(hfb.isOnHeap());
+        }
+      } finally {
+        combined.returnBlock(key, cachedBlock);
+      }
+      block.release(); // return the ByteBuffer back to the allocator.
+    }
+    reader.close();
+    combined.shutdown();
+    Assert.assertEquals(bufCount, alloc.getQueueSize());
+    alloc.clean();
   }
private void readStoreFile(Path storeFilePath, Configuration conf, ByteBuffAllocator alloc)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
index efdae16..2733ca2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
@@ -40,6 +40,8 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -48,6 +50,7 @@ import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
@@ -68,6 +71,7 @@ import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.Compressor;
+import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
@@ -93,10 +97,12 @@ public class TestHFileBlock {
private static final Logger LOG = LoggerFactory.getLogger(TestHFileBlock.class);
- static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };
+ // TODO: uncomment the GZ algorithm in HBASE-21937; BB unpack is not supported yet.
+ static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, /* GZ */ };
private static final int NUM_TEST_BLOCKS = 1000;
private static final int NUM_READER_THREADS = 26;
+ private static final int MAX_BUFFER_COUNT = 2048;
// Used to generate KeyValues
private static int NUM_KEYVALUES = 50;
@@ -108,14 +114,51 @@ public class TestHFileBlock {
private final boolean includesMemstoreTS;
private final boolean includesTag;
- public TestHFileBlock(boolean includesMemstoreTS, boolean includesTag) {
+ private final boolean useHeapAllocator;
+ private final ByteBuffAllocator alloc;
+
+ public TestHFileBlock(boolean includesMemstoreTS, boolean includesTag, boolean useHeapAllocator) {
this.includesMemstoreTS = includesMemstoreTS;
this.includesTag = includesTag;
+ this.useHeapAllocator = useHeapAllocator;
+ this.alloc = useHeapAllocator ? ByteBuffAllocator.HEAP : createOffHeapAlloc();
+ assertAllocator();
}
@Parameters
public static Collection<Object[]> parameters() {
- return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED;
+ List<Object[]> params = new ArrayList<>();
+ // Generate boolean triples from 000 to 111
+ for (int i = 0; i < (1 << 3); i++) {
+ Object[] flags = new Boolean[3];
+ for (int k = 0; k < 3; k++) {
+ flags[k] = (i & (1 << k)) != 0;
+ }
+ params.add(flags);
+ }
+ return params;
+ }
+
+ private ByteBuffAllocator createOffHeapAlloc() {
+ Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
+ conf.setInt(ByteBuffAllocator.MAX_BUFFER_COUNT_KEY, MAX_BUFFER_COUNT);
+ conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 0);
+ ByteBuffAllocator alloc = ByteBuffAllocator.create(conf, true);
+ // Fill the allocator
+ List<ByteBuff> bufs = new ArrayList<>();
+ for (int i = 0; i < MAX_BUFFER_COUNT; i++) {
+ ByteBuff bb = alloc.allocateOneBuffer();
+ assertTrue(!bb.hasArray());
+ bufs.add(bb);
+ }
+ bufs.forEach(ByteBuff::release);
+ return alloc;
+ }
+
+ private void assertAllocator() {
+ if (!useHeapAllocator) {
+ assertEquals(MAX_BUFFER_COUNT, alloc.getQueueSize());
+ }
}
@Before
@@ -123,6 +166,12 @@ public class TestHFileBlock {
fs = HFileSystem.get(TEST_UTIL.getConfiguration());
}
+ @After
+ public void tearDown() throws IOException {
+ assertAllocator();
+ alloc.clean();
+ }
+
static void writeTestBlockContents(DataOutputStream dos) throws IOException {
// This compresses really well.
for (int i = 0; i < 1000; ++i)
@@ -327,9 +376,8 @@ public class TestHFileBlock {
.withIncludesMvcc(includesMemstoreTS)
.withIncludesTags(includesTag)
.withCompression(algo).build();
- HFileBlock.FSReader hbr =
- new HFileBlock.FSReaderImpl(is, totalSize, meta, ByteBuffAllocator.HEAP);
- HFileBlock b = hbr.readBlockData(0, -1, pread, false);
+ HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta, alloc);
+ HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
is.close();
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
@@ -341,14 +389,14 @@ public class TestHFileBlock {
if (algo == GZ) {
is = fs.open(path);
- hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta, ByteBuffAllocator.HEAP);
- b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE +
- b.totalChecksumBytes(), pread, false);
+ hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta, alloc);
+ b = hbr.readBlockData(0,
+ 2173 + HConstants.HFILEBLOCK_HEADER_SIZE + b.totalChecksumBytes(), pread, false, true);
assertEquals(expected, b);
int wrongCompressedSize = 2172;
try {
- b = hbr.readBlockData(0, wrongCompressedSize
- + HConstants.HFILEBLOCK_HEADER_SIZE, pread, false);
+ hbr.readBlockData(0, wrongCompressedSize + HConstants.HFILEBLOCK_HEADER_SIZE, pread,
+ false, true);
fail("Exception expected");
} catch (IOException ex) {
String expectedPrefix = "Passed in onDiskSizeWithHeader=";
@@ -356,8 +404,10 @@ public class TestHFileBlock {
+ "'.\nMessage is expected to start with: '" + expectedPrefix
+ "'", ex.getMessage().startsWith(expectedPrefix));
}
+ assertTrue(b.release());
is.close();
}
+ assertTrue(expected.release());
}
}
}
@@ -428,13 +478,13 @@ public class TestHFileBlock {
.withIncludesTags(includesTag)
.build();
HFileBlock.FSReaderImpl hbr =
- new HFileBlock.FSReaderImpl(is, totalSize, meta, ByteBuffAllocator.HEAP);
+ new HFileBlock.FSReaderImpl(is, totalSize, meta, alloc);
hbr.setDataBlockEncoder(dataBlockEncoder);
hbr.setIncludesMemStoreTS(includesMemstoreTS);
HFileBlock blockFromHFile, blockUnpacked;
int pos = 0;
for (int blockId = 0; blockId < numBlocks; ++blockId) {
- blockFromHFile = hbr.readBlockData(pos, -1, pread, false);
+ blockFromHFile = hbr.readBlockData(pos, -1, pread, false, true);
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
blockFromHFile.sanityCheck();
pos += blockFromHFile.getOnDiskSizeWithHeader();
@@ -487,6 +537,10 @@ public class TestHFileBlock {
blockUnpacked, deserialized.unpack(meta, hbr));
}
}
+ assertTrue(blockUnpacked.release());
+ if (blockFromHFile != blockUnpacked) {
+ blockFromHFile.release();
+ }
}
is.close();
}
@@ -557,7 +611,7 @@ public class TestHFileBlock {
.withIncludesTags(includesTag)
.withCompression(algo).build();
HFileBlock.FSReader hbr =
- new HFileBlock.FSReaderImpl(is, totalSize, meta, ByteBuffAllocator.HEAP);
+ new HFileBlock.FSReaderImpl(is, totalSize, meta, alloc);
long curOffset = 0;
for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
if (!pread) {
@@ -569,7 +623,7 @@ public class TestHFileBlock {
if (detailedLogging) {
LOG.info("Reading block #" + i + " at offset " + curOffset);
}
- HFileBlock b = hbr.readBlockData(curOffset, -1, pread, false);
+ HFileBlock b = hbr.readBlockData(curOffset, -1, pread, false, true);
if (detailedLogging) {
LOG.info("Block #" + i + ": " + b);
}
@@ -583,7 +637,8 @@ public class TestHFileBlock {
// Now re-load this block knowing the on-disk size. This tests a
// different branch in the loader.
- HFileBlock b2 = hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread, false);
+ HFileBlock b2 =
+ hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread, false, true);
b2.sanityCheck();
assertEquals(b.getBlockType(), b2.getBlockType());
@@ -599,6 +654,7 @@ public class TestHFileBlock {
assertEquals(b.getOnDiskDataSizeWithHeader(),
b2.getOnDiskDataSizeWithHeader());
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
+ assertTrue(b2.release());
curOffset += b.getOnDiskSizeWithHeader();
@@ -606,14 +662,14 @@ public class TestHFileBlock {
// NOTE: cache-on-write testing doesn't actually involve a BlockCache. It simply
// verifies that the unpacked value read back off disk matches the unpacked value
// generated before writing to disk.
- b = b.unpack(meta, hbr);
+ HFileBlock newBlock = b.unpack(meta, hbr);
// b's buffer has header + data + checksum while
// expectedContents have header + data only
- ByteBuff bufRead = b.getBufferReadOnly();
+ ByteBuff bufRead = newBlock.getBufferReadOnly();
ByteBuffer bufExpected = expectedContents.get(i);
boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(),
bufRead.arrayOffset(),
- bufRead.limit() - b.totalChecksumBytes(),
+ bufRead.limit() - newBlock.totalChecksumBytes(),
bufExpected.array(), bufExpected.arrayOffset(),
bufExpected.limit()) == 0;
String wrongBytesMsg = "";
@@ -642,9 +698,12 @@ public class TestHFileBlock {
}
}
assertTrue(wrongBytesMsg, bytesAreCorrect);
+ assertTrue(newBlock.release());
+ if (newBlock != b) {
+ assertTrue(b.release());
+ }
}
}
-
assertEquals(curOffset, fs.getFileStatus(path).getLen());
is.close();
}
@@ -687,29 +746,37 @@ public class TestHFileBlock {
boolean pread = true;
boolean withOnDiskSize = rand.nextBoolean();
long expectedSize =
- (blockId == NUM_TEST_BLOCKS - 1 ? fileSize
- : offsets.get(blockId + 1)) - offset;
-
- HFileBlock b;
+ (blockId == NUM_TEST_BLOCKS - 1 ? fileSize : offsets.get(blockId + 1)) - offset;
+ HFileBlock b = null;
try {
long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
- b = hbr.readBlockData(offset, onDiskSizeArg, pread, false);
+ b = hbr.readBlockData(offset, onDiskSizeArg, pread, false, false);
+ if (useHeapAllocator) {
+ assertTrue(b.isOnHeap());
+ } else {
+ assertTrue(!b.getBlockType().isData() || !b.isOnHeap());
+ }
+ assertEquals(types.get(blockId), b.getBlockType());
+ assertEquals(expectedSize, b.getOnDiskSizeWithHeader());
+ assertEquals(offset, b.getOffset());
} catch (IOException ex) {
- LOG.error("Error in client " + clientId + " trying to read block at "
- + offset + ", pread=" + pread + ", withOnDiskSize=" +
- withOnDiskSize, ex);
+ LOG.error("Error in client " + clientId + " trying to read block at " + offset
+ + ", pread=" + pread + ", withOnDiskSize=" + withOnDiskSize,
+ ex);
return false;
+ } finally {
+ if (b != null) {
+ b.release();
+ }
}
-
- assertEquals(types.get(blockId), b.getBlockType());
- assertEquals(expectedSize, b.getOnDiskSizeWithHeader());
- assertEquals(offset, b.getOffset());
-
++numBlocksRead;
- if (pread)
+ if (pread) {
++numPositionalRead;
- if (withOnDiskSize)
+ }
+
+ if (withOnDiskSize) {
++numWithOnDiskSize;
+ }
}
LOG.info("Client " + clientId + " successfully read " + numBlocksRead +
" blocks (with pread: " + numPositionalRead + ", with onDiskSize " +
@@ -717,7 +784,6 @@ public class TestHFileBlock {
return true;
}
-
}
@Test
@@ -742,7 +808,7 @@ public class TestHFileBlock {
.withCompression(compressAlgo)
.build();
HFileBlock.FSReader hbr =
- new HFileBlock.FSReaderImpl(is, fileSize, meta, ByteBuffAllocator.HEAP);
+ new HFileBlock.FSReaderImpl(is, fileSize, meta, alloc);
Executor exec = Executors.newFixedThreadPool(NUM_READER_THREADS);
ExecutorCompletionService<Boolean> ecs = new ExecutorCompletionService<>(exec);
@@ -761,7 +827,6 @@ public class TestHFileBlock {
+ ")");
}
}
-
is.close();
}
}
@@ -874,9 +939,9 @@ public class TestHFileBlock {
ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
HFileContext meta = new HFileContextBuilder().build();
HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
- HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
+ HFileBlock.FILL_HEADER, -1, 52, -1, meta, alloc);
HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf,
- HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
+ HFileBlock.FILL_HEADER, -1, -1, -1, meta, alloc);
ByteBuffer buff1 = ByteBuffer.allocate(length);
ByteBuffer buff2 = ByteBuffer.allocate(length);
blockWithNextBlockMetadata.serialize(buff1, true);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
index 73f1c24..6f8d0b0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
@@ -192,7 +192,7 @@ public class TestHFileBlockIndex {
}
missCount += 1;
- prevBlock = realReader.readBlockData(offset, onDiskSize, pread, false);
+ prevBlock = realReader.readBlockData(offset, onDiskSize, pread, false, true);
prevOffset = offset;
prevOnDiskSize = onDiskSize;
prevPread = pread;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java
index 1222d07..508b1fe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java
@@ -109,7 +109,7 @@ public class TestHFileEncryption {
private long readAndVerifyBlock(long pos, HFileContext ctx, HFileBlock.FSReaderImpl hbr, int size)
throws IOException {
- HFileBlock b = hbr.readBlockData(pos, -1, false, false);
+ HFileBlock b = hbr.readBlockData(pos, -1, false, false, true);
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
b.sanityCheck();
assertFalse(b.isUnpacked());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
index b92f7c6..f8da706 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
@@ -224,8 +224,8 @@ public class TestHFileWriterV3 {
fsdis.seek(0);
long curBlockPos = 0;
while (curBlockPos <= trailer.getLastDataBlockOffset()) {
- HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false)
- .unpack(context, blockReader);
+ HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false, true)
+ .unpack(context, blockReader);
assertEquals(BlockType.DATA, block.getBlockType());
ByteBuff buf = block.getBufferWithoutHeader();
int keyLen = -1;
@@ -285,8 +285,8 @@ public class TestHFileWriterV3 {
while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) {
LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " +
trailer.getLoadOnOpenDataOffset());
- HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false)
- .unpack(context, blockReader);
+ HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false, true)
+ .unpack(context, blockReader);
assertEquals(BlockType.META, block.getBlockType());
Text t = new Text();
ByteBuff buf = block.getBufferWithoutHeader();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java
index 5935f91..f1a12a2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java
@@ -160,7 +160,7 @@ public class TestLazyDataBlockDecompression {
CacheConfig cc = new CacheConfig(lazyCompressDisabled,
new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressDisabled));
assertFalse(cc.shouldCacheDataCompressed());
- assertTrue(cc.getBlockCache().get() instanceof LruBlockCache);
+ assertFalse(cc.isCombinedBlockCache());
LruBlockCache disabledBlockCache = (LruBlockCache) cc.getBlockCache().get();
LOG.info("disabledBlockCache=" + disabledBlockCache);
assertEquals("test inconsistency detected.", maxSize, disabledBlockCache.getMaxSize());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
index 4e7291d..746cf8d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
@@ -59,10 +59,10 @@ public class TestBucketWriterThread {
private static class MockBucketCache extends BucketCache {
public MockBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
- int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration)
- throws FileNotFoundException, IOException {
+ int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration)
+ throws IOException {
super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
- persistencePath, ioErrorsTolerationDuration, HBaseConfiguration.create());
+ persistencePath, ioErrorsTolerationDuration, HBaseConfiguration.create());
}
@Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
index 3348386..db15ca6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
@@ -83,7 +83,6 @@ import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -192,9 +191,7 @@ public abstract class AbstractTestDLS {
Path rootdir = FSUtils.getRootDir(conf);
int numRegions = 50;
- try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null);
- Table t = installTable(zkw, numRegions)) {
- TableName table = t.getName();
+ try (Table t = installTable(numRegions)) {
List<RegionInfo> regions = null;
HRegionServer hrs = null;
for (int i = 0; i < NUM_RS; i++) {
@@ -224,7 +221,6 @@ public abstract class AbstractTestDLS {
int count = 0;
for (RegionInfo hri : regions) {
- Path tdir = FSUtils.getWALTableDir(conf, table);
@SuppressWarnings("deprecation")
Path editsdir = WALSplitUtil
.getRegionDirRecoveredEditsDir(FSUtils.getWALRegionDir(conf,
@@ -266,8 +262,7 @@ public abstract class AbstractTestDLS {
// they will consume recovered.edits
master.balanceSwitch(false);
- try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null);
- Table ht = installTable(zkw, numRegionsToCreate)) {
+ try (Table ht = installTable(numRegionsToCreate)) {
HRegionServer hrs = findRSToKill(false);
List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
makeWAL(hrs, regions, numLogLines, 100);
@@ -329,8 +324,7 @@ public abstract class AbstractTestDLS {
final Path logDir = new Path(rootdir,
AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString()));
- try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null);
- Table t = installTable(zkw, 40)) {
+ try (Table t = installTable(40)) {
makeWAL(hrs, ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()), numLogLines, 100);
new Thread() {
@@ -380,8 +374,7 @@ public abstract class AbstractTestDLS {
startCluster(NUM_RS); // NUM_RS=6.
- try (ZKWatcher zkw = new ZKWatcher(conf, "distributed log splitting test", null);
- Table table = installTable(zkw, numRegionsToCreate)) {
+ try (Table table = installTable(numRegionsToCreate)) {
populateDataInTable(numRowsPerRegion);
List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
@@ -482,11 +475,11 @@ public abstract class AbstractTestDLS {
}
}
- private Table installTable(ZKWatcher zkw, int nrs) throws Exception {
- return installTable(zkw, nrs, 0);
+ private Table installTable(int nrs) throws Exception {
+ return installTable(nrs, 0);
}
- private Table installTable(ZKWatcher zkw, int nrs, int existingRegions) throws Exception {
+ private Table installTable(int nrs, int existingRegions) throws Exception {
// Create a table with regions
byte[] family = Bytes.toBytes("family");
LOG.info("Creating table with " + nrs + " regions");
@@ -497,14 +490,14 @@ public abstract class AbstractTestDLS {
}
assertEquals(nrs, numRegions);
LOG.info("Waiting for no more RIT\n");
- blockUntilNoRIT(zkw, master);
+ blockUntilNoRIT();
// disable-enable cycle to get rid of table's dead regions left behind
// by createMultiRegions
assertTrue(TEST_UTIL.getAdmin().isTableEnabled(tableName));
LOG.debug("Disabling table\n");
TEST_UTIL.getAdmin().disableTable(tableName);
LOG.debug("Waiting for no more RIT\n");
- blockUntilNoRIT(zkw, master);
+ blockUntilNoRIT();
NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
LOG.debug("Verifying only catalog region is assigned\n");
if (regions.size() != 1) {
@@ -515,7 +508,7 @@ public abstract class AbstractTestDLS {
LOG.debug("Enabling table\n");
TEST_UTIL.getAdmin().enableTable(tableName);
LOG.debug("Waiting for no more RIT\n");
- blockUntilNoRIT(zkw, master);
+ blockUntilNoRIT();
LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n");
regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
assertEquals(numRegions + 1 + existingRegions, regions.size());
@@ -651,7 +644,7 @@ public abstract class AbstractTestDLS {
return count;
}
- private void blockUntilNoRIT(ZKWatcher zkw, HMaster master) throws Exception {
+ private void blockUntilNoRIT() throws Exception {
TEST_UTIL.waitUntilNoRegionsInTransition(60000);
}