Posted to commits@hbase.apache.org by op...@apache.org on 2019/10/17 01:44:56 UTC

[hbase] branch master updated: HBASE-23107 Avoid temp byte array creation when doing cacheDataOnWrite (#678)

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 0043dfe  HBASE-23107 Avoid temp byte array creation when doing cacheDataOnWrite (#678)
0043dfe is described below

commit 0043dfebc5e43705818071c3de062211943829f1
Author: chenxu14 <47...@users.noreply.github.com>
AuthorDate: Thu Oct 17 09:44:40 2019 +0800

    HBASE-23107 Avoid temp byte array creation when doing cacheDataOnWrite (#678)
    
    Signed-off-by: huzheng <op...@gmail.com>
    Signed-off-by: stack <st...@apache.org>
---
 .../hadoop/hbase/io/ByteArrayOutputStream.java     |  5 ++
 .../apache/hadoop/hbase/io/hfile/HFileBlock.java   | 54 ++++++++++++++++------
 .../hadoop/hbase/io/hfile/HFileWriterImpl.java     | 14 ++++--
 .../hadoop/hbase/io/hfile/TestHFileBlock.java      | 26 +++++------
 4 files changed, 64 insertions(+), 35 deletions(-)
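
For context: the temp byte array the subject line refers to is the byte[] that
toByteArray() used to create each time a just-written block was handed to the
block cache. After this patch the writer copies the accumulated bytes straight
into a ByteBuff obtained from a ByteBuffAllocator. A simplified sketch of the
before/after shape (names such as allocator and baosInMemory stand in for the
Writer's fields; this is illustrative, not the exact HBase code):

    // Before: one extra heap copy per block cached on write.
    byte[] tmp = baosInMemory.toByteArray();
    ByteBuffer uncompressed = ByteBuffer.wrap(tmp);

    // After: fill an allocator-provided ByteBuff (possibly pooled/off-heap)
    // directly from the stream, so no intermediate byte[] is created.
    ByteBuff buff = allocator.allocate(baosInMemory.size());
    baosInMemory.toByteBuff(buff);
    buff.rewind();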

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
index 38c986a..77dd3b8 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
@@ -22,6 +22,7 @@ import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -112,6 +113,10 @@ public class ByteArrayOutputStream extends OutputStream implements ByteBufferWri
     return Arrays.copyOf(buf, pos);
   }
 
+  public void toByteBuff(ByteBuff buff) {
+    buff.put(buf, 0, pos);
+  }
+
   /**
    * @return the underlying array where the data gets accumulated
    */
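
The new toByteBuff(ByteBuff) helper copies everything accumulated so far,
i.e. [0, pos), into a caller-supplied ByteBuff without going through
toByteArray(). A minimal usage sketch (drainToByteBuff is a hypothetical
caller, not part of the patch):

    static ByteBuff drainToByteBuff(ByteArrayOutputStream baos, ByteBuffAllocator alloc) {
      // Size the target to exactly what has been written, then copy it over.
      ByteBuff target = alloc.allocate(baos.size());
      baos.toByteBuff(target);
      target.rewind();
      return target;
    }
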
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 9b11523..a723e52 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -842,6 +842,8 @@ public class HFileBlock implements Cacheable {
     /** Meta data that holds information about the hfileblock**/
     private HFileContext fileContext;
 
+    private final ByteBuffAllocator allocator;
+
     @Override
     public void beforeShipped() {
       if (getEncodingState() != null) {
@@ -856,12 +858,19 @@ public class HFileBlock implements Cacheable {
     /**
      * @param dataBlockEncoder data block encoding algorithm to use
      */
+    @VisibleForTesting
     public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
+      this(dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP);
+    }
+
+    public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext,
+        ByteBuffAllocator allocator) {
       if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
         throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
             " Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " +
             fileContext.getBytesPerChecksum());
       }
+      this.allocator = allocator;
       this.dataBlockEncoder = dataBlockEncoder != null?
           dataBlockEncoder: NoOpDataBlockEncoder.INSTANCE;
       this.dataBlockEncodingCtx = this.dataBlockEncoder.
@@ -1012,6 +1021,18 @@ public class HFileBlock implements Cacheable {
       Bytes.putInt(dest, offset, onDiskDataSize);
     }
 
+    private void putHeader(ByteBuff buff, int onDiskSize,
+        int uncompressedSize, int onDiskDataSize) {
+      buff.rewind();
+      blockType.write(buff);
+      buff.putInt(onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
+      buff.putInt(uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
+      buff.putLong(prevOffset);
+      buff.put(fileContext.getChecksumType().getCode());
+      buff.putInt(fileContext.getBytesPerChecksum());
+      buff.putInt(onDiskDataSize);
+    }
+
     private void putHeader(ByteArrayOutputStream dest, int onDiskSize,
         int uncompressedSize, int onDiskDataSize) {
       putHeader(dest.getBuffer(),0, onDiskSize, uncompressedSize, onDiskDataSize);
@@ -1170,19 +1191,19 @@ public class HFileBlock implements Cacheable {
      * cache. Can be called in the "writing" state or the "block ready" state.
      * Returns only the header and data, does not include checksum data.
      *
-     * @return Returns a copy of uncompressed block bytes for caching on write
+     * @return Returns an uncompressed block ByteBuff for caching on write
      */
-    @VisibleForTesting
-    ByteBuffer cloneUncompressedBufferWithHeader() {
+    ByteBuff cloneUncompressedBufferWithHeader() {
       expectState(State.BLOCK_READY);
-      byte[] uncompressedBlockBytesWithHeader = baosInMemory.toByteArray();
+      ByteBuff bytebuff = allocator.allocate(baosInMemory.size());
+      baosInMemory.toByteBuff(bytebuff);
       int numBytes = (int) ChecksumUtil.numBytes(
           onDiskBlockBytesWithHeader.size(),
           fileContext.getBytesPerChecksum());
-      putHeader(uncompressedBlockBytesWithHeader, 0,
-        onDiskBlockBytesWithHeader.size() + numBytes,
-        baosInMemory.size(), onDiskBlockBytesWithHeader.size());
-      return ByteBuffer.wrap(uncompressedBlockBytesWithHeader);
+      putHeader(bytebuff, onDiskBlockBytesWithHeader.size() + numBytes,
+          baosInMemory.size(), onDiskBlockBytesWithHeader.size());
+      bytebuff.rewind();
+      return bytebuff;
     }
 
     /**
@@ -1191,9 +1212,12 @@ public class HFileBlock implements Cacheable {
      * include checksum data.
      * @return Returns a copy of block bytes for caching on write
      */
-    private ByteBuffer cloneOnDiskBufferWithHeader() {
+    private ByteBuff cloneOnDiskBufferWithHeader() {
       expectState(State.BLOCK_READY);
-      return ByteBuffer.wrap(onDiskBlockBytesWithHeader.toByteArray());
+      ByteBuff bytebuff = allocator.allocate(onDiskBlockBytesWithHeader.size());
+      onDiskBlockBytesWithHeader.toByteBuff(bytebuff);
+      bytebuff.rewind();
+      return bytebuff;
     }
 
     private void expectState(State expectedState) {
@@ -1246,24 +1270,24 @@ public class HFileBlock implements Cacheable {
                                 .build();
       // Build the HFileBlock.
       HFileBlockBuilder builder = new HFileBlockBuilder();
-      ByteBuffer buffer;
+      ByteBuff buff;
       if (cacheConf.shouldCacheCompressed(blockType.getCategory())) {
-        buffer = cloneOnDiskBufferWithHeader();
+        buff = cloneOnDiskBufferWithHeader();
       } else {
-        buffer = cloneUncompressedBufferWithHeader();
+        buff = cloneUncompressedBufferWithHeader();
       }
       return builder.withBlockType(blockType)
           .withOnDiskSizeWithoutHeader(getOnDiskSizeWithoutHeader())
           .withUncompressedSizeWithoutHeader(getUncompressedSizeWithoutHeader())
           .withPrevBlockOffset(prevOffset)
-          .withByteBuff(ByteBuff.wrap(buffer))
+          .withByteBuff(buff)
           .withFillHeader(FILL_HEADER)
           .withOffset(startOffset)
           .withNextBlockOnDiskSize(UNSET)
           .withOnDiskDataSizeWithHeader(onDiskBlockBytesWithHeader.size() + onDiskChecksum.length)
           .withHFileContext(newContext)
           .withByteBuffAllocator(cacheConf.getByteBuffAllocator())
-          .withShared(!buffer.hasArray())
+          .withShared(!buff.hasArray())
           .build();
     }
   }
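
The new putHeader(ByteBuff, ...) overload writes the same 33-byte block header
as the existing byte[] variant, just against a ByteBuff. For readers unfamiliar
with the layout, a hypothetical decoder in the same field order (purely
illustrative; the local names are descriptive and not HBase API):

    static void dumpHeader(ByteBuff b) {
      b.rewind();
      byte[] magic = new byte[8];                    // 8-byte block type magic, e.g. "DATABLK*"
      b.get(magic, 0, magic.length);
      int onDiskSizeWithoutHeader = b.getInt();
      int uncompressedSizeWithoutHeader = b.getInt();
      long prevBlockOffset = b.getLong();
      byte checksumType = b.get();
      int bytesPerChecksum = b.getInt();
      int onDiskDataSizeWithHeader = b.getInt();
      // 8 + 4 + 4 + 8 + 1 + 4 + 4 = 33 bytes = HConstants.HFILEBLOCK_HEADER_SIZE
    }
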
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
index fa5f1f1..93cca8b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
@@ -294,9 +294,8 @@ public class HFileWriterImpl implements HFile.Writer {
     if (blockWriter != null) {
       throw new IllegalStateException("finishInit called twice");
     }
-
-    blockWriter = new HFileBlock.Writer(blockEncoder, hFileContext);
-
+    blockWriter = new HFileBlock.Writer(blockEncoder, hFileContext,
+        cacheConf.getByteBuffAllocator());
     // Data block index writer
     boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
     dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
@@ -546,8 +545,13 @@ public class HFileWriterImpl implements HFile.Writer {
   private void doCacheOnWrite(long offset) {
     cacheConf.getBlockCache().ifPresent(cache -> {
       HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
-      cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
-          cacheFormatBlock);
+      try {
+        cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
+            cacheFormatBlock);
+      } finally {
+        // refCnt will auto increase when block add to Cache, see RAMCache#putIfAbsent
+        cacheFormatBlock.release();
+      }
     });
   }
 
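The try/finally above exists because the cached block is now backed by a
reference-counted, possibly pooled ByteBuff: the cache takes its own reference
when the block is inserted (per the in-line comment, RAMCache#putIfAbsent on
the bucket cache path), so the writer must drop its local reference afterwards
or pooled buffers would leak. The general shape of that contract, annotated
(cache and key are placeholders):

    HFileBlock block = blockWriter.getBlockForCaching(cacheConf); // writer owns one reference
    try {
      cache.cacheBlock(key, block);   // cache retains its own reference on insert
    } finally {
      block.release();                // always drop the writer's reference, cached or not
    }
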
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
index d0e98fd..006415c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
@@ -452,26 +452,21 @@ public class TestHFileBlock {
           HFileBlock.Writer hbw = new HFileBlock.Writer(dataBlockEncoder, meta);
           long totalSize = 0;
           final List<Integer> encodedSizes = new ArrayList<>();
-          final List<ByteBuffer> encodedBlocks = new ArrayList<>();
+          final List<ByteBuff> encodedBlocks = new ArrayList<>();
           for (int blockId = 0; blockId < numBlocks; ++blockId) {
             hbw.startWriting(BlockType.DATA);
             writeTestKeyValues(hbw, blockId, includesMemstoreTS, includesTag);
             hbw.writeHeaderAndData(os);
             int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE;
-            byte[] encodedResultWithHeader = hbw.cloneUncompressedBufferWithHeader().array();
-            final int encodedSize = encodedResultWithHeader.length - headerLen;
+            ByteBuff encodedResultWithHeader = hbw.cloneUncompressedBufferWithHeader();
+            final int encodedSize = encodedResultWithHeader.limit() - headerLen;
             if (encoding != DataBlockEncoding.NONE) {
               // We need to account for the two-byte encoding algorithm ID that
               // comes after the 24-byte block header but before encoded KVs.
               headerLen += DataBlockEncoding.ID_SIZE;
             }
-            byte[] encodedDataSection =
-                new byte[encodedResultWithHeader.length - headerLen];
-            System.arraycopy(encodedResultWithHeader, headerLen,
-                encodedDataSection, 0, encodedDataSection.length);
-            final ByteBuffer encodedBuf =
-                ByteBuffer.wrap(encodedDataSection);
             encodedSizes.add(encodedSize);
+            ByteBuff encodedBuf = encodedResultWithHeader.position(headerLen).slice();
             encodedBlocks.add(encodedBuf);
             totalSize += hbw.getOnDiskSizeWithHeader();
           }
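
The test no longer arraycopies the encoded data section into a fresh byte[];
positioning the ByteBuff past the header and slicing yields a view over the
same memory. A short sketch of that idiom (headerLen and encodedResultWithHeader
as in the test above):

    // position(int) returns the ByteBuff itself, so the calls chain; slice()
    // produces a zero-copy view starting at the current position, skipping the
    // block header (plus the 2-byte encoding id when an encoder is in use).
    ByteBuff encodedData = encodedResultWithHeader.position(headerLen).slice();
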
@@ -521,12 +516,11 @@ public class TestHFileBlock {
               actualBuffer = actualBuffer.slice();
             }
 
-            ByteBuffer expectedBuffer = encodedBlocks.get(blockId);
-            expectedBuffer.rewind();
+            ByteBuff expectedBuff = encodedBlocks.get(blockId);
+            expectedBuff.rewind();
 
             // test if content matches, produce nice message
-            assertBuffersEqual(new SingleByteBuff(expectedBuffer), actualBuffer, algo, encoding,
-                pread);
+            assertBuffersEqual(expectedBuff, actualBuffer, algo, encoding, pread);
 
             // test serialized blocks
             for (boolean reuseBuffer : new boolean[] { false, true }) {
@@ -882,8 +876,10 @@ public class TestHFileBlock {
       hbw.writeHeaderAndData(os);
       totalSize += hbw.getOnDiskSizeWithHeader();
 
-      if (cacheOnWrite)
-        expectedContents.add(hbw.cloneUncompressedBufferWithHeader());
+      if (cacheOnWrite) {
+        ByteBuff buff = hbw.cloneUncompressedBufferWithHeader();
+        expectedContents.add(buff.asSubByteBuffer(buff.capacity()));
+      }
 
       if (detailedLogging) {
         LOG.info("Written block #" + i + " of type " + bt