You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2022/02/18 14:26:45 UTC
[hbase] branch master updated: HBASE-26659 The ByteBuffer of metadata in RAMQueueEntry in BucketCache could be reused. (#4026)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new e100198 HBASE-26659 The ByteBuffer of metadata in RAMQueueEntry in BucketCache could be reused. (#4026)
e100198 is described below
commit e100198ab5c0d55fee65710d599229fe91622dde
Author: Yutong Xiao <yu...@gmail.com>
AuthorDate: Fri Feb 18 22:24:51 2022 +0800
HBASE-26659 The ByteBuffer of metadata in RAMQueueEntry in BucketCache could be reused. (#4026)
Signed-off-by: Duo Zhang <zh...@apache.org>
---
.../apache/hadoop/hbase/io/hfile/HFileBlock.java | 6 +++---
.../hadoop/hbase/io/hfile/bucket/BucketCache.java | 22 ++++++++++++++--------
.../hbase/io/hfile/bucket/TestBucketCache.java | 3 ++-
.../io/hfile/bucket/TestBucketCacheRefCnt.java | 4 ++--
.../io/hfile/bucket/TestBucketWriterThread.java | 9 ++++++---
5 files changed, 27 insertions(+), 17 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index fd6cea1..7c7fa4e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -235,7 +235,8 @@ public class HFileBlock implements Cacheable {
* (This EXTRA info came in with original commit of the bucketcache, HBASE-7404. It was
* formerly known as EXTRA_SERIALIZATION_SPACE).
*/
- static final int BLOCK_METADATA_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
+ public static final int BLOCK_METADATA_SPACE =
+ Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
/**
* Each checksum value is an integer that can be stored in 4 bytes.
@@ -1883,8 +1884,7 @@ public class HFileBlock implements Cacheable {
/**
* For use by bucketcache. This exposes internals.
*/
- public ByteBuffer getMetaData() {
- ByteBuffer bb = ByteBuffer.allocate(BLOCK_METADATA_SPACE);
+ public ByteBuffer getMetaData(ByteBuffer bb) {
bb = addMetaData(bb, true);
bb.flip();
return bb;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 25859d4..e056454 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -945,6 +945,7 @@ public class BucketCache implements BlockCache, HeapSize {
class WriterThread extends Thread {
private final BlockingQueue<RAMQueueEntry> inputQueue;
private volatile boolean writerEnabled = true;
+ private final ByteBuffer metaBuff = ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE);
WriterThread(BlockingQueue<RAMQueueEntry> queue) {
super("BucketCacheWriterThread");
@@ -970,7 +971,7 @@ public class BucketCache implements BlockCache, HeapSize {
break;
}
}
- doDrain(entries);
+ doDrain(entries, metaBuff);
} catch (Exception ioe) {
LOG.error("WriterThread encountered error", ioe);
}
@@ -1046,7 +1047,7 @@ public class BucketCache implements BlockCache, HeapSize {
* @param entries Presumes list passed in here will be processed by this invocation only. No
* interference expected.
*/
- void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
+ void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
if (entries.isEmpty()) {
return;
}
@@ -1074,9 +1075,14 @@ public class BucketCache implements BlockCache, HeapSize {
if (ramCache.containsKey(cacheKey)) {
blocksByHFile.add(cacheKey);
}
-
+ // Reset the position for reuse.
+ // Reuse is only safe if the data in the metaBuff has already been transferred to the
+ // ioEngine; otherwise, this reuse is problematic. Fortunately, our current IOEngines
+ // have already transferred the data by this point. Take care when adding new kinds of
+ // IOEngine in the future.
+ metaBuff.clear();
BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize,
- this::createRecycler);
+ this::createRecycler, metaBuff);
// Successfully added. Up index and add bucketEntry. Clear io exceptions.
bucketEntries[index] = bucketEntry;
if (ioErrorStartTime > 0) {
@@ -1504,8 +1510,8 @@ public class BucketCache implements BlockCache, HeapSize {
}
public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc,
- final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler)
- throws IOException {
+ final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler,
+ ByteBuffer metaBuff) throws IOException {
int len = data.getSerializedLength();
// This cacheable thing can't be serialized
if (len == 0) {
@@ -1522,9 +1528,9 @@ public class BucketCache implements BlockCache, HeapSize {
// If an instance of HFileBlock, save on some allocations.
HFileBlock block = (HFileBlock) data;
ByteBuff sliceBuf = block.getBufferReadOnly();
- ByteBuffer metadata = block.getMetaData();
+ block.getMetaData(metaBuff);
ioEngine.write(sliceBuf, offset);
- ioEngine.write(metadata, offset + len - metadata.limit());
+ ioEngine.write(metaBuff, offset + len - metaBuff.limit());
} else {
// Only used for testing.
ByteBuffer bb = ByteBuffer.allocate(len);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index 22b48dc..45120a7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -686,7 +686,8 @@ public class TestBucketCache {
Assert.assertEquals(0, allocator.getUsedSize());
try {
- re.writeToCache(ioEngine, allocator, null, null);
+ re.writeToCache(ioEngine, allocator, null, null,
+ ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE));
Assert.fail();
} catch (Exception e) {
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
index fd083fd..e9c6e5e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
@@ -627,8 +627,8 @@ public class TestBucketCacheRefCnt {
}
@Override
- void doDrain(List<RAMQueueEntry> entries) throws InterruptedException {
- super.doDrain(entries);
+ void doDrain(List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
+ super.doDrain(entries, metaBuff);
if (entries.size() > 0) {
/**
* Caching Block completed,release {@link #GET_BLOCK_THREAD_NAME} and
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
index 0ba7dea..1d4f1f3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -140,7 +142,7 @@ public class TestBucketWriterThread {
RAMQueueEntry rqe = q.remove();
RAMQueueEntry spiedRqe = Mockito.spy(rqe);
Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).
- writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+ writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
this.q.add(spiedRqe);
doDrainOfOneEntry(bc, wt, q);
// Cache disabled when ioes w/o ever healing.
@@ -162,7 +164,8 @@ public class TestBucketWriterThread {
BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class);
Mockito.doThrow(cfe).
doReturn(mockedBucketEntry).
- when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+ when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(),
+ Mockito.any());
this.q.add(spiedRqe);
doDrainOfOneEntry(bc, wt, q);
}
@@ -171,7 +174,7 @@ public class TestBucketWriterThread {
final BlockingQueue<RAMQueueEntry> q)
throws InterruptedException {
List<RAMQueueEntry> rqes = BucketCache.getRAMQueueEntries(q, new ArrayList<>(1));
- bc.doDrain(rqes);
+ bc.doDrain(rqes, ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE));
assertTrue(q.isEmpty());
assertTrue(bc.ramCache.isEmpty());
assertEquals(0, bc.heapSize());