You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2022/10/04 22:33:59 UTC
[hbase] branch branch-2.4 updated: HBASE-27365 Minimise block addition failures due to no space in bucket cache writers queue by introducing wait time
This is an automated email from the ASF dual-hosted git repository.
rajeshbabu pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 5fa5b0364f3 HBASE-27365 Minimise block addition failures due to no space in bucket cache writers queue by introducing wait time
5fa5b0364f3 is described below
commit 5fa5b0364f37ee78bba41375dec23e14cfec7af7
Author: Rajeshbabu Chintaguntla <ra...@apache.org>
AuthorDate: Wed Oct 5 04:03:45 2022 +0530
HBASE-27365 Minimise block addition failures due to no space in bucket cache writers queue by introducing wait time
---
.../apache/hadoop/hbase/io/hfile/BlockCache.java | 13 ++++
.../hadoop/hbase/io/hfile/CombinedBlockCache.java | 8 ++-
.../hadoop/hbase/io/hfile/HFileReaderImpl.java | 6 +-
.../hadoop/hbase/io/hfile/HFileWriterImpl.java | 2 +-
.../hadoop/hbase/io/hfile/bucket/BucketCache.java | 29 ++++++---
.../hbase/io/hfile/bucket/TestBucketCache.java | 69 +++++++++++++++++++---
.../io/hfile/bucket/TestBucketCacheRefCnt.java | 2 -
hbase-server/src/test/resources/hbase-site.xml | 5 ++
8 files changed, 111 insertions(+), 23 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
index 52eaa30317a..1caa1b76f5f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
@@ -34,6 +34,19 @@ public interface BlockCache extends Iterable<CachedBlock> {
*/
void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory);
+ /**
+ * Add block to cache.
+ * @param cacheKey The block's cache key.
+ * @param buf The block contents wrapped in a ByteBuffer.
+ * @param inMemory Whether block should be treated as in-memory
+ * @param waitWhenCache Whether to wait for the cache to be flushed mainly when BucketCache is
+ * configured.
+ */
+ default void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
+ boolean waitWhenCache) {
+ cacheBlock(cacheKey, buf, inMemory);
+ }
+
/**
* Add block to cache (defaults to not in-memory).
* @param cacheKey The block's cache key.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
index 358ee2057a2..9e02cff873e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
@@ -53,11 +53,17 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
+ cacheBlock(cacheKey, buf, inMemory, false);
+ }
+
+ @Override
+ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
+ boolean waitWhenCache) {
boolean metaBlock = buf.getBlockType().getCategory() != BlockType.BlockCategory.DATA;
if (metaBlock) {
l1Cache.cacheBlock(cacheKey, buf, inMemory);
} else {
- l2Cache.cacheBlock(cacheKey, buf, inMemory);
+ l2Cache.cacheBlock(cacheKey, buf, inMemory, waitWhenCache);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index e64bdd8a59d..07ff1894781 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -1324,7 +1324,7 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
// Cache the block if necessary
cacheConf.getBlockCache().ifPresent(cache -> {
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
- cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory());
+ cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory(), cacheOnly);
}
});
@@ -1337,8 +1337,8 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
// Cache the block if necessary
cacheConf.getBlockCache().ifPresent(cache -> {
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
- cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked,
- cacheConf.isInMemory());
+ // Using the wait on cache during compaction and prefetching.
+ cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked, cacheOnly);
}
});
if (unpacked != hfileBlock) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
index 819e5663984..2f6fb972648 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
@@ -534,7 +534,7 @@ public class HFileWriterImpl implements HFile.Writer {
HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
try {
cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
- cacheFormatBlock);
+ cacheFormatBlock, cacheConf.isInMemory(), true);
} finally {
// refCnt will auto increase when block add to Cache, see RAMCache#putIfAbsent
cacheFormatBlock.release();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 9eac9e85d08..34e2690ab1f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -167,13 +167,6 @@ public class BucketCache implements BlockCache, HeapSize {
private static final int DEFAULT_CACHE_WAIT_TIME = 50;
- /**
- * Used in tests. If this flag is false and the cache speed is very fast, bucket cache will skip
- * some blocks when caching. If the flag is true, we will wait until blocks are flushed to
- * IOEngine.
- */
- boolean wait_when_cache = false;
-
private final BucketCacheStats cacheStats = new BucketCacheStats();
private final String persistencePath;
@@ -239,6 +232,10 @@ public class BucketCache implements BlockCache, HeapSize {
"hbase.bucketcache.persistent.file.integrity.check.algorithm";
private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";
+ private static final String QUEUE_ADDITION_WAIT_TIME =
+ "hbase.bucketcache.queue.addition.waittime";
+ private static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0;
+ private long queueAdditionWaitTime;
/**
* Use {@link java.security.MessageDigest} class's encryption algorithms to check persistent file
* integrity, default algorithm is MD5
@@ -273,6 +270,8 @@ public class BucketCache implements BlockCache, HeapSize {
this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR);
this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR);
this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR);
+ this.queueAdditionWaitTime =
+ conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
sanityCheckConfigs();
@@ -415,7 +414,19 @@ public class BucketCache implements BlockCache, HeapSize {
*/
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) {
- cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache);
+ cacheBlockWithWait(cacheKey, cachedItem, inMemory, false);
+ }
+
+ /**
+ * Cache the block with the specified name and buffer.
+ * @param cacheKey block's cache key
+ * @param cachedItem block buffer
+ * @param inMemory if block is in-memory
+ */
+ @Override
+ public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
+ boolean waitWhenCache) {
+ cacheBlockWithWait(cacheKey, cachedItem, inMemory, waitWhenCache && queueAdditionWaitTime > 0);
}
/**
@@ -471,7 +482,7 @@ public class BucketCache implements BlockCache, HeapSize {
boolean successfulAddition = false;
if (wait) {
try {
- successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS);
+ successfulAddition = bq.offer(re, queueAdditionWaitTime, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index 2781e63e53b..30fac090e2b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -120,7 +121,6 @@ public class TestBucketCache {
int writerThreads, int writerQLen, String persistencePath) throws IOException {
super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
persistencePath);
- super.wait_when_cache = true;
}
@Override
@@ -242,8 +242,8 @@ public class TestBucketCache {
// BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
// threads will flush it to the bucket and put reference entry in backingMap.
private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
- Cacheable block) throws InterruptedException {
- cache.cacheBlock(cacheKey, block);
+ Cacheable block, boolean waitWhenCache) throws InterruptedException {
+ cache.cacheBlock(cacheKey, block, false, waitWhenCache);
waitUntilFlushedToBucket(cache, cacheKey);
}
@@ -251,7 +251,7 @@ public class TestBucketCache {
public void testMemoryLeak() throws Exception {
final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
- new CacheTestUtils.ByteArrayCacheable(new byte[10]));
+ new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
long lockId = cache.backingMap.get(cacheKey).offset();
ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
lock.writeLock().lock();
@@ -266,7 +266,7 @@ public class TestBucketCache {
cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true);
assertEquals(0, cache.getBlockCount());
cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
- new CacheTestUtils.ByteArrayCacheable(new byte[10]));
+ new CacheTestUtils.ByteArrayCacheable(new byte[10]), false);
assertEquals(1, cache.getBlockCount());
lock.writeLock().unlock();
evictThread.join();
@@ -312,7 +312,8 @@ public class TestBucketCache {
bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
}
for (HFileBlockPair block : blocks) {
- cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
+ cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
+ false);
}
usedSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedSize);
@@ -640,7 +641,7 @@ public class TestBucketCache {
for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(),
- hfileBlockPair.getBlock());
+ hfileBlockPair.getBlock(), false);
}
usedByteSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedByteSize);
@@ -665,4 +666,58 @@ public class TestBucketCache {
}
}
+ @Test
+ public void testBlockAdditionWaitWhenCache() throws Exception {
+ try {
+ final Path dataTestDir = createAndGetTestDir();
+
+ String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
+ String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";
+
+ BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
+ constructedBlockSizes, 1, 1, persistencePath);
+ long usedByteSize = bucketCache.getAllocator().getUsedSize();
+ assertEquals(0, usedByteSize);
+
+ HFileBlockPair[] hfileBlockPairs =
+ CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10);
+ // Add blocks
+ for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
+ bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false,
+ true);
+ }
+
+ // Max wait for 10 seconds.
+ long timeout = 10000;
+ // Wait for blocks size to match the number of blocks.
+ while (bucketCache.backingMap.size() != 10) {
+ if (timeout <= 0) break;
+ Threads.sleep(100);
+ timeout = -100;
+ }
+ for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
+ assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName()));
+ }
+ usedByteSize = bucketCache.getAllocator().getUsedSize();
+ assertNotEquals(0, usedByteSize);
+ // persist cache to file
+ bucketCache.shutdown();
+ assertTrue(new File(persistencePath).exists());
+
+ // restore cache from file
+ bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
+ constructedBlockSizes, writeThreads, writerQLen, persistencePath);
+ assertFalse(new File(persistencePath).exists());
+ assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
+
+ for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
+ BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
+ bucketCache.evictBlock(blockCacheKey);
+ }
+ assertEquals(0, bucketCache.getAllocator().getUsedSize());
+ assertEquals(0, bucketCache.backingMap.size());
+ } finally {
+ HBASE_TESTING_UTILITY.cleanupTestDir();
+ }
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
index 5988a5fd813..1593900d7d6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
@@ -110,8 +110,6 @@ public class TestBucketCacheRefCnt {
// Flakey TestBucketCacheRefCnt.testBlockInRAMCache:121 expected:<3> but was:<2>
public void testBlockInRAMCache() throws IOException {
cache = create(1, 1000);
- // Set this to true;
- cache.wait_when_cache = true;
disableWriter();
final String prefix = "testBlockInRamCache";
try {
diff --git a/hbase-server/src/test/resources/hbase-site.xml b/hbase-server/src/test/resources/hbase-site.xml
index 5e64bfcd907..4a5653aee24 100644
--- a/hbase-server/src/test/resources/hbase-site.xml
+++ b/hbase-server/src/test/resources/hbase-site.xml
@@ -282,4 +282,9 @@
<value>3</value>
<description>Default is unbounded</description>
</property>
+ <property>
+ <name>hbase.bucketcache.queue.addition.waittime</name>
+ <value>1000</value>
+ <description>Default is 0</description>
+ </property>
</configuration>