Posted to commits@hbase.apache.org by op...@apache.org on 2019/10/17 01:44:56 UTC
[hbase] branch master updated: HBASE-23107 Avoid temp byte array creation when doing cacheDataOnWrite (#678)
This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 0043dfe HBASE-23107 Avoid temp byte array creation when doing cacheDataOnWrite (#678)
0043dfe is described below
commit 0043dfebc5e43705818071c3de062211943829f1
Author: chenxu14 <47...@users.noreply.github.com>
AuthorDate: Thu Oct 17 09:44:40 2019 +0800
HBASE-23107 Avoid temp byte array creation when doing cacheDataOnWrite (#678)
Signed-off-by: huzheng <op...@gmail.com>
Signed-off-by: stack <st...@apache.org>
---
.../hadoop/hbase/io/ByteArrayOutputStream.java | 5 ++
.../apache/hadoop/hbase/io/hfile/HFileBlock.java | 54 ++++++++++++++++------
.../hadoop/hbase/io/hfile/HFileWriterImpl.java | 14 ++++--
.../hadoop/hbase/io/hfile/TestHFileBlock.java | 26 +++++------
4 files changed, 64 insertions(+), 35 deletions(-)
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
index 38c986a..77dd3b8 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
@@ -22,6 +22,7 @@ import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
@@ -112,6 +113,10 @@ public class ByteArrayOutputStream extends OutputStream implements ByteBufferWri
return Arrays.copyOf(buf, pos);
}
+ public void toByteBuff(ByteBuff buff) {
+ buff.put(buf, 0, pos);
+ }
+
/**
* @return the underlying array where the data gets accumulated
*/
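
A minimal sketch, not part of the commit, of how the new toByteBuff(ByteBuff) helper can be used: where toByteArray() materializes a temporary byte[], toByteBuff copies the accumulated bytes straight into a caller-supplied (possibly pooled) ByteBuff. It assumes only the hbase-common classes used by this patch (ByteArrayOutputStream, ByteBuffAllocator, ByteBuff); the class name below is a hypothetical stand-in.

import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.nio.ByteBuff;

public class ToByteBuffSketch {
  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    baos.write(new byte[] { 1, 2, 3, 4 }, 0, 4);

    // Old pattern: a fresh temporary byte[] is created just to hand the bytes onward.
    byte[] temp = baos.toByteArray();

    // New pattern: copy straight into an allocator-provided ByteBuff; a pooled
    // allocator could hand back off-heap memory here instead of a heap buffer.
    ByteBuff buff = ByteBuffAllocator.HEAP.allocate(baos.size());
    baos.toByteBuff(buff); // internally buff.put(getBuffer(), 0, pos)
    buff.rewind();         // readers start at offset 0

    System.out.println("temp.length=" + temp.length + ", buff.limit=" + buff.limit());
    buff.release();
  }
}
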
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 9b11523..a723e52 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -842,6 +842,8 @@ public class HFileBlock implements Cacheable {
/** Meta data that holds information about the hfileblock**/
private HFileContext fileContext;
+ private final ByteBuffAllocator allocator;
+
@Override
public void beforeShipped() {
if (getEncodingState() != null) {
@@ -856,12 +858,19 @@ public class HFileBlock implements Cacheable {
/**
* @param dataBlockEncoder data block encoding algorithm to use
*/
+ @VisibleForTesting
public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext) {
+ this(dataBlockEncoder, fileContext, ByteBuffAllocator.HEAP);
+ }
+
+ public Writer(HFileDataBlockEncoder dataBlockEncoder, HFileContext fileContext,
+ ByteBuffAllocator allocator) {
if (fileContext.getBytesPerChecksum() < HConstants.HFILEBLOCK_HEADER_SIZE) {
throw new RuntimeException("Unsupported value of bytesPerChecksum. " +
" Minimum is " + HConstants.HFILEBLOCK_HEADER_SIZE + " but the configured value is " +
fileContext.getBytesPerChecksum());
}
+ this.allocator = allocator;
this.dataBlockEncoder = dataBlockEncoder != null?
dataBlockEncoder: NoOpDataBlockEncoder.INSTANCE;
this.dataBlockEncodingCtx = this.dataBlockEncoder.
@@ -1012,6 +1021,18 @@ public class HFileBlock implements Cacheable {
Bytes.putInt(dest, offset, onDiskDataSize);
}
+ private void putHeader(ByteBuff buff, int onDiskSize,
+ int uncompressedSize, int onDiskDataSize) {
+ buff.rewind();
+ blockType.write(buff);
+ buff.putInt(onDiskSize - HConstants.HFILEBLOCK_HEADER_SIZE);
+ buff.putInt(uncompressedSize - HConstants.HFILEBLOCK_HEADER_SIZE);
+ buff.putLong(prevOffset);
+ buff.put(fileContext.getChecksumType().getCode());
+ buff.putInt(fileContext.getBytesPerChecksum());
+ buff.putInt(onDiskDataSize);
+ }
+
private void putHeader(ByteArrayOutputStream dest, int onDiskSize,
int uncompressedSize, int onDiskDataSize) {
putHeader(dest.getBuffer(),0, onDiskSize, uncompressedSize, onDiskDataSize);
@@ -1170,19 +1191,19 @@ public class HFileBlock implements Cacheable {
* cache. Can be called in the "writing" state or the "block ready" state.
* Returns only the header and data, does not include checksum data.
*
- * @return Returns a copy of uncompressed block bytes for caching on write
+ * @return Returns an uncompressed block ByteBuff for caching on write
*/
- @VisibleForTesting
- ByteBuffer cloneUncompressedBufferWithHeader() {
+ ByteBuff cloneUncompressedBufferWithHeader() {
expectState(State.BLOCK_READY);
- byte[] uncompressedBlockBytesWithHeader = baosInMemory.toByteArray();
+ ByteBuff bytebuff = allocator.allocate(baosInMemory.size());
+ baosInMemory.toByteBuff(bytebuff);
int numBytes = (int) ChecksumUtil.numBytes(
onDiskBlockBytesWithHeader.size(),
fileContext.getBytesPerChecksum());
- putHeader(uncompressedBlockBytesWithHeader, 0,
- onDiskBlockBytesWithHeader.size() + numBytes,
- baosInMemory.size(), onDiskBlockBytesWithHeader.size());
- return ByteBuffer.wrap(uncompressedBlockBytesWithHeader);
+ putHeader(bytebuff, onDiskBlockBytesWithHeader.size() + numBytes,
+ baosInMemory.size(), onDiskBlockBytesWithHeader.size());
+ bytebuff.rewind();
+ return bytebuff;
}
/**
@@ -1191,9 +1212,12 @@ public class HFileBlock implements Cacheable {
* include checksum data.
* @return Returns a copy of block bytes for caching on write
*/
- private ByteBuffer cloneOnDiskBufferWithHeader() {
+ private ByteBuff cloneOnDiskBufferWithHeader() {
expectState(State.BLOCK_READY);
- return ByteBuffer.wrap(onDiskBlockBytesWithHeader.toByteArray());
+ ByteBuff bytebuff = allocator.allocate(onDiskBlockBytesWithHeader.size());
+ onDiskBlockBytesWithHeader.toByteBuff(bytebuff);
+ bytebuff.rewind();
+ return bytebuff;
}
private void expectState(State expectedState) {
@@ -1246,24 +1270,24 @@ public class HFileBlock implements Cacheable {
.build();
// Build the HFileBlock.
HFileBlockBuilder builder = new HFileBlockBuilder();
- ByteBuffer buffer;
+ ByteBuff buff;
if (cacheConf.shouldCacheCompressed(blockType.getCategory())) {
- buffer = cloneOnDiskBufferWithHeader();
+ buff = cloneOnDiskBufferWithHeader();
} else {
- buffer = cloneUncompressedBufferWithHeader();
+ buff = cloneUncompressedBufferWithHeader();
}
return builder.withBlockType(blockType)
.withOnDiskSizeWithoutHeader(getOnDiskSizeWithoutHeader())
.withUncompressedSizeWithoutHeader(getUncompressedSizeWithoutHeader())
.withPrevBlockOffset(prevOffset)
- .withByteBuff(ByteBuff.wrap(buffer))
+ .withByteBuff(buff)
.withFillHeader(FILL_HEADER)
.withOffset(startOffset)
.withNextBlockOnDiskSize(UNSET)
.withOnDiskDataSizeWithHeader(onDiskBlockBytesWithHeader.size() + onDiskChecksum.length)
.withHFileContext(newContext)
.withByteBuffAllocator(cacheConf.getByteBuffAllocator())
- .withShared(!buffer.hasArray())
+ .withShared(!buff.hasArray())
.build();
}
}
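
A minimal sketch, not part of the commit, of why getBlockForCaching() can now derive the shared flag from buff.hasArray(): a heap buffer exposes a backing array and behaves like an exclusive on-heap copy, while a pooled or direct buffer does not and must be treated as shared, reference-counted memory. Plain java.nio.ByteBuffer stands in here for the two allocator outcomes; this is an illustration, not HBase code.

import java.nio.ByteBuffer;

public class SharedFlagSketch {
  public static void main(String[] args) {
    ByteBuffer heap = ByteBuffer.allocate(64);          // what a heap allocator hands back
    ByteBuffer direct = ByteBuffer.allocateDirect(64);  // what a pooled allocator may hand back

    // Mirrors .withShared(!buff.hasArray()) in the builder call above.
    System.out.println("heap   -> shared=" + !heap.hasArray());   // false: exclusive copy
    System.out.println("direct -> shared=" + !direct.hasArray()); // true: pooled memory
  }
}
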
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
index fa5f1f1..93cca8b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
@@ -294,9 +294,8 @@ public class HFileWriterImpl implements HFile.Writer {
if (blockWriter != null) {
throw new IllegalStateException("finishInit called twice");
}
-
- blockWriter = new HFileBlock.Writer(blockEncoder, hFileContext);
-
+ blockWriter = new HFileBlock.Writer(blockEncoder, hFileContext,
+ cacheConf.getByteBuffAllocator());
// Data block index writer
boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(blockWriter,
@@ -546,8 +545,13 @@ public class HFileWriterImpl implements HFile.Writer {
private void doCacheOnWrite(long offset) {
cacheConf.getBlockCache().ifPresent(cache -> {
HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
- cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
- cacheFormatBlock);
+ try {
+ cache.cacheBlock(new BlockCacheKey(name, offset, true, cacheFormatBlock.getBlockType()),
+ cacheFormatBlock);
+ } finally {
+ // refCnt will auto increase when the block is added to the cache, see RAMCache#putIfAbsent
+ cacheFormatBlock.release();
+ }
});
}
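
A minimal sketch, not part of the commit, of the release-after-cache pattern introduced in doCacheOnWrite above: the writer owns one reference to the block returned by getBlockForCaching(), the cache takes its own reference on insert (see RAMCache#putIfAbsent), so the writer must drop its reference in a finally block or a pooled backing buffer would leak. RefCountedBlock and Cache below are hypothetical stand-ins, not HBase classes.

import java.util.concurrent.atomic.AtomicInteger;

public class CacheThenReleaseSketch {
  static class RefCountedBlock {
    final AtomicInteger refCnt = new AtomicInteger(1); // creator holds one reference
    void retain() { refCnt.incrementAndGet(); }
    void release() {
      if (refCnt.decrementAndGet() == 0) {
        System.out.println("backing buffer returned to the pool");
      }
    }
  }

  static class Cache {
    void cacheBlock(String key, RefCountedBlock block) {
      block.retain(); // the cache keeps its own reference, as RAMCache#putIfAbsent does
    }
  }

  public static void main(String[] args) {
    Cache cache = new Cache();
    RefCountedBlock block = new RefCountedBlock();
    try {
      cache.cacheBlock("hfile@0", block);
    } finally {
      // Drop the writer's reference whether or not caching succeeded; the
      // cache's reference (if taken) keeps the block alive.
      block.release();
    }
    System.out.println("refCnt after caching = " + block.refCnt.get());
  }
}
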
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
index d0e98fd..006415c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
@@ -452,26 +452,21 @@ public class TestHFileBlock {
HFileBlock.Writer hbw = new HFileBlock.Writer(dataBlockEncoder, meta);
long totalSize = 0;
final List<Integer> encodedSizes = new ArrayList<>();
- final List<ByteBuffer> encodedBlocks = new ArrayList<>();
+ final List<ByteBuff> encodedBlocks = new ArrayList<>();
for (int blockId = 0; blockId < numBlocks; ++blockId) {
hbw.startWriting(BlockType.DATA);
writeTestKeyValues(hbw, blockId, includesMemstoreTS, includesTag);
hbw.writeHeaderAndData(os);
int headerLen = HConstants.HFILEBLOCK_HEADER_SIZE;
- byte[] encodedResultWithHeader = hbw.cloneUncompressedBufferWithHeader().array();
- final int encodedSize = encodedResultWithHeader.length - headerLen;
+ ByteBuff encodedResultWithHeader = hbw.cloneUncompressedBufferWithHeader();
+ final int encodedSize = encodedResultWithHeader.limit() - headerLen;
if (encoding != DataBlockEncoding.NONE) {
// We need to account for the two-byte encoding algorithm ID that
// comes after the 24-byte block header but before encoded KVs.
headerLen += DataBlockEncoding.ID_SIZE;
}
- byte[] encodedDataSection =
- new byte[encodedResultWithHeader.length - headerLen];
- System.arraycopy(encodedResultWithHeader, headerLen,
- encodedDataSection, 0, encodedDataSection.length);
- final ByteBuffer encodedBuf =
- ByteBuffer.wrap(encodedDataSection);
encodedSizes.add(encodedSize);
+ ByteBuff encodedBuf = encodedResultWithHeader.position(headerLen).slice();
encodedBlocks.add(encodedBuf);
totalSize += hbw.getOnDiskSizeWithHeader();
}
@@ -521,12 +516,11 @@ public class TestHFileBlock {
actualBuffer = actualBuffer.slice();
}
- ByteBuffer expectedBuffer = encodedBlocks.get(blockId);
- expectedBuffer.rewind();
+ ByteBuff expectedBuff = encodedBlocks.get(blockId);
+ expectedBuff.rewind();
// test if content matches, produce nice message
- assertBuffersEqual(new SingleByteBuff(expectedBuffer), actualBuffer, algo, encoding,
- pread);
+ assertBuffersEqual(expectedBuff, actualBuffer, algo, encoding, pread);
// test serialized blocks
for (boolean reuseBuffer : new boolean[] { false, true }) {
@@ -882,8 +876,10 @@ public class TestHFileBlock {
hbw.writeHeaderAndData(os);
totalSize += hbw.getOnDiskSizeWithHeader();
- if (cacheOnWrite)
- expectedContents.add(hbw.cloneUncompressedBufferWithHeader());
+ if (cacheOnWrite) {
+ ByteBuff buff = hbw.cloneUncompressedBufferWithHeader();
+ expectedContents.add(buff.asSubByteBuffer(buff.capacity()));
+ }
if (detailedLogging) {
LOG.info("Written block #" + i + " of type " + bt