You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zy...@apache.org on 2018/07/10 04:38:52 UTC
[30/50] [abbrv] hbase git commit: HBASE-20789
TestBucketCache#testCacheBlockNextBlockMetadataMissing is flaky
HBASE-20789 TestBucketCache#testCacheBlockNextBlockMetadataMissing is flaky
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0454878e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0454878e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0454878e
Branch: refs/heads/HBASE-18477
Commit: 0454878e71e4fdb2d9c9cc130d02657f64babea0
Parents: 66ad9fd
Author: huzheng <hu...@xiaomi.com>
Authored: Thu Jun 28 12:01:03 2018 +0800
Committer: huzheng <op...@gmail.com>
Committed: Tue Jul 3 17:56:34 2018 +0800
----------------------------------------------------------------------
.../hadoop/hbase/io/hfile/BlockCacheUtil.java | 55 +++++++++++---
.../hadoop/hbase/io/hfile/LruBlockCache.java | 17 +----
.../hbase/io/hfile/bucket/BucketCache.java | 78 ++++++++++++--------
.../hbase/io/hfile/bucket/TestBucketCache.java | 33 ++++++---
4 files changed, 116 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/0454878e/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
index 8a100ae..36f9e61 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
@@ -198,20 +198,55 @@ public class BlockCacheUtil {
*/
public static int validateBlockAddition(Cacheable existing, Cacheable newBlock,
BlockCacheKey cacheKey) {
- int comparison = compareCacheBlock(existing, newBlock, true);
+ int comparison = compareCacheBlock(existing, newBlock, false);
if (comparison != 0) {
- LOG.warn("Cached block contents differ, trying to just compare the block contents " +
- "without the next block. CacheKey: " + cacheKey);
-
- // compare the contents, if they are not equal, we are in big trouble
- int comparisonWithoutNextBlockMetadata = compareCacheBlock(existing, newBlock, false);
+ throw new RuntimeException("Cached block contents differ, which should not have happened."
+ + "cacheKey:" + cacheKey);
+ }
+ if ((existing instanceof HFileBlock) && (newBlock instanceof HFileBlock)) {
+ comparison = ((HFileBlock) existing).getNextBlockOnDiskSize()
+ - ((HFileBlock) newBlock).getNextBlockOnDiskSize();
+ }
+ return comparison;
+ }
- if (comparisonWithoutNextBlockMetadata != 0) {
- throw new RuntimeException("Cached block contents differ, which should not have happened."
- + "cacheKey:" + cacheKey);
+ /**
+ * Because of region splitting, it's possible that the split key is located in the middle of a
+ * block, so both daughter regions may load the same block from their parent HFile. When using
+ * pread, we don't force the read to include all of the next block header. So when two threads
+ * try to cache the same block, it's possible that one thread read all of the next block header
+ * but the other one didn't. If the already cached block doesn't have the next block header but
+ * the new block to cache does, then we can replace the existing block with the new block for
+ * better performance. (HBASE-20447)
+ * @param blockCache BlockCache to check
+ * @param cacheKey the block cache key
+ * @param newBlock the new block which we are trying to put into the block cache.
+ * @return true if the existing block should be replaced with the new block for the same block
+ * cache key; false if the existing block should just be kept.
+ */
+ public static boolean shouldReplaceExistingCacheBlock(BlockCache blockCache,
+ BlockCacheKey cacheKey, Cacheable newBlock) {
+ Cacheable existingBlock = blockCache.getBlock(cacheKey, false, false, false);
+ try {
+ int comparison = BlockCacheUtil.validateBlockAddition(existingBlock, newBlock, cacheKey);
+ if (comparison < 0) {
+ LOG.warn("Cached block contents differ by nextBlockOnDiskSize, the new block has "
+ + "nextBlockOnDiskSize set. Caching new block.");
+ return true;
+ } else if (comparison > 0) {
+ LOG.warn("Cached block contents differ by nextBlockOnDiskSize, the existing block has "
+ + "nextBlockOnDiskSize set, Keeping cached block.");
+ return false;
+ } else {
+ LOG.warn("Caching an already cached block: {}. This is harmless and can happen in rare "
+ + "cases (see HBASE-8547)",
+ cacheKey);
+ return false;
}
+ } finally {
+ // return the block since we need to decrement the count
+ blockCache.returnBlock(cacheKey, existingBlock);
}
- return comparison;
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/0454878e/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
index 8b8bd88..1dab053 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
@@ -379,21 +379,8 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
}
LruCachedBlock cb = map.get(cacheKey);
- if (cb != null) {
- int comparison = BlockCacheUtil.validateBlockAddition(cb.getBuffer(), buf, cacheKey);
- if (comparison != 0) {
- if (comparison < 0) {
- LOG.warn("Cached block contents differ by nextBlockOnDiskSize. Keeping cached block.");
- return;
- } else {
- LOG.warn("Cached block contents differ by nextBlockOnDiskSize. Caching new block.");
- }
- } else {
- String msg = "Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey();
- msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";
- LOG.debug(msg);
- return;
- }
+ if (cb != null && !BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, buf)) {
+ return;
}
long currentSize = size.get();
long currentAcceptableSize = acceptableSize();
http://git-wip-us.apache.org/repos/asf/hbase/blob/0454878e/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index bc45d39..40c0a00 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -418,42 +418,35 @@ public class BucketCache implements BlockCache, HeapSize {
* @param inMemory if block is in-memory
* @param wait if true, blocking wait when queue is full
*/
- public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
+ private void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
boolean wait) {
- if (LOG.isTraceEnabled()) LOG.trace("Caching key=" + cacheKey + ", item=" + cachedItem);
- if (!cacheEnabled) {
- return;
- }
-
- if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
- Cacheable existingBlock = getBlock(cacheKey, false, false, false);
-
- try {
- int comparison = BlockCacheUtil.validateBlockAddition(existingBlock, cachedItem, cacheKey);
- if (comparison != 0) {
- if (comparison < 0) {
- LOG.warn("Cached block contents differ by nextBlockOnDiskSize. Keeping cached block.");
- return;
- } else {
- LOG.warn("Cached block contents differ by nextBlockOnDiskSize. Caching new block.");
- }
- } else {
- String msg = "Caching an already cached block: " + cacheKey;
- msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";
- LOG.warn(msg);
- return;
+ if (cacheEnabled) {
+ if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
+ if (BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, cachedItem)) {
+ cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
}
- } finally {
- // return the block since we need to decrement the count
- returnBlock(cacheKey, existingBlock);
+ } else {
+ cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
}
}
+ }
- /*
- * Stuff the entry into the RAM cache so it can get drained to the persistent store
- */
+ private void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
+ boolean inMemory, boolean wait) {
+ if (!cacheEnabled) {
+ return;
+ }
+ LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
+ // Stuff the entry into the RAM cache so it can get drained to the persistent store
RAMQueueEntry re =
new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
+ /**
+ * Don't use ramCache.put(cacheKey, re) here, because there may be an existing entry with the
+ * same key in ramCache, and the heap size of the bucket cache needs to be updated when
+ * replacing an entry in ramCache. But WriterThread will also remove the entry from ramCache and
+ * update the heap size; if we used ramCache.put(), the entry removed by WriterThread might not
+ * be the correct one, and the heap size would be messed up (HBASE-20789).
+ */
if (ramCache.putIfAbsent(cacheKey, re) != null) {
return;
}
@@ -937,6 +930,31 @@ public class BucketCache implements BlockCache, HeapSize {
}
/**
+ * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing
+ * cache with a new block for the same cache key. There's a corner case: one thread caches a
+ * block in ramCache, copies it to the io-engine and adds a bucket entry to backingMap. Caching
+ * another new block with the same cache key does the same thing, so if we do not evict the
+ * previous bucket entry, a memory leak happens because the previous bucketEntry is gone but
+ * the bucketAllocator does not free its memory.
+ * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey
+ * cacheKey, Cacheable newBlock)
+ * @param key Block cache key
+ * @param bucketEntry Bucket entry to put into backingMap.
+ */
+ private void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
+ BucketEntry previousEntry = backingMap.put(key, bucketEntry);
+ if (previousEntry != null && previousEntry != bucketEntry) {
+ ReentrantReadWriteLock lock = offsetLock.getLock(previousEntry.offset());
+ lock.writeLock().lock();
+ try {
+ blockEvicted(key, previousEntry, false);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+ }
+
+ /**
* Flush the entries in ramCache to IOEngine and add bucket entry to backingMap.
* Process all that are passed in even if failure being sure to remove from ramCache else we'll
* never undo the references and we'll OOME.
@@ -1017,7 +1035,7 @@ public class BucketCache implements BlockCache, HeapSize {
BlockCacheKey key = entries.get(i).getKey();
// Only add if non-null entry.
if (bucketEntries[i] != null) {
- backingMap.put(key, bucketEntries[i]);
+ putIntoBackingMap(key, bucketEntries[i]);
}
// Always remove from ramCache even if we failed adding it to the block cache above.
RAMQueueEntry ramCacheEntry = ramCache.remove(key);
http://git-wip-us.apache.org/repos/asf/hbase/blob/0454878e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index b365669..924dd02 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -197,14 +197,19 @@ public class TestBucketCache {
CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
}
+ private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
+ throws InterruptedException {
+ while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
+ Thread.sleep(100);
+ }
+ }
+
// BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
// threads will flush it to the bucket and put reference entry in backingMap.
private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
Cacheable block) throws InterruptedException {
cache.cacheBlock(cacheKey, block);
- while (!cache.backingMap.containsKey(cacheKey)) {
- Thread.sleep(100);
- }
+ waitUntilFlushedToBucket(cache, cacheKey);
}
@Test
@@ -409,7 +414,7 @@ public class TestBucketCache {
}
@Test
- public void testCacheBlockNextBlockMetadataMissing() {
+ public void testCacheBlockNextBlockMetadataMissing() throws Exception {
int size = 100;
int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
byte[] byteArr = new byte[length];
@@ -427,22 +432,26 @@ public class TestBucketCache {
blockWithNextBlockMetadata.serialize(block1Buffer, true);
blockWithoutNextBlockMetadata.serialize(block2Buffer, true);
- //Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
+ // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
- block1Buffer);
+ block1Buffer);
- //Add blockWithoutNextBlockMetada, expect blockWithNextBlockMetadata back.
+ waitUntilFlushedToBucket(cache, key);
+
+ // Add blockWithoutNextBlockMetadata, expect blockWithNextBlockMetadata back.
CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
- block1Buffer);
+ block1Buffer);
- //Clear and add blockWithoutNextBlockMetadata
+ // Clear and add blockWithoutNextBlockMetadata
cache.evictBlock(key);
assertNull(cache.getBlock(key, false, false, false));
CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
- block2Buffer);
+ block2Buffer);
+
+ waitUntilFlushedToBucket(cache, key);
- //Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace.
+ // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace.
CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
- block1Buffer);
+ block1Buffer);
}
}