You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/09/29 13:56:02 UTC
[hbase] branch branch-2 updated: HBASE-26295 BucketCache could not free BucketEntry which restored fro… (#3699)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 0748165 HBASE-26295 BucketCache could not free BucketEntry which restored fro… (#3699)
0748165 is described below
commit 0748165ab000825f6aa2bf4d90077c0188305423
Author: chenglei <ch...@apache.org>
AuthorDate: Wed Sep 29 21:24:12 2021 +0800
HBASE-26295 BucketCache could not free BucketEntry which restored fro… (#3699)
Signed-off-by: Duo Zhang <zh...@apache.org>
---
.../hadoop/hbase/io/hfile/bucket/BucketCache.java | 7 ++-
.../hadoop/hbase/io/hfile/bucket/BucketEntry.java | 62 +++++++++++--------
.../hbase/io/hfile/bucket/BucketProtoUtils.java | 11 +++-
.../hbase/io/hfile/bucket/TestBucketCache.java | 72 +++++++++++++++++++++-
.../io/hfile/bucket/TestByteBufferIOEngine.java | 4 +-
5 files changed, 125 insertions(+), 31 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 45f46a3..67f64d5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -634,7 +634,7 @@ public class BucketCache implements BlockCache, HeapSize {
* it is {@link ByteBuffAllocator#putbackBuffer}.
* </pre>
*/
- protected Recycler createRecycler(final BucketEntry bucketEntry) {
+ private Recycler createRecycler(final BucketEntry bucketEntry) {
return () -> {
freeBucketEntry(bucketEntry);
return;
@@ -1017,7 +1017,7 @@ public class BucketCache implements BlockCache, HeapSize {
continue;
}
BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize,
- (entry) -> createRecycler(entry));
+ this::createRecycler);
// Successfully added. Up index and add bucketEntry. Clear io exceptions.
bucketEntries[index] = bucketEntry;
if (ioErrorStartTime > 0) {
@@ -1217,7 +1217,8 @@ public class BucketCache implements BlockCache, HeapSize {
LOG.info("Persistent file is old format, it does not support verifying file integrity!");
}
verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass());
- backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap());
+ backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(),
+ this::createRecycler);
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java
index ca79f69..222cd80 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java
@@ -66,19 +66,27 @@ class BucketEntry implements HBaseReferenceCounted {
private BlockPriority priority;
/**
- * The RefCnt means how many paths are referring the {@link BucketEntry}, each RPC reading path is
- * considering as one path, the {@link BucketCache#backingMap} reference is also considered a
- * path. NOTICE that if two read RPC path hit the same {@link BucketEntry}, then the HFileBlocks
- * the two RPC referred will share the same refCnt instance with the BucketEntry. so the refCnt
- * will increase or decrease as the following: <br>
- * 1. when writerThread flush the block into IOEngine and add the bucketEntry into backingMap, the
- * refCnt ++; <br>
- * 2. If BucketCache evict the block and move the bucketEntry out of backingMap, the refCnt--; it
- * usually happen when HFile is closing or someone call the clearBucketCache by force. <br>
- * 3. The read RPC path start to refer the block which is backend by the memory area in
- * bucketEntry, then refCnt ++ ; <br>
- * 4. The read RPC patch shipped the response, and release the block. then refCnt--; <br>
- * Once the refCnt decrease to zero, then the {@link BucketAllocator} will free the block area.
+ * <pre>
+ * The RefCnt means how many paths are referring the {@link BucketEntry}, there are two cases:
+ * 1.If the {@link IOEngine#usesSharedMemory()} is false(eg.{@link FileIOEngine}),the refCnt is
+ * always 1 until this {@link BucketEntry} is evicted from {@link BucketCache#backingMap}.Even
+ * if the corresponding {@link HFileBlock} is referenced by RPC reading, the refCnt should not
+ * increase.
+ *
+ * 2.If the {@link IOEngine#usesSharedMemory()} is true(eg.{@link ByteBufferIOEngine}),each RPC
+ * reading path is considering as one path, the {@link BucketCache#backingMap} reference is
+ * also considered a path. NOTICE that if two read RPC path hit the same {@link BucketEntry},
+ * then the {@link HFileBlock}s the two RPC referred will share the same refCnt instance with
+ * the {@link BucketEntry},so the refCnt will increase or decrease as the following:
+ * (1) when writerThread flush the block into IOEngine and add the bucketEntry into backingMap,
+ * the refCnt ++;
+ * (2) If BucketCache evicts the block and moves the bucketEntry out of backingMap, the
+ * refCnt--; it usually happens when an HFile is closing or clearBucketCache is forced.
+ * (3) The read RPC path starts to refer to the block which is backed by the memory area in
+ * bucketEntry, then refCnt ++ ;
+ * (4) The read RPC path has shipped the response and releases the block, then refCnt--;
+ * Once the refCnt decrease to zero, then the {@link BucketAllocator} will free the block area.
+ * </pre>
*/
private final RefCnt refCnt;
final AtomicBoolean markedAsEvicted;
@@ -89,22 +97,22 @@ class BucketEntry implements HBaseReferenceCounted {
*/
private final long cachedTime = System.nanoTime();
- BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
- this(offset, length, accessCounter, inMemory, null, ByteBuffAllocator.HEAP);
- }
-
+ /**
+ * @param createRecycler used to free this {@link BucketEntry} when {@link BucketEntry#refCnt}
+ * becomes 0. NOTICE that {@link ByteBuffAllocator#NONE} should only be used in tests.
+ */
BucketEntry(long offset, int length, long accessCounter, boolean inMemory,
Function<BucketEntry, Recycler> createRecycler,
ByteBuffAllocator allocator) {
+ if (createRecycler == null) {
+ throw new IllegalArgumentException("createRecycler could not be null!");
+ }
setOffset(offset);
this.length = length;
this.accessCounter = accessCounter;
this.priority = inMemory ? BlockPriority.MEMORY : BlockPriority.MULTI;
- if (createRecycler == null) {
- this.refCnt = RefCnt.create();
- } else {
- this.refCnt = RefCnt.create(createRecycler.apply(this));
- }
+ this.refCnt = RefCnt.create(createRecycler.apply(this));
+
this.markedAsEvicted = new AtomicBoolean(false);
this.allocator = allocator;
}
@@ -173,13 +181,19 @@ class BucketEntry implements HBaseReferenceCounted {
}
/**
- * Check whether have some RPC patch referring this block. There're two case: <br>
+ * Check whether some RPC path is referring to this block.<br/>
+ * When {@link IOEngine#usesSharedMemory()} is true (e.g. {@link ByteBufferIOEngine}), there are
+ * two cases: <br>
* 1. If current refCnt is greater than 1, there must be at least one referring RPC path; <br>
* 2. If current refCnt is equal to 1 and the markedAtEvicted is true, the it means backingMap has
* released its reference, the remaining reference can only be from RPC path. <br>
* We use this check to decide whether we can free the block area: when cached size exceed the
* acceptable size, our eviction policy will choose those stale blocks without any RPC reference
- * and the RPC referred block will be excluded.
+ * and the RPC referred block will be excluded. <br/>
+ * <br/>
+ * When {@link IOEngine#usesSharedMemory()} is false (e.g. {@link FileIOEngine}),
+ * {@link BucketEntry#refCnt} is always 1 until it is evicted from {@link BucketCache#backingMap},
+ * so {@link BucketEntry#isRpcRef()} always returns false.
* @return true to indicate there're some RPC referring the block.
*/
boolean isRpcRef() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
index f3d63d4..b2a00f1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
@@ -23,7 +23,10 @@ package org.apache.hadoop.hbase.io.hfile.bucket;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.BlockType;
@@ -127,7 +130,8 @@ final class BucketProtoUtils {
}
static ConcurrentHashMap<BlockCacheKey, BucketEntry> fromPB(
- Map<Integer, String> deserializers, BucketCacheProtos.BackingMap backingMap)
+ Map<Integer, String> deserializers, BucketCacheProtos.BackingMap backingMap,
+ Function<BucketEntry, Recycler> createRecycler)
throws IOException {
ConcurrentHashMap<BlockCacheKey, BucketEntry> result = new ConcurrentHashMap<>();
for (BucketCacheProtos.BackingMapEntry entry : backingMap.getEntryList()) {
@@ -135,11 +139,14 @@ final class BucketProtoUtils {
BlockCacheKey key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getOffset(),
protoKey.getPrimaryReplicaBlock(), fromPb(protoKey.getBlockType()));
BucketCacheProtos.BucketEntry protoValue = entry.getValue();
+ // TODO: We use ByteBuffAllocator.HEAP here because we cannot elegantly obtain the
+ // ByteBuffAllocator that is created by RpcServer.
BucketEntry value = new BucketEntry(
protoValue.getOffset(),
protoValue.getLength(),
protoValue.getAccessCounter(),
- protoValue.getPriority() == BucketCacheProtos.BlockPriority.memory);
+ protoValue.getPriority() == BucketCacheProtos.BlockPriority.memory, createRecycler,
+ ByteBuffAllocator.HEAP);
// This is the deserializer that we stored
int oldIndex = protoValue.getDeserialiserIndex();
String deserializerClass = deserializers.get(oldIndex);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index 1b10fbd..308d96c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -113,6 +113,8 @@ public class TestBucketCache {
String ioEngineName = "offheap";
String persistencePath = null;
+ private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility();
+
private static class MockedBucketCache extends BucketCache {
public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
@@ -145,6 +147,18 @@ public class TestBucketCache {
}
/**
+ * Test utility that creates the data test directory and returns its path.
+ *
+ * @return the path of the created directory
+ * @throws IOException if the file system operation fails
+ */
+ private Path createAndGetTestDir() throws IOException {
+ final Path testDir = HBASE_TESTING_UTILITY.getDataTestDir();
+ HBASE_TESTING_UTILITY.getTestFileSystem().mkdirs(testDir);
+ return testDir;
+ }
+
+ /**
* Return a random element from {@code a}.
*/
private static <T> T randFrom(List<T> a) {
@@ -444,7 +458,10 @@ public class TestBucketCache {
// This number is picked because it produces negative output if the values isn't ensured to be
// positive. See HBASE-18757 for more information.
long testValue = 549888460800L;
- BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10L, true);
+ BucketEntry bucketEntry =
+ new BucketEntry(testValue, 10, 10L, true, (entry) -> {
+ return ByteBuffAllocator.NONE;
+ }, ByteBuffAllocator.HEAP);
assertEquals(testValue, bucketEntry.offset());
}
@@ -579,4 +596,57 @@ public class TestBucketCache {
}
Assert.assertEquals(0, allocator.getUsedSize());
}
+
+ /**
+ * This test is for HBASE-26295, {@link BucketEntry} which is restored from a persistence file
+ * could not be freed even if corresponding {@link HFileBlock} is evicted from
+ * {@link BucketCache}.
+ */
+ @Test
+ public void testFreeBucketEntryRestoredFromFile() throws Exception {
+ try {
+ final Path dataTestDir = createAndGetTestDir();
+
+ String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
+ String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";
+
+ BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
+ constructedBlockSizes, writeThreads, writerQLen, persistencePath);
+ long usedByteSize = bucketCache.getAllocator().getUsedSize();
+ assertEquals(0, usedByteSize);
+
+ HFileBlockPair[] hfileBlockPairs =
+ CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
+ // Add blocks
+ for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
+ bucketCache.cacheBlock(hfileBlockPair.getBlockName(), hfileBlockPair.getBlock());
+ }
+
+ for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
+ cacheAndWaitUntilFlushedToBucket(bucketCache, hfileBlockPair.getBlockName(),
+ hfileBlockPair.getBlock());
+ }
+ usedByteSize = bucketCache.getAllocator().getUsedSize();
+ assertNotEquals(0, usedByteSize);
+ // persist cache to file
+ bucketCache.shutdown();
+ assertTrue(new File(persistencePath).exists());
+
+ // restore cache from file
+ bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
+ constructedBlockSizes, writeThreads, writerQLen, persistencePath);
+ assertFalse(new File(persistencePath).exists());
+ assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
+
+ for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
+ BlockCacheKey blockCacheKey = hfileBlockPair.getBlockName();
+ bucketCache.evictBlock(blockCacheKey);
+ }
+ assertEquals(0, bucketCache.getAllocator().getUsedSize());
+ assertEquals(0, bucketCache.backingMap.size());
+ } finally {
+ HBASE_TESTING_UTILITY.cleanupTestDir();
+ }
+ }
+
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
index 97a5283..677d602 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
@@ -49,7 +49,9 @@ public class TestByteBufferIOEngine {
private long off;
MockBucketEntry(long offset, int length, ByteBuffAllocator allocator) {
- super(offset & 0xFF00, length, 0, false, null, allocator);
+ super(offset & 0xFF00, length, 0, false, (entry) -> {
+ return ByteBuffAllocator.NONE;
+ }, allocator);
this.off = offset;
}