You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by op...@apache.org on 2019/09/09 09:52:27 UTC
[hbase] branch branch-2 updated: HBASE-22802 Avoid temp ByteBuffer allocation in FileIOEngine#read (#583)
This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 4282d70 HBASE-22802 Avoid temp ByteBuffer allocation in FileIOEngine#read (#583)
4282d70 is described below
commit 4282d702123ee0e3e37df95a3ac57e301f10554f
Author: chenxu14 <47...@users.noreply.github.com>
AuthorDate: Mon Sep 9 17:38:33 2019 +0800
HBASE-22802 Avoid temp ByteBuffer allocation in FileIOEngine#read (#583)
---
.../java/org/apache/hadoop/hbase/nio/ByteBuff.java | 35 ++++++++++-
.../org/apache/hadoop/hbase/nio/MultiByteBuff.java | 65 ++++++++++++++++----
.../apache/hadoop/hbase/nio/SingleByteBuff.java | 21 ++++++-
.../hadoop/hbase/io/hfile/bucket/BucketCache.java | 7 ++-
.../hadoop/hbase/io/hfile/bucket/BucketEntry.java | 7 ++-
.../hfile/bucket/ExclusiveMemoryMmapIOEngine.java | 5 +-
.../hadoop/hbase/io/hfile/bucket/FileIOEngine.java | 64 ++++++++++----------
.../hfile/TestHFileScannerImplReferenceCount.java | 70 +++++++++++++++-------
.../io/hfile/bucket/TestByteBufferIOEngine.java | 11 +++-
.../hbase/io/hfile/bucket/TestFileIOEngine.java | 32 ++++++++++
10 files changed, 240 insertions(+), 77 deletions(-)
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
index c04c3f5..3e0a830 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.nio;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.util.List;
@@ -450,10 +451,37 @@ public abstract class ByteBuff implements HBaseReferenceCounted {
*/
public abstract int read(ReadableByteChannel channel) throws IOException;
+ /**
+ * Reads bytes from FileChannel into this ByteBuff
+ */
+ public abstract int read(FileChannel channel, long offset) throws IOException;
+
+ /**
+ * Write this ByteBuff's data into target file
+ */
+ public abstract int write(FileChannel channel, long offset) throws IOException;
+
+ /**
+ * function interface for Channel read
+ */
+ @FunctionalInterface
+ interface ChannelReader {
+ int read(ReadableByteChannel channel, ByteBuffer buf, long offset) throws IOException;
+ }
+
+ static final ChannelReader CHANNEL_READER = (channel, buf, offset) -> {
+ return channel.read(buf);
+ };
+
+ static final ChannelReader FILE_READER = (channel, buf, offset) -> {
+ return ((FileChannel)channel).read(buf, offset);
+ };
+
// static helper methods
- public static int channelRead(ReadableByteChannel channel, ByteBuffer buf) throws IOException {
+ public static int read(ReadableByteChannel channel, ByteBuffer buf, long offset,
+ ChannelReader reader) throws IOException {
if (buf.remaining() <= NIO_BUFFER_LIMIT) {
- return channel.read(buf);
+ return reader.read(channel, buf, offset);
}
int originalLimit = buf.limit();
int initialRemaining = buf.remaining();
@@ -463,7 +491,8 @@ public abstract class ByteBuff implements HBaseReferenceCounted {
try {
int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
buf.limit(buf.position() + ioSize);
- ret = channel.read(buf);
+ offset += ret;
+ ret = reader.read(channel, buf, offset);
if (ret < ioSize) {
break;
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
index 3ce1709..df0ae8e 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java
@@ -24,7 +24,10 @@ import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.InvalidMarkException;
+import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
@@ -53,6 +56,23 @@ public class MultiByteBuff extends ByteBuff {
private int markedItemIndex = -1;
private final int[] itemBeginPos;
+ private Iterator<ByteBuffer> buffsIterator = new Iterator<ByteBuffer>() {
+ @Override
+ public boolean hasNext() {
+ return curItemIndex < limitedItemIndex ||
+ (curItemIndex == limitedItemIndex && items[curItemIndex].hasRemaining());
+ }
+
+ @Override
+ public ByteBuffer next() {
+ if (curItemIndex >= items.length) {
+ throw new NoSuchElementException("items overflow");
+ }
+ curItem = items[curItemIndex++];
+ return curItem;
+ }
+ };
+
public MultiByteBuff(ByteBuffer... items) {
this(NONE, items);
}
@@ -1064,23 +1084,44 @@ public class MultiByteBuff extends ByteBuff {
return output;
}
+ private int internalRead(ReadableByteChannel channel, long offset,
+ ChannelReader reader) throws IOException {
+ checkRefCount();
+ int total = 0;
+ while (buffsIterator.hasNext()) {
+ ByteBuffer buffer = buffsIterator.next();
+ int len = read(channel, buffer, offset, reader);
+ if (len > 0) {
+ total += len;
+ offset += len;
+ }
+ if (buffer.hasRemaining()) {
+ break;
+ }
+ }
+ return total;
+ }
+
@Override
public int read(ReadableByteChannel channel) throws IOException {
+ return internalRead(channel, 0, CHANNEL_READER);
+ }
+
+ @Override
+ public int read(FileChannel channel, long offset) throws IOException {
+ return internalRead(channel, offset, FILE_READER);
+ }
+
+ @Override
+ public int write(FileChannel channel, long offset) throws IOException {
checkRefCount();
int total = 0;
- while (true) {
- // Read max possible into the current BB
- int len = channelRead(channel, this.curItem);
- if (len > 0)
+ while (buffsIterator.hasNext()) {
+ ByteBuffer buffer = buffsIterator.next();
+ while (buffer.hasRemaining()) {
+ int len = channel.write(buffer, offset);
total += len;
- if (this.curItem.hasRemaining()) {
- // We were not able to read enough to fill the current BB itself. Means there is no point in
- // doing more reads from Channel. Only this much there for now.
- break;
- } else {
- if (this.curItemIndex >= this.limitedItemIndex) break;
- this.curItemIndex++;
- this.curItem = this.items[this.curItemIndex];
+ offset += len;
}
}
return total;
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
index 36a83a0..797bfdc 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.io.ByteBuffAllocator.NONE;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
@@ -371,7 +372,25 @@ public class SingleByteBuff extends ByteBuff {
@Override
public int read(ReadableByteChannel channel) throws IOException {
checkRefCount();
- return channelRead(channel, buf);
+ return read(channel, buf, 0, CHANNEL_READER);
+ }
+
+ @Override
+ public int read(FileChannel channel, long offset) throws IOException {
+ checkRefCount();
+ return read(channel, buf, offset, FILE_READER);
+ }
+
+ @Override
+ public int write(FileChannel channel, long offset) throws IOException {
+ checkRefCount();
+ int total = 0;
+ while(buf.hasRemaining()) {
+ int len = channel.write(buf, offset);
+ total += len;
+ offset += len;
+ }
+ return total;
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 106bf3a..ef8c4d1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -502,8 +502,11 @@ public class BucketCache implements BlockCache, HeapSize {
// block will use the refCnt of bucketEntry, which means if two HFileBlock mapping to
// the same BucketEntry, then all of the three will share the same refCnt.
Cacheable cachedBlock = ioEngine.read(bucketEntry);
- // RPC start to reference, so retain here.
- cachedBlock.retain();
+ if (ioEngine.usesSharedMemory()) {
+ // If IOEngine use shared memory, cachedBlock and BucketEntry will share the
+ // same RefCnt, do retain here, in order to count the number of RPC references
+ cachedBlock.retain();
+ }
// Update the cache statistics.
if (updateCacheMetrics) {
cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java
index ca41eca..2dd7775 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java
@@ -80,7 +80,7 @@ class BucketEntry implements HBaseReferenceCounted {
*/
private final RefCnt refCnt;
final AtomicBoolean markedAsEvicted;
- private final ByteBuffAllocator allocator;
+ final ByteBuffAllocator allocator;
/**
* Time this block was cached. Presumes we are created just before we are added to the cache.
@@ -194,7 +194,10 @@ class BucketEntry implements HBaseReferenceCounted {
}
Cacheable wrapAsCacheable(ByteBuffer[] buffers) throws IOException {
- ByteBuff buf = ByteBuff.wrap(buffers, this.refCnt);
+ return wrapAsCacheable(ByteBuff.wrap(buffers, this.refCnt));
+ }
+
+ Cacheable wrapAsCacheable(ByteBuff buf) throws IOException {
return this.deserializerReference().deserialize(buf, allocator);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java
index 3d7f2b1..3169a66 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java
@@ -17,7 +17,6 @@
package org.apache.hadoop.hbase.io.hfile.bucket;
import java.io.IOException;
-import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.nio.ByteBuff;
@@ -35,9 +34,9 @@ public class ExclusiveMemoryMmapIOEngine extends FileMmapIOEngine {
@Override
public Cacheable read(BucketEntry be) throws IOException {
- ByteBuff dst = ByteBuff.wrap(ByteBuffer.allocate(be.getLength()));
+ ByteBuff dst = be.allocator.allocate(be.getLength());
bufferArray.read(be.offset(), dst);
dst.position(0).limit(be.getLength());
- return be.wrapAsCacheable(dst.nioByteBuffers());
+ return be.wrapAsCacheable(dst);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
index b3afe48..9e6a75b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
@@ -129,20 +129,25 @@ public class FileIOEngine implements IOEngine {
long offset = be.offset();
int length = be.getLength();
Preconditions.checkArgument(length >= 0, "Length of read can not be less than 0.");
- ByteBuffer dstBuffer = ByteBuffer.allocate(length);
+ ByteBuff dstBuff = be.allocator.allocate(length);
if (length != 0) {
- accessFile(readAccessor, dstBuffer, offset);
- // The buffer created out of the fileChannel is formed by copying the data from the file
- // Hence in this case there is no shared memory that we point to. Even if the BucketCache
- // evicts this buffer from the file the data is already copied and there is no need to
- // ensure that the results are not corrupted before consuming them.
- if (dstBuffer.limit() != length) {
- throw new IllegalArgumentIOException(
- "Only " + dstBuffer.limit() + " bytes read, " + length + " expected");
+ try {
+ accessFile(readAccessor, dstBuff, offset);
+ // The buffer created out of the fileChannel is formed by copying the data from the file
+ // Hence in this case there is no shared memory that we point to. Even if the BucketCache
+ // evicts this buffer from the file the data is already copied and there is no need to
+ // ensure that the results are not corrupted before consuming them.
+ if (dstBuff.limit() != length) {
+ throw new IllegalArgumentIOException(
+ "Only " + dstBuff.limit() + " bytes read, " + length + " expected");
+ }
+ } catch (IOException ioe) {
+ dstBuff.release();
+ throw ioe;
}
}
- dstBuffer.rewind();
- return be.wrapAsCacheable(new ByteBuffer[] { dstBuffer });
+ dstBuff.rewind();
+ return be.wrapAsCacheable(dstBuff);
}
@VisibleForTesting
@@ -164,10 +169,7 @@ public class FileIOEngine implements IOEngine {
*/
@Override
public void write(ByteBuffer srcBuffer, long offset) throws IOException {
- if (!srcBuffer.hasRemaining()) {
- return;
- }
- accessFile(writeAccessor, srcBuffer, offset);
+ write(ByteBuff.wrap(srcBuffer), offset);
}
/**
@@ -208,28 +210,30 @@ public class FileIOEngine implements IOEngine {
}
@Override
- public void write(ByteBuff srcBuffer, long offset) throws IOException {
- ByteBuffer dup = srcBuffer.asSubByteBuffer(srcBuffer.remaining()).duplicate();
- write(dup, offset);
+ public void write(ByteBuff srcBuff, long offset) throws IOException {
+ if (!srcBuff.hasRemaining()) {
+ return;
+ }
+ accessFile(writeAccessor, srcBuff, offset);
}
- private void accessFile(FileAccessor accessor, ByteBuffer buffer,
+ private void accessFile(FileAccessor accessor, ByteBuff buff,
long globalOffset) throws IOException {
int startFileNum = getFileNum(globalOffset);
- int remainingAccessDataLen = buffer.remaining();
+ int remainingAccessDataLen = buff.remaining();
int endFileNum = getFileNum(globalOffset + remainingAccessDataLen - 1);
int accessFileNum = startFileNum;
long accessOffset = getAbsoluteOffsetInFile(accessFileNum, globalOffset);
- int bufLimit = buffer.limit();
+ int bufLimit = buff.limit();
while (true) {
FileChannel fileChannel = fileChannels[accessFileNum];
int accessLen = 0;
if (endFileNum > accessFileNum) {
// short the limit;
- buffer.limit((int) (buffer.limit() - remainingAccessDataLen + sizePerFile - accessOffset));
+ buff.limit((int) (buff.limit() - remainingAccessDataLen + sizePerFile - accessOffset));
}
try {
- accessLen = accessor.access(fileChannel, buffer, accessOffset);
+ accessLen = accessor.access(fileChannel, buff, accessOffset);
} catch (ClosedByInterruptException e) {
throw e;
} catch (ClosedChannelException e) {
@@ -237,7 +241,7 @@ public class FileIOEngine implements IOEngine {
continue;
}
// recover the limit
- buffer.limit(bufLimit);
+ buff.limit(bufLimit);
if (accessLen < remainingAccessDataLen) {
remainingAccessDataLen -= accessLen;
accessFileNum++;
@@ -246,7 +250,7 @@ public class FileIOEngine implements IOEngine {
break;
}
if (accessFileNum >= fileChannels.length) {
- throw new IOException("Required data len " + StringUtils.byteDesc(buffer.remaining())
+ throw new IOException("Required data len " + StringUtils.byteDesc(buff.remaining())
+ " exceed the engine's capacity " + StringUtils.byteDesc(capacity) + " where offset="
+ globalOffset);
}
@@ -304,23 +308,23 @@ public class FileIOEngine implements IOEngine {
}
private interface FileAccessor {
- int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
+ int access(FileChannel fileChannel, ByteBuff buff, long accessOffset)
throws IOException;
}
private static class FileReadAccessor implements FileAccessor {
@Override
- public int access(FileChannel fileChannel, ByteBuffer byteBuffer,
+ public int access(FileChannel fileChannel, ByteBuff buff,
long accessOffset) throws IOException {
- return fileChannel.read(byteBuffer, accessOffset);
+ return buff.read(fileChannel, accessOffset);
}
}
private static class FileWriteAccessor implements FileAccessor {
@Override
- public int access(FileChannel fileChannel, ByteBuffer byteBuffer,
+ public int access(FileChannel fileChannel, ByteBuff buff,
long accessOffset) throws IOException {
- return fileChannel.write(byteBuffer, accessOffset);
+ return buff.write(fileChannel, accessOffset);
}
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileScannerImplReferenceCount.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileScannerImplReferenceCount.java
index dd9a1c8..fa67039 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileScannerImplReferenceCount.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileScannerImplReferenceCount.java
@@ -29,6 +29,8 @@ import static org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.MIN_INDEX_NUM_ENT
import static org.junit.Assert.assertEquals;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
@@ -58,9 +60,14 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@RunWith(Parameterized.class)
@Category({ IOTests.class, SmallTests.class })
public class TestHFileScannerImplReferenceCount {
@@ -71,6 +78,15 @@ public class TestHFileScannerImplReferenceCount {
@Rule
public TestName CASE = new TestName();
+ @Parameters(name = "{index}: ioengine={0}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[] { "file" }, new Object[] { "offheap" },
+ new Object[] { "mmap" }, new Object[] { "pmem" });
+ }
+
+ @Parameter
+ public String ioengine;
+
private static final Logger LOG =
LoggerFactory.getLogger(TestHFileScannerImplReferenceCount.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
@@ -113,12 +129,16 @@ public class TestHFileScannerImplReferenceCount {
@Before
public void setUp() throws IOException {
+ String caseName = CASE.getMethodName().replaceAll("[^a-zA-Z0-9]", "_");
+ this.workDir = UTIL.getDataTestDir(caseName);
+ if (!"offheap".equals(ioengine)) {
+ ioengine = ioengine + ":" + workDir.toString() + "/cachedata";
+ }
+ UTIL.getConfiguration().set(BUCKET_CACHE_IOENGINE_KEY, ioengine);
this.firstCell = null;
this.secondCell = null;
this.allocator = ByteBuffAllocator.create(UTIL.getConfiguration(), true);
this.conf = new Configuration(UTIL.getConfiguration());
- String caseName = CASE.getMethodName();
- this.workDir = UTIL.getDataTestDir(caseName);
this.fs = this.workDir.getFileSystem(conf);
this.hfilePath = new Path(this.workDir, caseName + System.currentTimeMillis());
LOG.info("Start to write {} cells into hfile: {}, case:{}", CELL_COUNT, hfilePath, caseName);
@@ -202,34 +222,34 @@ public class TestHFileScannerImplReferenceCount {
scanner.seekTo(firstCell);
curBlock = scanner.curBlock;
- Assert.assertEquals(curBlock.refCnt(), 2);
+ this.assertRefCnt(curBlock, 2);
// Seek to the block again, the curBlock won't change and won't read from BlockCache. so
// refCnt should be unchanged.
scanner.seekTo(firstCell);
Assert.assertTrue(curBlock == scanner.curBlock);
- Assert.assertEquals(curBlock.refCnt(), 2);
+ this.assertRefCnt(curBlock, 2);
prevBlock = curBlock;
scanner.seekTo(secondCell);
curBlock = scanner.curBlock;
- Assert.assertEquals(prevBlock.refCnt(), 2);
- Assert.assertEquals(curBlock.refCnt(), 2);
+ this.assertRefCnt(prevBlock, 2);
+ this.assertRefCnt(curBlock, 2);
// After shipped, the prevBlock will be release, but curBlock is still referenced by the
// curBlock.
scanner.shipped();
- Assert.assertEquals(prevBlock.refCnt(), 1);
- Assert.assertEquals(curBlock.refCnt(), 2);
+ this.assertRefCnt(prevBlock, 1);
+ this.assertRefCnt(curBlock, 2);
// Try to ship again, though with nothing to client.
scanner.shipped();
- Assert.assertEquals(prevBlock.refCnt(), 1);
- Assert.assertEquals(curBlock.refCnt(), 2);
+ this.assertRefCnt(prevBlock, 1);
+ this.assertRefCnt(curBlock, 2);
// The curBlock will also be released.
scanner.close();
- Assert.assertEquals(curBlock.refCnt(), 1);
+ this.assertRefCnt(curBlock, 1);
// Finish the block & block2 RPC path
Assert.assertTrue(block1.release());
@@ -287,7 +307,7 @@ public class TestHFileScannerImplReferenceCount {
curBlock = scanner.curBlock;
Assert.assertFalse(curBlock == block2);
Assert.assertEquals(1, block2.refCnt());
- Assert.assertEquals(2, curBlock.refCnt());
+ this.assertRefCnt(curBlock, 2);
prevBlock = scanner.curBlock;
// Release the block1, no other reference.
@@ -305,22 +325,22 @@ public class TestHFileScannerImplReferenceCount {
// the curBlock is read from IOEngine, so a different block.
Assert.assertFalse(curBlock == block1);
// Two reference for curBlock: 1. scanner; 2. blockCache.
- Assert.assertEquals(2, curBlock.refCnt());
+ this.assertRefCnt(curBlock, 2);
// Reference count of prevBlock must be unchanged because we haven't shipped.
- Assert.assertEquals(2, prevBlock.refCnt());
+ this.assertRefCnt(prevBlock, 2);
// Do the shipped
scanner.shipped();
Assert.assertEquals(scanner.prevBlocks.size(), 0);
Assert.assertNotNull(scanner.curBlock);
- Assert.assertEquals(2, curBlock.refCnt());
- Assert.assertEquals(1, prevBlock.refCnt());
+ this.assertRefCnt(curBlock, 2);
+ this.assertRefCnt(prevBlock, 1);
// Do the close
scanner.close();
Assert.assertNull(scanner.curBlock);
- Assert.assertEquals(1, curBlock.refCnt());
- Assert.assertEquals(1, prevBlock.refCnt());
+ this.assertRefCnt(curBlock, 1);
+ this.assertRefCnt(prevBlock, 1);
Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 2);
Assert.assertEquals(0, curBlock.refCnt());
@@ -340,18 +360,26 @@ public class TestHFileScannerImplReferenceCount {
Assert.assertTrue(scanner.seekTo());
curBlock = scanner.curBlock;
Assert.assertFalse(curBlock == block1);
- Assert.assertEquals(2, curBlock.refCnt());
+ this.assertRefCnt(curBlock, 2);
// Return false because firstCell <= c[0]
Assert.assertFalse(scanner.seekBefore(firstCell));
// The block1 shouldn't be released because we still don't do the shipped or close.
- Assert.assertEquals(2, curBlock.refCnt());
+ this.assertRefCnt(curBlock, 2);
scanner.close();
- Assert.assertEquals(1, curBlock.refCnt());
+ this.assertRefCnt(curBlock, 1);
Assert.assertTrue(defaultBC.evictBlocksByHfileName(hfilePath.getName()) >= 1);
Assert.assertEquals(0, curBlock.refCnt());
}
+ private void assertRefCnt(HFileBlock block, int value) {
+ if (ioengine.startsWith("offheap") || ioengine.startsWith("pmem")) {
+ Assert.assertEquals(value, block.refCnt());
+ } else {
+ Assert.assertEquals(value - 1, block.refCnt());
+ }
+ }
+
@Test
public void testDefault() throws Exception {
testReleaseBlock(Algorithm.NONE, DataBlockEncoding.NONE);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
index 2184fa5..d1b8f9a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Assert;
@@ -48,8 +49,8 @@ public class TestByteBufferIOEngine {
private static class MockBucketEntry extends BucketEntry {
private long off;
- MockBucketEntry(long offset, int length) {
- super(offset & 0xFF00, length, 0, false);
+ MockBucketEntry(long offset, int length, ByteBuffAllocator allocator) {
+ super(offset & 0xFF00, length, 0, false, RefCnt.create(), allocator);
this.off = offset;
}
@@ -66,7 +67,11 @@ public class TestByteBufferIOEngine {
}
static BucketEntry createBucketEntry(long offset, int len) {
- BucketEntry be = new MockBucketEntry(offset, len);
+ return createBucketEntry(offset, len, ByteBuffAllocator.HEAP);
+ }
+
+ static BucketEntry createBucketEntry(long offset, int len, ByteBuffAllocator allocator) {
+ BucketEntry be = new MockBucketEntry(offset, len, allocator);
be.setDeserializerReference(DESERIALIZER);
return be;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
index 6bd91d0..8f86e83 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
@@ -30,8 +31,11 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
+
import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.After;
@@ -40,6 +44,9 @@ import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
/**
* Basic test for {@link FileIOEngine}
@@ -131,6 +138,31 @@ public class TestFileIOEngine {
}
@Test
+ public void testReadFailedShouldReleaseByteBuff() {
+ ByteBuffAllocator alloc = Mockito.mock(ByteBuffAllocator.class);
+ final RefCnt refCnt = RefCnt.create();
+ Mockito.when(alloc.allocate(Mockito.anyInt())).thenAnswer(new Answer<ByteBuff>() {
+ @Override
+ public ByteBuff answer(InvocationOnMock invocation) throws Throwable {
+ int len = invocation.getArgument(0);
+ return ByteBuff.wrap(new ByteBuffer[]{ByteBuffer.allocate(len + 1)}, refCnt);
+ }
+ });
+ int len = 10;
+ byte[] data1 = new byte[len];
+ assertEquals(1, refCnt.refCnt());
+ try {
+ fileIOEngine.write(ByteBuffer.wrap(data1), 0);
+ BucketEntry be = createBucketEntry(0, len, alloc);
+ fileIOEngine.read(be);
+ fail();
+ } catch (IOException ioe) {
+ // expected exception.
+ }
+ assertEquals(0, refCnt.refCnt());
+ }
+
+ @Test
public void testClosedChannelException() throws IOException {
fileIOEngine.closeFileChannels();
int len = 5;