You are viewing a plain text version of this content. The canonical link was present in the original HTML rendering but is not preserved in this plain-text extraction.
Posted to commits@hbase.apache.org by zh...@apache.org on 2022/02/18 14:26:45 UTC

[hbase] branch master updated: HBASE-26659 The ByteBuffer of metadata in RAMQueueEntry in BucketCache could be reused. (#4026)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new e100198  HBASE-26659 The ByteBuffer of metadata in RAMQueueEntry in BucketCache could be reused. (#4026)
e100198 is described below

commit e100198ab5c0d55fee65710d599229fe91622dde
Author: Yutong Xiao <yu...@gmail.com>
AuthorDate: Fri Feb 18 22:24:51 2022 +0800

    HBASE-26659 The ByteBuffer of metadata in RAMQueueEntry in BucketCache could be reused. (#4026)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../apache/hadoop/hbase/io/hfile/HFileBlock.java   |  6 +++---
 .../hadoop/hbase/io/hfile/bucket/BucketCache.java  | 22 ++++++++++++++--------
 .../hbase/io/hfile/bucket/TestBucketCache.java     |  3 ++-
 .../io/hfile/bucket/TestBucketCacheRefCnt.java     |  4 ++--
 .../io/hfile/bucket/TestBucketWriterThread.java    |  9 ++++++---
 5 files changed, 27 insertions(+), 17 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index fd6cea1..7c7fa4e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -235,7 +235,8 @@ public class HFileBlock implements Cacheable {
    * (This EXTRA info came in with original commit of the bucketcache, HBASE-7404. It was
    * formerly known as EXTRA_SERIALIZATION_SPACE).
    */
-  static final int BLOCK_METADATA_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
+  public static final int BLOCK_METADATA_SPACE =
+    Bytes.SIZEOF_BYTE + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
 
   /**
    * Each checksum value is an integer that can be stored in 4 bytes.
@@ -1883,8 +1884,7 @@ public class HFileBlock implements Cacheable {
   /**
    * For use by bucketcache. This exposes internals.
    */
-  public ByteBuffer getMetaData() {
-    ByteBuffer bb = ByteBuffer.allocate(BLOCK_METADATA_SPACE);
+  public ByteBuffer getMetaData(ByteBuffer bb) {
     bb = addMetaData(bb, true);
     bb.flip();
     return bb;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 25859d4..e056454 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -945,6 +945,7 @@ public class BucketCache implements BlockCache, HeapSize {
   class WriterThread extends Thread {
     private final BlockingQueue<RAMQueueEntry> inputQueue;
     private volatile boolean writerEnabled = true;
+    private final ByteBuffer metaBuff = ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE);
 
     WriterThread(BlockingQueue<RAMQueueEntry> queue) {
       super("BucketCacheWriterThread");
@@ -970,7 +971,7 @@ public class BucketCache implements BlockCache, HeapSize {
                 break;
               }
             }
-            doDrain(entries);
+            doDrain(entries, metaBuff);
           } catch (Exception ioe) {
             LOG.error("WriterThread encountered error", ioe);
           }
@@ -1046,7 +1047,7 @@ public class BucketCache implements BlockCache, HeapSize {
    * @param entries Presumes list passed in here will be processed by this invocation only. No
    *          interference expected.
    */
-  void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
+  void doDrain(final List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
     if (entries.isEmpty()) {
       return;
     }
@@ -1074,9 +1075,14 @@ public class BucketCache implements BlockCache, HeapSize {
         if (ramCache.containsKey(cacheKey)) {
           blocksByHFile.add(cacheKey);
         }
-
+        // Reset the position for reuse.
+        // It should be guaranteed that the data in the metaBuff has been transferred to the
+        // ioEngine safely. Otherwise, this reuse is problematic. Fortunately, the data is already
+        // transferred with our current IOEngines. Should take care, when we have new kinds of
+        // IOEngine in the future.
+        metaBuff.clear();
         BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize,
-          this::createRecycler);
+          this::createRecycler, metaBuff);
         // Successfully added. Up index and add bucketEntry. Clear io exceptions.
         bucketEntries[index] = bucketEntry;
         if (ioErrorStartTime > 0) {
@@ -1504,8 +1510,8 @@ public class BucketCache implements BlockCache, HeapSize {
     }
 
     public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc,
-        final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler)
-        throws IOException {
+        final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler,
+        ByteBuffer metaBuff) throws IOException {
       int len = data.getSerializedLength();
       // This cacheable thing can't be serialized
       if (len == 0) {
@@ -1522,9 +1528,9 @@ public class BucketCache implements BlockCache, HeapSize {
           // If an instance of HFileBlock, save on some allocations.
           HFileBlock block = (HFileBlock) data;
           ByteBuff sliceBuf = block.getBufferReadOnly();
-          ByteBuffer metadata = block.getMetaData();
+          block.getMetaData(metaBuff);
           ioEngine.write(sliceBuf, offset);
-          ioEngine.write(metadata, offset + len - metadata.limit());
+          ioEngine.write(metaBuff, offset + len - metaBuff.limit());
         } else {
           // Only used for testing.
           ByteBuffer bb = ByteBuffer.allocate(len);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index 22b48dc..45120a7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -686,7 +686,8 @@ public class TestBucketCache {
 
     Assert.assertEquals(0, allocator.getUsedSize());
     try {
-      re.writeToCache(ioEngine, allocator, null, null);
+      re.writeToCache(ioEngine, allocator, null, null,
+        ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE));
       Assert.fail();
     } catch (Exception e) {
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
index fd083fd..e9c6e5e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCacheRefCnt.java
@@ -627,8 +627,8 @@ public class TestBucketCacheRefCnt {
     }
 
     @Override
-    void doDrain(List<RAMQueueEntry> entries) throws InterruptedException {
-      super.doDrain(entries);
+    void doDrain(List<RAMQueueEntry> entries, ByteBuffer metaBuff) throws InterruptedException {
+      super.doDrain(entries, metaBuff);
       if (entries.size() > 0) {
         /**
          * Caching Block completed,release {@link #GET_BLOCK_THREAD_NAME} and
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
index 0ba7dea..1d4f1f3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -140,7 +142,7 @@ public class TestBucketWriterThread {
     RAMQueueEntry rqe = q.remove();
     RAMQueueEntry spiedRqe = Mockito.spy(rqe);
     Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).
-        writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+        writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
     this.q.add(spiedRqe);
     doDrainOfOneEntry(bc, wt, q);
     // Cache disabled when ioes w/o ever healing.
@@ -162,7 +164,8 @@ public class TestBucketWriterThread {
     BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class);
     Mockito.doThrow(cfe).
       doReturn(mockedBucketEntry).
-        when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+        when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(),
+        Mockito.any());
     this.q.add(spiedRqe);
     doDrainOfOneEntry(bc, wt, q);
   }
@@ -171,7 +174,7 @@ public class TestBucketWriterThread {
       final BlockingQueue<RAMQueueEntry> q)
   throws InterruptedException {
     List<RAMQueueEntry> rqes = BucketCache.getRAMQueueEntries(q, new ArrayList<>(1));
-    bc.doDrain(rqes);
+    bc.doDrain(rqes, ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE));
     assertTrue(q.isEmpty());
     assertTrue(bc.ramCache.isEmpty());
     assertEquals(0, bc.heapSize());