You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2022/10/04 22:33:59 UTC

[hbase] branch branch-2.4 updated: HBASE-27365 Minimise block addition failures due to no space in bucket cache writers queue by introducing wait time

This is an automated email from the ASF dual-hosted git repository.

rajeshbabu pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 5fa5b0364f3 HBASE-27365 Minimise block addition failures due to no space in bucket cache writers queue by introducing wait time
5fa5b0364f3 is described below

commit 5fa5b0364f37ee78bba41375dec23e14cfec7af7
Author: Rajeshbabu Chintaguntla <ra...@apache.org>
AuthorDate: Wed Oct 5 04:03:45 2022 +0530

    HBASE-27365 Minimise block addition failures due to no space in bucket cache writers queue by introducing wait time
---
 .../apache/hadoop/hbase/io/hfile/BlockCache.java   | 13 ++++
 .../hadoop/hbase/io/hfile/CombinedBlockCache.java  |  8 ++-
 .../hadoop/hbase/io/hfile/HFileReaderImpl.java     |  6 +-
 .../hadoop/hbase/io/hfile/HFileWriterImpl.java     |  2 +-
 .../hadoop/hbase/io/hfile/bucket/BucketCache.java  | 29 ++++++---
 .../hbase/io/hfile/bucket/TestBucketCache.java     | 69 +++++++++++++++++++---
 .../io/hfile/bucket/TestBucketCacheRefCnt.java     |  2 -
 hbase-server/src/test/resources/hbase-site.xml     |  5 ++
 8 files changed, 111 insertions(+), 23 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
index 52eaa30317a..1caa1b76f5f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
@@ -34,6 +34,19 @@ public interface BlockCache extends Iterable<CachedBlock> {
    */
   void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory);
 
+  /**
+   * Add block to cache.
+   * @param cacheKey      The block's cache key.
+   * @param buf           The block contents wrapped in a ByteBuffer.
+   * @param inMemory      Whether block should be treated as in-memory
+   * @param waitWhenCache Whether to wait for the cache to be flushed mainly when BucketCache is
+   *                      configured.
+   */
+  default void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
+    boolean waitWhenCache) {
+    cacheBlock(cacheKey, buf, inMemory);
+  }
+
   /**
    * Add block to cache (defaults to not in-memory).
    * @param cacheKey The block's cache key.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
index 358ee2057a2..9e02cff873e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
@@ -53,11 +53,17 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
 
   @Override
   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
+    cacheBlock(cacheKey, buf, inMemory, false);
+  }
+
+  @Override
+  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
+    boolean waitWhenCache) {
     boolean metaBlock = buf.getBlockType().getCategory() != BlockType.BlockCategory.DATA;
     if (metaBlock) {
       l1Cache.cacheBlock(cacheKey, buf, inMemory);
     } else {
-      l2Cache.cacheBlock(cacheKey, buf, inMemory);
+      l2Cache.cacheBlock(cacheKey, buf, inMemory, waitWhenCache);
     }
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index e64bdd8a59d..07ff1894781 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -1324,7 +1324,7 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
           // Cache the block if necessary
           cacheConf.getBlockCache().ifPresent(cache -> {
             if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
-              cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory());
+              cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory(), cacheOnly);
             }
           });
 
@@ -1337,8 +1337,8 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
         // Cache the block if necessary
         cacheConf.getBlockCache().ifPresent(cache -> {
           if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
-            cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked,
-              cacheConf.isInMemory());
+            // Using the wait on cache during compaction and prefetching.
+            cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked, cacheOnly);
           }
         });
         if (unpacked != hfileBlock) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
index 819e5663984..2f6fb972648 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
@@ -534,7 +534,7 @@ public class HFileWriterImpl implements HFile.Writer {
       HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
       try {
         cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
-          cacheFormatBlock);
+          cacheFormatBlock, cacheConf.isInMemory(), true);
       } finally {
         // refCnt will auto increase when block add to Cache, see RAMCache#putIfAbsent
         cacheFormatBlock.release();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 9eac9e85d08..34e2690ab1f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -167,13 +167,6 @@ public class BucketCache implements BlockCache, HeapSize {
 
   private static final int DEFAULT_CACHE_WAIT_TIME = 50;
 
-  /**
-   * Used in tests. If this flag is false and the cache speed is very fast, bucket cache will skip
-   * some blocks when caching. If the flag is true, we will wait until blocks are flushed to
-   * IOEngine.
-   */
-  boolean wait_when_cache = false;
-
   private final BucketCacheStats cacheStats = new BucketCacheStats();
 
   private final String persistencePath;
