Posted to commits@hbase.apache.org by zh...@apache.org on 2021/09/29 13:55:26 UTC

[hbase] branch branch-2.4 updated: HBASE-26295 BucketCache could not free BucketEntry which restored fro… (#3699)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 3cc539a  HBASE-26295 BucketCache could not free BucketEntry which restored fro… (#3699)
3cc539a is described below

commit 3cc539a561c4d35dc661c155768ffbfe2d2f073f
Author: chenglei <ch...@apache.org>
AuthorDate: Wed Sep 29 21:24:12 2021 +0800

    HBASE-26295 BucketCache could not free BucketEntry which restored fro… (#3699)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../hadoop/hbase/io/hfile/bucket/BucketCache.java  |  7 ++-
 .../hadoop/hbase/io/hfile/bucket/BucketEntry.java  | 62 +++++++++++--------
 .../hbase/io/hfile/bucket/BucketProtoUtils.java    | 11 +++-
 .../hbase/io/hfile/bucket/TestBucketCache.java     | 72 +++++++++++++++++++++-
 .../io/hfile/bucket/TestByteBufferIOEngine.java    |  4 +-
 5 files changed, 125 insertions(+), 31 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 45f46a3..67f64d5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -634,7 +634,7 @@ public class BucketCache implements BlockCache, HeapSize {
    *   it is {@link ByteBuffAllocator#putbackBuffer}.
    * </pre>
    */
-  protected Recycler createRecycler(final BucketEntry bucketEntry) {
+  private Recycler createRecycler(final BucketEntry bucketEntry) {
     return () -> {
       freeBucketEntry(bucketEntry);
       return;
@@ -1017,7 +1017,7 @@ public class BucketCache implements BlockCache, HeapSize {
           continue;
         }
         BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize,
-          (entry) -> createRecycler(entry));
+          this::createRecycler);
         // Successfully added. Up index and add bucketEntry. Clear io exceptions.
         bucketEntries[index] = bucketEntry;
         if (ioErrorStartTime > 0) {
@@ -1217,7 +1217,8 @@ public class BucketCache implements BlockCache, HeapSize {
       LOG.info("Persistent file is old format, it does not support verifying file integrity!");
     }
     verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass());
-    backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap());
+    backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(),
+      this::createRecycler);
   }
 
   /**
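
The changes above make createRecycler private and pass it (as this::createRecycler) into both the
write path and BucketProtoUtils.fromPB, so entries restored from the persistence file are wired to
freeBucketEntry(...) once their refCnt drops to zero, just like entries flushed by the writer
threads. A minimal, self-contained sketch of that refCnt-plus-recycler pattern follows; the names
(SketchRefCnt, SketchCache, retain, release) are illustrative stand-ins, not the HBase API.

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.Function;

    final class SketchRefCnt {
      private final AtomicInteger cnt = new AtomicInteger(1); // starts with the backingMap reference
      private final Runnable recycler;                        // runs once the last reference is gone

      SketchRefCnt(Runnable recycler) {
        this.recycler = recycler;
      }

      void retain() {   // e.g. an RPC read path starts referring to the block
        cnt.incrementAndGet();
      }

      void release() {  // e.g. eviction from the backingMap, or the RPC response was shipped
        if (cnt.decrementAndGet() == 0) {
          recycler.run();  // free the underlying bucket area
        }
      }
    }

    final class SketchCache {
      // Mirrors the role of BucketCache#createRecycler: the recycler is bound to one entry
      // and frees that entry's space when invoked.
      SketchRefCnt createEntryRefCnt(long offset, Function<Long, Runnable> createRecycler) {
        return new SketchRefCnt(createRecycler.apply(offset));
      }
    }
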
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java
index ca79f69..222cd80 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java
@@ -66,19 +66,27 @@ class BucketEntry implements HBaseReferenceCounted {
   private BlockPriority priority;
 
   /**
-   * The RefCnt means how many paths are referring the {@link BucketEntry}, each RPC reading path is
-   * considering as one path, the {@link BucketCache#backingMap} reference is also considered a
-   * path. NOTICE that if two read RPC path hit the same {@link BucketEntry}, then the HFileBlocks
-   * the two RPC referred will share the same refCnt instance with the BucketEntry. so the refCnt
-   * will increase or decrease as the following: <br>
-   * 1. when writerThread flush the block into IOEngine and add the bucketEntry into backingMap, the
-   * refCnt ++; <br>
-   * 2. If BucketCache evict the block and move the bucketEntry out of backingMap, the refCnt--; it
-   * usually happen when HFile is closing or someone call the clearBucketCache by force. <br>
-   * 3. The read RPC path start to refer the block which is backend by the memory area in
-   * bucketEntry, then refCnt ++ ; <br>
-   * 4. The read RPC patch shipped the response, and release the block. then refCnt--; <br>
-   * Once the refCnt decrease to zero, then the {@link BucketAllocator} will free the block area.
+   * <pre>
+   * The refCnt counts how many paths are referring to this {@link BucketEntry}. There are two cases:
+   * 1. If {@link IOEngine#usesSharedMemory()} is false (e.g. {@link FileIOEngine}), the refCnt
+   *    stays at 1 until this {@link BucketEntry} is evicted from {@link BucketCache#backingMap}.
+   *    Even if the corresponding {@link HFileBlock} is referenced by an RPC read, the refCnt does
+   *    not increase.
+   *
+   * 2. If {@link IOEngine#usesSharedMemory()} is true (e.g. {@link ByteBufferIOEngine}), each RPC
+   *    read path counts as one path, and the {@link BucketCache#backingMap} reference is also
+   *    counted as a path. NOTICE that if two read RPC paths hit the same {@link BucketEntry},
+   *    then the {@link HFileBlock}s referred to by the two RPCs share the same refCnt instance
+   *    with the {@link BucketEntry}, so the refCnt increases or decreases as follows:
+   *    (1) When the writer thread flushes the block into the IOEngine and adds the bucketEntry
+   *        into the backingMap, refCnt++.
+   *    (2) When BucketCache evicts the block and removes the bucketEntry from the backingMap,
+   *        refCnt--; this usually happens when an HFile is closing or clearBucketCache is forced.
+   *    (3) When a read RPC path starts to refer to the block backed by the memory area in the
+   *        bucketEntry, refCnt++.
+   *    (4) When the read RPC path has shipped the response and released the block, refCnt--.
+   *    Once the refCnt drops to zero, the {@link BucketAllocator} frees the block area.
+   * </pre>
    */
   private final RefCnt refCnt;
   final AtomicBoolean markedAsEvicted;
@@ -89,22 +97,22 @@ class BucketEntry implements HBaseReferenceCounted {
    */
   private final long cachedTime = System.nanoTime();
 
-  BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
-    this(offset, length, accessCounter, inMemory, null, ByteBuffAllocator.HEAP);
-  }
-
+  /**
+   * @param createRecycler used to free this {@link BucketEntry} when {@link BucketEntry#refCnt}
+   *          becomes 0. NOTICE that {@link ByteBuffAllocator#NONE} should only be used in tests.
+   */
   BucketEntry(long offset, int length, long accessCounter, boolean inMemory,
       Function<BucketEntry, Recycler> createRecycler,
       ByteBuffAllocator allocator) {
+    if (createRecycler == null) {
+      throw new IllegalArgumentException("createRecycler could not be null!");
+    }
     setOffset(offset);
     this.length = length;
     this.accessCounter = accessCounter;
     this.priority = inMemory ? BlockPriority.MEMORY : BlockPriority.MULTI;
-    if (createRecycler == null) {
-      this.refCnt = RefCnt.create();
-    } else {
-      this.refCnt = RefCnt.create(createRecycler.apply(this));
-    }
+    this.refCnt = RefCnt.create(createRecycler.apply(this));
+
     this.markedAsEvicted = new AtomicBoolean(false);
     this.allocator = allocator;
   }
@@ -173,13 +181,19 @@ class BucketEntry implements HBaseReferenceCounted {
   }
 
   /**
-   * Check whether have some RPC patch referring this block. There're two case: <br>
+   * Check whether some RPC path is referring to this block.<br/>
+   * When {@link IOEngine#usesSharedMemory()} is true (e.g. {@link ByteBufferIOEngine}), there are
+   * two cases: <br>
    * 1. If current refCnt is greater than 1, there must be at least one referring RPC path; <br>
    * 2. If current refCnt is equal to 1 and the markedAtEvicted is true, the it means backingMap has
    * released its reference, the remaining reference can only be from RPC path. <br>
    * We use this check to decide whether we can free the block area: when cached size exceed the
    * acceptable size, our eviction policy will choose those stale blocks without any RPC reference
-   * and the RPC referred block will be excluded.
+   * and blocks referred to by an RPC will be excluded. <br/>
+   * <br/>
+   * When {@link IOEngine#usesSharedMemory()} is false (e.g. {@link FileIOEngine}),
+   * {@link BucketEntry#refCnt} is always 1 until it is evicted from {@link BucketCache#backingMap},
+   * so {@link BucketEntry#isRpcRef()} always returns false.
    * @return true to indicate there're some RPC referring the block.
    */
   boolean isRpcRef() {
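
The isRpcRef() documentation above boils down to a simple check over the entry's reference count
and its evicted flag. A sketch of that decision logic, using stand-in parameters rather than the
real BucketEntry fields:

    final class RpcRefCheck {
      // refCnt: current reference count; markedAsEvicted: whether the backingMap reference has
      // already been released because the entry was evicted.
      static boolean isRpcRef(int refCnt, boolean markedAsEvicted) {
        // More than one reference: at least one of them must be an RPC read path.
        // Exactly one reference while marked evicted: the backingMap no longer holds it,
        // so the remaining reference can only come from an RPC read path.
        return refCnt > 1 || (refCnt == 1 && markedAsEvicted);
      }
    }
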
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
index f3d63d4..b2a00f1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
@@ -23,7 +23,10 @@ package org.apache.hadoop.hbase.io.hfile.bucket;
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
 
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.BlockPriority;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
@@ -127,7 +130,8 @@ final class BucketProtoUtils {
   }
 
   static ConcurrentHashMap<BlockCacheKey, BucketEntry> fromPB(
-      Map<Integer, String> deserializers, BucketCacheProtos.BackingMap backingMap)
+      Map<Integer, String> deserializers, BucketCacheProtos.BackingMap backingMap,
+      Function<BucketEntry, Recycler> createRecycler)
       throws IOException {
     ConcurrentHashMap<BlockCacheKey, BucketEntry> result = new ConcurrentHashMap<>();
     for (BucketCacheProtos.BackingMapEntry entry : backingMap.getEntryList()) {
@@ -135,11 +139,14 @@ final class BucketProtoUtils {
       BlockCacheKey key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getOffset(),
           protoKey.getPrimaryReplicaBlock(), fromPb(protoKey.getBlockType()));
       BucketCacheProtos.BucketEntry protoValue = entry.getValue();
+      // TODO: We use ByteBuffAllocator.HEAP here because we cannot elegantly get the
+      // ByteBuffAllocator which was created by the RpcServer.
       BucketEntry value = new BucketEntry(
           protoValue.getOffset(),
           protoValue.getLength(),
           protoValue.getAccessCounter(),
-          protoValue.getPriority() == BucketCacheProtos.BlockPriority.memory);
+          protoValue.getPriority() == BucketCacheProtos.BlockPriority.memory, createRecycler,
+          ByteBuffAllocator.HEAP);
       // This is the deserializer that we stored
       int oldIndex = protoValue.getDeserialiserIndex();
       String deserializerClass = deserializers.get(oldIndex);
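
With the extra Function<BucketEntry, Recycler> parameter, fromPB builds each restored entry with a
recycler supplied by the owning BucketCache instead of falling back to a plain reference count that
has no free callback. A sketch of that factory-threading pattern, with illustrative stand-in types
(RestoreSketch, RestoredEntry, Recycler) rather than the HBase classes:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Function;

    final class RestoreSketch {
      interface Recycler {
        void free();
      }

      static final class RestoredEntry {
        final long offset;
        final Recycler recycler;

        RestoredEntry(long offset, Function<RestoredEntry, Recycler> createRecycler) {
          this.offset = offset;
          // The factory is applied per entry, so each restored entry can free its own bucket space.
          this.recycler = createRecycler.apply(this);
        }
      }

      // Mirrors the shape of fromPB: the caller supplies the factory, the restore loop applies it.
      static List<RestoredEntry> restore(long[] persistedOffsets,
          Function<RestoredEntry, Recycler> createRecycler) {
        List<RestoredEntry> result = new ArrayList<>();
        for (long offset : persistedOffsets) {
          result.add(new RestoredEntry(offset, createRecycler));
        }
        return result;
      }
    }
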
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index 1b10fbd..308d96c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -113,6 +113,8 @@ public class TestBucketCache {
   String ioEngineName = "offheap";
   String persistencePath = null;
 
+  private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility();
+
   private static class MockedBucketCache extends BucketCache {
 
     public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
@@ -145,6 +147,18 @@ public class TestBucketCache {
   }
 
   /**
+   * Test utility that creates the test directory and returns its path.
+   *
+   * @return the path of the created test directory
+   * @throws IOException if the directory cannot be created
+   */
+  private Path createAndGetTestDir() throws IOException {
+    final Path testDir = HBASE_TESTING_UTILITY.getDataTestDir();
+    HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(testDir);
+    return testDir;
+  }
+
+  /**
    * Return a random element from {@code a}.
    */
   private static <T> T randFrom(List<T> a) {
@@ -444,7 +458,10 @@ public class TestBucketCache {
     // This number is picked because it produces negative output if the values isn't ensured to be
     // positive. See HBASE-18757 for more information.
     long testValue = 549888460800L;
-    BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10L, true);
+    BucketEntry bucketEntry =
+        new BucketEntry(testValue, 10, 10L, true, (entry) -> {
+          return ByteBuffAllocator.NONE;
+        }, ByteBuffAllocator.HEAP);
     assertEquals(testValue, bucketEntry.offset());
   }
 
@@ -579,4 +596,57 @@ public class TestBucketCache {
     }
     Assert.assertEquals(0, allocator.getUsedSize());
   }
+
+  /**
+   * This test is for HBASE-26295: a {@link BucketEntry} restored from a persistence file could
+   * not be freed even if the corresponding {@link HFileBlock} was evicted from the
+   * {@link BucketCache}.
+   */
+  @Test
+  public void testFreeBucketEntryRestoredFromFile() throws Exception {
+    try {
+      final Path dataTestDir = createAndGetTestDir();
+
+      String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
+      String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";
+
+      BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
+          constructedBlockSizes, writeThreads, writerQLen, persistencePath);
+      long usedByteSize = bucketCache.getAllocator().getUsedSize();
+      assertEquals(0, usedByteSize);
+
+      HFileBlockPair[] hfileBlockPairs =
+          CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
+      // Add blocks
+      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
+        bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock());
+      }
+
+      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
+        cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(),
+          hfileBlockPair.getBlock());
+      }
+      usedByteSize = bucketCache.getAllocator().getUsedSize();
+      assertNotEquals(0, usedByteSize);
+      // persist cache to file
+      bucketCache.shutdown();
+      assertTrue(new File(persistencePath).exists());
+
+      // restore cache from file
+      bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
+          constructedBlockSizes, writeThreads, writerQLen, persistencePath);
+      assertFalse(new File(persistencePath).exists());
+      assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
+
+      for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
+        BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
+        bucketCache.evictBlock(blockCacheKey);
+      }
+      assertEquals(0, bucketCache.getAllocator().getUsedSize());
+      assertEquals(0, bucketCache.backingMap.size());
+    } finally {
+      HBASE_TESTING_UTILITY.cleanupTestDir();
+    }
+  }
+
 }
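
The new test relies on cacheAndWaitUntilFlushedToBucket(...), which already exists in
TestBucketCache and is not part of this diff. A hypothetical sketch of what such a helper could
look like, assuming the writer threads move entries into bucketCache.backingMap asynchronously
(the backingMap field is read directly elsewhere in the test):

    private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
        Cacheable block) throws InterruptedException {
      cache.cacheBlock(cacheKey, block);
      // Poll until a writer thread has flushed the block from the RAM queue into the backingMap.
      while (!cache.backingMap.containsKey(cacheKey)) {
        Thread.sleep(100);
      }
    }
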
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
index 97a5283..677d602 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
@@ -49,7 +49,9 @@ public class TestByteBufferIOEngine {
     private long off;
 
     MockBucketEntry(long offset, int length, ByteBuffAllocator allocator) {
-      super(offset & 0xFF00, length, 0, false, null, allocator);
+      super(offset & 0xFF00, length, 0, false, (entry) -> {
+        return ByteBuffAllocator.NONE;
+      }, allocator);
       this.off = offset;
     }