@@ -239,6 +232,10 @@ public class BucketCache implements BlockCache, HeapSize {
     "hbase.bucketcache.persistent.file.integrity.check.algorithm";
   private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";
 
+  private static final String QUEUE_ADDITION_WAIT_TIME =
+    "hbase.bucketcache.queue.addition.waittime";
+  private static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0;
+  private long queueAdditionWaitTime;
   /**
    * Use {@link java.security.MessageDigest} class's encryption algorithms to check persistent file
    * integrity, default algorithm is MD5
@@ -273,6 +270,8 @@ public class BucketCache implements BlockCache, HeapSize {
     this.singleFactor = conf.getFloat(SINGLE_FACTOR_CONFIG_NAME, DEFAULT_SINGLE_FACTOR);
     this.multiFactor = conf.getFloat(MULTI_FACTOR_CONFIG_NAME, DEFAULT_MULTI_FACTOR);
     this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME, DEFAULT_MEMORY_FACTOR);
+    this.queueAdditionWaitTime =
+      conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
 
     sanityCheckConfigs();
 
@@ -415,7 +414,19 @@ public class BucketCache implements BlockCache, HeapSize {
    */
   @Override
   public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory) {
-    cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache);
+    cacheBlockWithWait(cacheKey, cachedItem, inMemory, false);
+  }
+
+  /**
+   * Cache the block with the specified name and buffer.
+   * @param cacheKey   block's cache key
+   * @param cachedItem block buffer
+   * @param inMemory   if block is in-memory
+   */
+  @Override
+  public void cacheBlock(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
+    boolean waitWhenCache) {
+    cacheBlockWithWait(cacheKey, cachedItem, inMemory, waitWhenCache && queueAdditionWaitTime > 0);
   }
 
   /**
@@ -471,7 +482,7 @@ public class BucketCache implements BlockCache, HeapSize {
     boolean successfulAddition = false;
     if (wait) {
       try {
-        successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS);
+        successfulAddition = bq.offer(re, queueAdditionWaitTime, TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index 2781e63e53b..30fac090e2b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -120,7 +121,6 @@ public class TestBucketCache {
       int writerThreads, int writerQLen, String persistencePath) throws IOException {
       super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
         persistencePath);
-      super.wait_when_cache = true;
     }
 
     @Override
@@ -242,8 +242,8 @@ public class TestBucketCache {
   // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
   // threads will flush it to the bucket and put reference entry in backingMap.
   private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
-    Cacheable block) throws InterruptedException {
-    cache.cacheBlock(cacheKey, block);
+    Cacheable block, boolean waitWhenCache) throws InterruptedException {
+    cache.cacheBlock(cacheKey, block, false, waitWhenCache);
     waitUntilFlushedToBucket(cache, cacheKey);
   }
 
@@ -251,7 +251,7 @@ public class TestBucketCache {
   public void testMemoryLeak() throws Exception {
     final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
     cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
-      new CacheTestUtils.ByteArrayCacheable(new byte[10]));
+      new CacheTestUtils.ByteArrayCacheable(new byte[10]), true);
     long lockId = cache.backingMap.get(cacheKey).offset();
     ReentrantReadWriteLock lock = cache.offsetLock.getLock(lockId);
     lock.writeLock().lock();
@@ -266,7 +266,7 @@ public class TestBucketCache {
     cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true);
     assertEquals(0, cache.getBlockCount());
     cacheAndWaitUntilFlushedToBucket(cache, cacheKey,
-      new CacheTestUtils.ByteArrayCacheable(new byte[10]));
+      new CacheTestUtils.ByteArrayCacheable(new byte[10]), false);
     assertEquals(1, cache.getBlockCount());
     lock.writeLock().unlock();
     evictThread.join();
@@ -312,7 +312,8 @@ public class TestBucketCache {
       bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
     }
     for (HFileBlockPair block : blocks) {
-      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
+      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock(),
+        false);
     }
     usedSize = bucketCache.getAllocator().getUsedSize();
     assertNotEquals(0, usedSize);
@@ -640,7 +641,7 @@ public class TestBucketCache {
 
       for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
         cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(),
-          hfileBlockPair.getBlock());
+          hfileBlockPair.getBlock(), false);
       }
       usedByteSize = bucketCache.getAllocator().getUsedSize();
       assertNotEquals(0, usedByteSize);
@@ -665,4 +666,58 @@ public class TestBucketCache {
     }
   }
 
+  @Test
+  public void testBlockAdditionWaitWhenCache() throws Exception {
+    try {
+      final Path dataTestDir = createAndGetTestDir();
+
+      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
+      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";
+
+      BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
+        constructedBlockSizes, 1, 1, persistencePath);
+      long usedByteSize = bucketCache.getAllocator().getUsedSize();
+      assertEquals(0, usedByteSize);
+
+      HFileBlockPair[] hfileBlockPairs =
+        CacheTestUtils.generateHFileBlocks(constructedBlockSize, 10);
+      // Add blocks
+      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
+        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock(), false,
+          true);
+      }
+
+      // Max wait for 10 seconds.
+      long timeout = 10000;
+      // Wait for blocks size to match the number of blocks.
+      while (bucketCache.backingMap.size() != 10) {
+        if (timeout <= 0) break;
+        Threads.sleep(100);
+        timeout = -100;
+      }
+      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
+        assertTrue(bucketCache.backingMap.containsKey(hfileBlockPair.getBlockName()));
+      }
+      usedByteSize = bucketCache.getAllocator().getUsedSize();
+      assertNotEquals(0, usedByteSize);
+      // persist cache to file
+      bucketCache.shutdown();
+      assertTrue(new File(persistencePath).exists());
+
+      // restore cache from file
+      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
+        constructedBlockSizes, writeThreads, writerQLen, persistencePath);
+      assertFalse(new File(persistencePath).exists());
+      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
+
+      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
+        BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
+        bucketCache.evictBlock(blockCacheKey);
+      }
+      assertEquals(0, bucketCache.getAllocator().getUsedSize());
+      assertEquals(0, bucketCache.backingMap.size());
+    } finally {
+      HBASE_TESTING_UTILITY.cleanupTestDir();
+    }
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
index 5988a5fd813..1593900d7d6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
@@ -110,8 +110,6 @@ public class TestBucketCacheRefCnt {
   // Flakey TestBucketCacheRefCnt.testBlockInRAMCache:121 expected:<3> but was:<2>
   public void testBlockInRAMCache() throws IOException {
     cache = create(1, 1000);
-    // Set this to true;
-    cache.wait_when_cache = true;
     disableWriter();
     final String prefix = "testBlockInRamCache";
     try {
diff --git a/hbase-server/src/test/resources/hbase-site.xml b/hbase-server/src/test/resources/hbase-site.xml
index 5e64bfcd907..4a5653aee24 100644
--- a/hbase-server/src/test/resources/hbase-site.xml
+++ b/hbase-server/src/test/resources/hbase-site.xml
@@ -282,4 +282,9 @@
     <value>3</value>
     <description>Default is unbounded</description>
   </property>
+  <property>
+    <name>hbase.bucketcache.queue.addition.waittime</name>
+    <value>1000</value>
+    <description>Default is 0</description>
+  </property>
 </configuration>