You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by op...@apache.org on 2019/05/31 07:15:17 UTC
[hbase] 07/17: HBASE-22159 ByteBufferIOEngine should support write
off-heap ByteBuff to the bufferArray
This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch HBASE-21879
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit ce0edf5509acb87033ed4bd89eea9ba3a9f38112
Author: huzheng <op...@gmail.com>
AuthorDate: Wed Apr 3 22:29:31 2019 +0800
HBASE-22159 ByteBufferIOEngine should support write off-heap ByteBuff to the bufferArray
---
.../apache/hadoop/hbase/io/ByteBuffAllocator.java | 29 +-
.../java/org/apache/hadoop/hbase/nio/ByteBuff.java | 33 ++
.../apache/hadoop/hbase/util/ByteBufferArray.java | 398 ++++++++++-----------
.../hadoop/hbase/util/TestByteBufferArray.java | 183 +++++++---
.../hbase/io/hfile/bucket/ByteBufferIOEngine.java | 100 +++---
.../hfile/bucket/ExclusiveMemoryMmapIOEngine.java | 15 +-
.../hadoop/hbase/io/hfile/bucket/FileIOEngine.java | 12 +-
.../hbase/io/hfile/bucket/FileMmapIOEngine.java | 9 +-
.../io/hfile/bucket/TestByteBufferIOEngine.java | 58 +--
.../bucket/TestExclusiveMemoryMmapEngine.java | 31 +-
.../hbase/io/hfile/bucket/TestFileIOEngine.java | 24 +-
11 files changed, 485 insertions(+), 407 deletions(-)
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
index 984d46d..51de22a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
@@ -29,7 +29,6 @@ import sun.nio.ch.DirectBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.nio.ByteBuff;
-import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -175,7 +174,7 @@ public class ByteBuffAllocator {
return allocateOnHeap(this.bufSize);
}
- private SingleByteBuff allocateOnHeap(int size) {
+ private static SingleByteBuff allocateOnHeap(int size) {
return new SingleByteBuff(NONE, ByteBuffer.allocate(size));
}
@@ -213,7 +212,7 @@ public class ByteBuffAllocator {
// just allocate the ByteBuffer from on-heap.
bbs.add(ByteBuffer.allocate(remain));
}
- ByteBuff bb = wrap(bbs, () -> {
+ ByteBuff bb = ByteBuff.wrap(bbs, () -> {
for (int i = 0; i < lenFromReservoir; i++) {
this.putbackBuffer(bbs.get(i));
}
@@ -238,30 +237,6 @@ public class ByteBuffAllocator {
}
}
- public static ByteBuff wrap(ByteBuffer[] buffers, Recycler recycler) {
- if (buffers == null || buffers.length == 0) {
- throw new IllegalArgumentException("buffers shouldn't be null or empty");
- }
- return buffers.length == 1 ? new SingleByteBuff(recycler, buffers[0])
- : new MultiByteBuff(recycler, buffers);
- }
-
- public static ByteBuff wrap(ByteBuffer[] buffers) {
- return wrap(buffers, NONE);
- }
-
- public static ByteBuff wrap(List<ByteBuffer> buffers, Recycler recycler) {
- if (buffers == null || buffers.size() == 0) {
- throw new IllegalArgumentException("buffers shouldn't be null or empty");
- }
- return buffers.size() == 1 ? new SingleByteBuff(recycler, buffers.get(0))
- : new MultiByteBuff(recycler, buffers.toArray(new ByteBuffer[0]));
- }
-
- public static ByteBuff wrap(List<ByteBuffer> buffers) {
- return wrap(buffers, NONE);
- }
-
/**
* @return One free DirectByteBuffer from the pool. If no free ByteBuffer and we have not reached
* the maximum pool size, it will create a new one and return. In case of max pool size
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
index 1ee3607..9339f43 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
@@ -20,7 +20,10 @@ package org.apache.hadoop.hbase.nio;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
+import java.util.List;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ObjectIntPair;
@@ -557,4 +560,34 @@ public abstract class ByteBuff implements ReferenceCounted {
return this.getClass().getSimpleName() + "[pos=" + position() + ", lim=" + limit() +
", cap= " + capacity() + "]";
}
+
+ /********************************* ByteBuff wrapper methods ***********************************/
+
+ public static ByteBuff wrap(ByteBuffer[] buffers, Recycler recycler) {
+ if (buffers == null || buffers.length == 0) {
+ throw new IllegalArgumentException("buffers shouldn't be null or empty");
+ }
+ return buffers.length == 1 ? new SingleByteBuff(recycler, buffers[0])
+ : new MultiByteBuff(recycler, buffers);
+ }
+
+ public static ByteBuff wrap(ByteBuffer[] buffers) {
+ return wrap(buffers, ByteBuffAllocator.NONE);
+ }
+
+ public static ByteBuff wrap(List<ByteBuffer> buffers, Recycler recycler) {
+ if (buffers == null || buffers.size() == 0) {
+ throw new IllegalArgumentException("buffers shouldn't be null or empty");
+ }
+ return buffers.size() == 1 ? new SingleByteBuff(recycler, buffers.get(0))
+ : new MultiByteBuff(recycler, buffers.toArray(new ByteBuffer[0]));
+ }
+
+ public static ByteBuff wrap(List<ByteBuffer> buffers) {
+ return wrap(buffers, ByteBuffAllocator.NONE);
+ }
+
+ public static ByteBuff wrap(ByteBuffer buffer) {
+ return new SingleByteBuff(ByteBuffAllocator.NONE, buffer);
+ }
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
index d023339..e5a0b13 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
@@ -20,15 +20,14 @@ package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
-import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
@@ -38,279 +37,248 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
- * This class manages an array of ByteBuffers with a default size 4MB. These
- * buffers are sequential and could be considered as a large buffer.It supports
- * reading/writing data from this large buffer with a position and offset
+ * This class manages an array of ByteBuffers with a default size 4MB. These buffers are sequential
+ * and could be considered as a large buffer.It supports reading/writing data from this large buffer
+ * with a position and offset
*/
@InterfaceAudience.Private
public class ByteBufferArray {
private static final Logger LOG = LoggerFactory.getLogger(ByteBufferArray.class);
public static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
- @VisibleForTesting
- ByteBuffer buffers[];
- private int bufferSize;
- @VisibleForTesting
- int bufferCount;
+ private final int bufferSize;
+ private final int bufferCount;
+ final ByteBuffer[] buffers;
/**
- * We allocate a number of byte buffers as the capacity. In order not to out
- * of the array bounds for the last byte(see {@link ByteBufferArray#multiple}),
- * we will allocate one additional buffer with capacity 0;
+ * We allocate a number of byte buffers as the capacity.
* @param capacity total size of the byte buffer array
* @param allocator the ByteBufferAllocator that will create the buffers
* @throws IOException throws IOException if there is an exception thrown by the allocator
*/
- public ByteBufferArray(long capacity, ByteBufferAllocator allocator)
- throws IOException {
- this.bufferSize = DEFAULT_BUFFER_SIZE;
- if (this.bufferSize > (capacity / 16))
- this.bufferSize = (int) roundUp(capacity / 16, 32768);
- this.bufferCount = (int) (roundUp(capacity, bufferSize) / bufferSize);
- LOG.info("Allocating buffers total=" + StringUtils.byteDesc(capacity)
- + ", sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count="
- + bufferCount);
- buffers = new ByteBuffer[bufferCount + 1];
- createBuffers(allocator);
+ public ByteBufferArray(long capacity, ByteBufferAllocator allocator) throws IOException {
+ this(getBufferSize(capacity), getBufferCount(capacity),
+ Runtime.getRuntime().availableProcessors(), capacity, allocator);
}
@VisibleForTesting
- void createBuffers(ByteBufferAllocator allocator)
- throws IOException {
- int threadCount = getThreadCount();
- ExecutorService service = new ThreadPoolExecutor(threadCount, threadCount, 0L,
- TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
- int perThreadCount = (int)Math.floor((double) (bufferCount) / threadCount);
- int lastThreadCount = bufferCount - (perThreadCount * (threadCount - 1));
- Future<ByteBuffer[]>[] futures = new Future[threadCount];
+ ByteBufferArray(int bufferSize, int bufferCount, int threadCount, long capacity,
+ ByteBufferAllocator alloc) throws IOException {
+ this.bufferSize = bufferSize;
+ this.bufferCount = bufferCount;
+ LOG.info("Allocating buffers total={}, sizePerBuffer={}, count={}",
+ StringUtils.byteDesc(capacity), StringUtils.byteDesc(bufferSize), bufferCount);
+ this.buffers = new ByteBuffer[bufferCount];
+ createBuffers(threadCount, alloc);
+ }
+
+ private void createBuffers(int threadCount, ByteBufferAllocator alloc) throws IOException {
+ ExecutorService pool = Executors.newFixedThreadPool(threadCount);
+ int perThreadCount = bufferCount / threadCount;
+ int reminder = bufferCount % threadCount;
try {
+ List<Future<ByteBuffer[]>> futures = new ArrayList<>(threadCount);
+ // Dispatch the creation task to each thread.
for (int i = 0; i < threadCount; i++) {
- // Last thread will have to deal with a different number of buffers
- int buffersToCreate = (i == threadCount - 1) ? lastThreadCount : perThreadCount;
- futures[i] = service.submit(
- new BufferCreatorCallable(bufferSize, buffersToCreate, allocator));
+ final int chunkSize = perThreadCount + ((i == threadCount - 1) ? reminder : 0);
+ futures.add(pool.submit(() -> {
+ ByteBuffer[] chunk = new ByteBuffer[chunkSize];
+ for (int k = 0; k < chunkSize; k++) {
+ chunk[k] = alloc.allocate(bufferSize);
+ }
+ return chunk;
+ }));
}
+ // Append the buffers created by each thread.
int bufferIndex = 0;
- for (Future<ByteBuffer[]> future : futures) {
- try {
- ByteBuffer[] buffers = future.get();
- for (ByteBuffer buffer : buffers) {
- this.buffers[bufferIndex++] = buffer;
+ try {
+ for (Future<ByteBuffer[]> f : futures) {
+ for (ByteBuffer b : f.get()) {
+ this.buffers[bufferIndex++] = b;
}
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Buffer creation interrupted", e);
- throw new IOException(e);
}
+ assert bufferIndex == bufferCount;
+ } catch (Exception e) {
+ LOG.error("Buffer creation interrupted", e);
+ throw new IOException(e);
}
} finally {
- service.shutdownNow();
+ pool.shutdownNow();
}
- // always create on heap empty dummy buffer at last
- this.buffers[bufferCount] = ByteBuffer.allocate(0);
}
@VisibleForTesting
- int getThreadCount() {
- return Runtime.getRuntime().availableProcessors();
- }
-
- /**
- * A callable that creates buffers of the specified length either onheap/offheap using the
- * {@link ByteBufferAllocator}
- */
- private static class BufferCreatorCallable implements Callable<ByteBuffer[]> {
- private final int bufferCapacity;
- private final int bufferCount;
- private final ByteBufferAllocator allocator;
-
- BufferCreatorCallable(int bufferCapacity, int bufferCount, ByteBufferAllocator allocator) {
- this.bufferCapacity = bufferCapacity;
- this.bufferCount = bufferCount;
- this.allocator = allocator;
+ static int getBufferSize(long capacity) {
+ int bufferSize = DEFAULT_BUFFER_SIZE;
+ if (bufferSize > (capacity / 16)) {
+ bufferSize = (int) roundUp(capacity / 16, 32768);
}
+ return bufferSize;
+ }
- @Override
- public ByteBuffer[] call() throws Exception {
- ByteBuffer[] buffers = new ByteBuffer[this.bufferCount];
- for (int i = 0; i < this.bufferCount; i++) {
- buffers[i] = allocator.allocate(this.bufferCapacity);
- }
- return buffers;
- }
+ private static int getBufferCount(long capacity) {
+ int bufferSize = getBufferSize(capacity);
+ return (int) (roundUp(capacity, bufferSize) / bufferSize);
}
- private long roundUp(long n, long to) {
+ private static long roundUp(long n, long to) {
return ((n + to - 1) / to) * to;
}
/**
- * Transfers bytes from this buffer array into the given destination array
- * @param start start position in the ByteBufferArray
- * @param len The maximum number of bytes to be written to the given array
- * @param dstArray The array into which bytes are to be written
+ * Transfers bytes from this buffers array into the given destination {@link ByteBuff}
+ * @param offset start position in this big logical array.
+ * @param dst the destination ByteBuff. Notice that its position will be advanced.
* @return number of bytes read
*/
- public int getMultiple(long start, int len, byte[] dstArray) {
- return getMultiple(start, len, dstArray, 0);
+ public int read(long offset, ByteBuff dst) {
+ return internalTransfer(offset, dst, READER);
}
/**
- * Transfers bytes from this buffer array into the given destination array
- * @param start start offset of this buffer array
- * @param len The maximum number of bytes to be written to the given array
- * @param dstArray The array into which bytes are to be written
- * @param dstOffset The offset within the given array of the first byte to be
- * written
- * @return number of bytes read
+ * Transfers bytes from the given source {@link ByteBuff} into this buffer array
+ * @param offset start offset of this big logical array.
+ * @param src the source ByteBuff. Notice that its position will be advanced.
+ * @return number of bytes written
*/
- public int getMultiple(long start, int len, byte[] dstArray, int dstOffset) {
- multiple(start, len, dstArray, dstOffset, GET_MULTIPLE_VISTOR);
- return len;
+ public int write(long offset, ByteBuff src) {
+ return internalTransfer(offset, src, WRITER);
}
- private final static Visitor GET_MULTIPLE_VISTOR = new Visitor() {
- @Override
- public void visit(ByteBuffer bb, int pos, byte[] array, int arrayIdx, int len) {
- ByteBufferUtils.copyFromBufferToArray(array, bb, pos, arrayIdx, len);
- }
+ /**
+ * Transfer bytes from source {@link ByteBuff} to destination {@link ByteBuffer}. Position of both
+ * source and destination will be advanced.
+ */
+ private static final BiConsumer<ByteBuffer, ByteBuff> WRITER = (dst, src) -> {
+ int off = src.position(), len = dst.remaining();
+ src.get(dst, off, len);
+ src.position(off + len);
};
/**
- * Transfers bytes from the given source array into this buffer array
- * @param start start offset of this buffer array
- * @param len The maximum number of bytes to be read from the given array
- * @param srcArray The array from which bytes are to be read
+ * Transfer bytes from source {@link ByteBuffer} to destination {@link ByteBuff}. Position of both
+ * source and destination will be advanced.
*/
- public void putMultiple(long start, int len, byte[] srcArray) {
- putMultiple(start, len, srcArray, 0);
- }
+ private static final BiConsumer<ByteBuffer, ByteBuff> READER = (src, dst) -> {
+ int off = dst.position(), len = src.remaining(), srcOff = src.position();
+ dst.put(off, ByteBuff.wrap(src), srcOff, len);
+ src.position(srcOff + len);
+ dst.position(off + len);
+ };
/**
- * Transfers bytes from the given source array into this buffer array
- * @param start start offset of this buffer array
- * @param len The maximum number of bytes to be read from the given array
- * @param srcArray The array from which bytes are to be read
- * @param srcOffset The offset within the given array of the first byte to be
- * read
+ * Transferring all remaining bytes from b to the buffers array starting at offset, or
+ * transferring bytes from the buffers array at offset to b until b is filled. Notice that
+ * position of ByteBuff b will be advanced.
+ * @param offset where we start in the big logical array.
+ * @param b the ByteBuff to transfer from or to
+ * @param transfer the transfer interface.
+ * @return the length of bytes we transferred.
*/
- public void putMultiple(long start, int len, byte[] srcArray, int srcOffset) {
- multiple(start, len, srcArray, srcOffset, PUT_MULTIPLE_VISITOR);
- }
-
- private final static Visitor PUT_MULTIPLE_VISITOR = new Visitor() {
- @Override
- public void visit(ByteBuffer bb, int pos, byte[] array, int arrayIdx, int len) {
- ByteBufferUtils.copyFromArrayToBuffer(bb, pos, array, arrayIdx, len);
+ private int internalTransfer(long offset, ByteBuff b, BiConsumer<ByteBuffer, ByteBuff> transfer) {
+ int expectedTransferLen = b.remaining();
+ if (expectedTransferLen == 0) {
+ return 0;
}
- };
-
- private interface Visitor {
- /**
- * Visit the given byte buffer, if it is a read action, we will transfer the
- * bytes from the buffer to the destination array, else if it is a write
- * action, we will transfer the bytes from the source array to the buffer
- * @param bb byte buffer
- * @param pos Start position in ByteBuffer
- * @param array a source or destination byte array
- * @param arrayOffset offset of the byte array
- * @param len read/write length
- */
- void visit(ByteBuffer bb, int pos, byte[] array, int arrayOffset, int len);
+ BufferIterator it = new BufferIterator(offset, expectedTransferLen);
+ while (it.hasNext()) {
+ ByteBuffer a = it.next();
+ transfer.accept(a, b);
+ assert !a.hasRemaining();
+ }
+ assert expectedTransferLen == it.getSum() : "Expected transfer length (=" + expectedTransferLen
+ + ") don't match the actual transfer length(=" + it.getSum() + ")";
+ return expectedTransferLen;
}
/**
- * Access(read or write) this buffer array with a position and length as the
- * given array. Here we will only lock one buffer even if it may be need visit
- * several buffers. The consistency is guaranteed by the caller.
- * @param start start offset of this buffer array
- * @param len The maximum number of bytes to be accessed
- * @param array The array from/to which bytes are to be read/written
- * @param arrayOffset The offset within the given array of the first byte to
- * be read or written
- * @param visitor implement of how to visit the byte buffer
+ * Creates a ByteBuff from a given array of ByteBuffers from the given offset to the length
+ * specified. E.g., if there are 4 buffers of length 10 each and we call asSubByteBuff(5, 10),
+ * then we will create an MBB consisting of two BBs: the first one will be a BB from 'position'
+ * 5 with 'length' 5, and the 2nd BB will be from 'position' 0 with 'length' 5.
+ * @param offset the position in the whole array which is composited by multiple byte buffers.
+ * @param len the length of bytes
+ * @return a ByteBuff formed from the underlying ByteBuffers
*/
- void multiple(long start, int len, byte[] array, int arrayOffset, Visitor visitor) {
- assert len >= 0;
- long end = start + len;
- int startBuffer = (int) (start / bufferSize), startOffset = (int) (start % bufferSize);
- int endBuffer = (int) (end / bufferSize), endOffset = (int) (end % bufferSize);
- assert array.length >= len + arrayOffset;
- assert startBuffer >= 0 && startBuffer < bufferCount;
- assert (endBuffer >= 0 && endBuffer < bufferCount)
- || (endBuffer == bufferCount && endOffset == 0);
- if (startBuffer >= buffers.length || startBuffer < 0) {
- String msg = "Failed multiple, start=" + start + ",startBuffer="
- + startBuffer + ",bufferSize=" + bufferSize;
- LOG.error(msg);
- throw new RuntimeException(msg);
+ public ByteBuff asSubByteBuff(long offset, final int len) {
+ BufferIterator it = new BufferIterator(offset, len);
+ ByteBuffer[] mbb = new ByteBuffer[it.getBufferCount()];
+ for (int i = 0; i < mbb.length; i++) {
+ assert it.hasNext();
+ mbb[i] = it.next();
}
- int srcIndex = 0, cnt = -1;
- for (int i = startBuffer; i <= endBuffer; ++i) {
- ByteBuffer bb = buffers[i].duplicate();
- int pos = 0;
- if (i == startBuffer) {
- cnt = bufferSize - startOffset;
- if (cnt > len) cnt = len;
- pos = startOffset;
- } else if (i == endBuffer) {
- cnt = endOffset;
- } else {
- cnt = bufferSize;
- }
- visitor.visit(bb, pos, array, srcIndex + arrayOffset, cnt);
- srcIndex += cnt;
- }
- assert srcIndex == len;
+ assert it.getSum() == len;
+ return ByteBuff.wrap(mbb);
}
/**
- * Creates a ByteBuff from a given array of ByteBuffers from the given offset to the
- * length specified. For eg, if there are 4 buffers forming an array each with length 10 and
- * if we call asSubBuffer(5, 10) then we will create an MBB consisting of two BBs
- * and the first one be a BB from 'position' 5 to a 'length' 5 and the 2nd BB will be from
- * 'position' 0 to 'length' 5.
- * @param offset
- * @param len
- * @return a ByteBuff formed from the underlying ByteBuffers
+ * Iterator to fetch ByteBuffers from offset with given length in this big logical array.
*/
- public ByteBuff asSubByteBuff(long offset, int len) {
- assert len >= 0;
- long end = offset + len;
- int startBuffer = (int) (offset / bufferSize), startBufferOffset = (int) (offset % bufferSize);
- int endBuffer = (int) (end / bufferSize), endBufferOffset = (int) (end % bufferSize);
- // Last buffer in the array is a dummy one with 0 capacity. Avoid sending back that
- if (endBuffer == this.bufferCount) {
- endBuffer--;
- endBufferOffset = bufferSize;
+ private class BufferIterator implements Iterator<ByteBuffer> {
+ private final int len;
+ private int startBuffer, startOffset, endBuffer, endOffset;
+ private int curIndex, sum = 0;
+
+ private int index(long pos) {
+ return (int) (pos / bufferSize);
+ }
+
+ private int offset(long pos) {
+ return (int) (pos % bufferSize);
+ }
+
+ public BufferIterator(long offset, int len) {
+ assert len >= 0 && offset >= 0;
+ this.len = len;
+
+ this.startBuffer = index(offset);
+ this.startOffset = offset(offset);
+
+ this.endBuffer = index(offset + len);
+ this.endOffset = offset(offset + len);
+ if (startBuffer < endBuffer && endOffset == 0) {
+ endBuffer--;
+ endOffset = bufferSize;
+ }
+ assert startBuffer >= 0 && startBuffer < bufferCount;
+ assert endBuffer >= 0 && endBuffer < bufferCount;
+
+ // initialize the index to the first buffer index.
+ this.curIndex = startBuffer;
}
- assert startBuffer >= 0 && startBuffer < bufferCount;
- assert (endBuffer >= 0 && endBuffer < bufferCount)
- || (endBuffer == bufferCount && endBufferOffset == 0);
- if (startBuffer >= buffers.length || startBuffer < 0) {
- String msg = "Failed subArray, start=" + offset + ",startBuffer=" + startBuffer
- + ",bufferSize=" + bufferSize;
- LOG.error(msg);
- throw new RuntimeException(msg);
+
+ @Override
+ public boolean hasNext() {
+ return this.curIndex <= endBuffer;
}
- int srcIndex = 0, cnt = -1;
- ByteBuffer[] mbb = new ByteBuffer[endBuffer - startBuffer + 1];
- for (int i = startBuffer, j = 0; i <= endBuffer; ++i, j++) {
- ByteBuffer bb = buffers[i].duplicate();
- if (i == startBuffer) {
- cnt = bufferSize - startBufferOffset;
- if (cnt > len) cnt = len;
- bb.limit(startBufferOffset + cnt).position(startBufferOffset);
- } else if (i == endBuffer) {
- cnt = endBufferOffset;
- bb.position(0).limit(cnt);
+
+ /**
+ * The returned ByteBuffer is a sliced one; it won't affect the position or limit of the
+ * original one.
+ */
+ @Override
+ public ByteBuffer next() {
+ ByteBuffer bb = buffers[curIndex].duplicate();
+ if (curIndex == startBuffer) {
+ bb.position(startOffset).limit(Math.min(bufferSize, startOffset + len));
+ } else if (curIndex == endBuffer) {
+ bb.position(0).limit(endOffset);
} else {
- cnt = bufferSize;
- bb.position(0).limit(cnt);
+ bb.position(0).limit(bufferSize);
}
- mbb[j] = bb.slice();
- srcIndex += cnt;
+ curIndex++;
+ sum += bb.remaining();
+ // Make sure that its pos is zero, it's important because MBB will count from zero for all nio
+ // ByteBuffers.
+ return bb.slice();
+ }
+
+ int getSum() {
+ return sum;
+ }
+
+ int getBufferCount() {
+ return this.endBuffer - this.startBuffer + 1;
}
- assert srcIndex == len;
- return ByteBuffAllocator.wrap(mbb);
}
}
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java
index 3fc1c23..0534924 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java
@@ -20,34 +20,37 @@ package org.apache.hadoop.hbase.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Random;
+
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.MultiByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-@Category({MiscTests.class, SmallTests.class})
+@Category({ MiscTests.class, SmallTests.class })
public class TestByteBufferArray {
+ private static final Random RANDOM = new Random(System.currentTimeMillis());
+
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestByteBufferArray.class);
+ private static final ByteBufferAllocator ALLOC = (size) -> ByteBuffer.allocateDirect((int) size);
+
@Test
public void testAsSubBufferWhenEndOffsetLandInLastBuffer() throws Exception {
int capacity = 4 * 1024 * 1024;
- ByteBufferAllocator allocator = new ByteBufferAllocator() {
- @Override
- public ByteBuffer allocate(long size) throws IOException {
- return ByteBuffer.allocateDirect((int) size);
- }
- };
- ByteBufferArray array = new ByteBufferArray(capacity, allocator);
+ ByteBufferArray array = new ByteBufferArray(capacity, ALLOC);
ByteBuff subBuf = array.asSubByteBuff(0, capacity);
subBuf.position(capacity - 1);// Position to the last byte
assertTrue(subBuf.hasRemaining());
@@ -59,54 +62,148 @@ public class TestByteBufferArray {
@Test
public void testByteBufferCreation() throws Exception {
int capacity = 470 * 1021 * 1023;
- ByteBufferAllocator allocator = new ByteBufferAllocator() {
- @Override
- public ByteBuffer allocate(long size) throws IOException {
- return ByteBuffer.allocateDirect((int) size);
- }
- };
- ByteBufferArray array = new ByteBufferArray(capacity, allocator);
- assertEquals(119, array.buffers.length);
+ ByteBufferArray array = new ByteBufferArray(capacity, ALLOC);
+ assertEquals(118, array.buffers.length);
for (int i = 0; i < array.buffers.length; i++) {
- if (i == array.buffers.length - 1) {
- assertEquals(0, array.buffers[i].capacity());
- } else {
- assertEquals(ByteBufferArray.DEFAULT_BUFFER_SIZE, array.buffers[i].capacity());
- }
+ assertEquals(ByteBufferArray.DEFAULT_BUFFER_SIZE, array.buffers[i].capacity());
}
}
@Test
public void testByteBufferCreation1() throws Exception {
- ByteBufferAllocator allocator = new ByteBufferAllocator() {
- @Override
- public ByteBuffer allocate(long size) throws IOException {
- return ByteBuffer.allocateDirect((int) size);
- }
- };
- ByteBufferArray array = new DummyByteBufferArray(7 * 1024 * 1024, allocator);
- // overwrite
- array.bufferCount = 25;
- array.buffers = new ByteBuffer[array.bufferCount + 1];
- array.createBuffers(allocator);
+ long cap = 7 * 1024L * 1024L;
+ int bufferSize = ByteBufferArray.getBufferSize(cap), bufferCount = 25;
+ ByteBufferArray array = new ByteBufferArray(bufferSize, bufferCount, 16, cap, ALLOC);
for (int i = 0; i < array.buffers.length; i++) {
- if (i == array.buffers.length - 1) {
- assertEquals(0, array.buffers[i].capacity());
- } else {
- assertEquals(458752, array.buffers[i].capacity());
- }
+ assertEquals(458752, array.buffers[i].capacity());
+ }
+ }
+
+ private static void fill(ByteBuff buf, byte val) {
+ for (int i = buf.position(); i < buf.limit(); i++) {
+ buf.put(i, val);
+ }
+ }
+
+ private ByteBuff createByteBuff(int len) {
+ assert len >= 0;
+ int pos = len == 0 ? 0 : RANDOM.nextInt(len);
+ ByteBuff b = ByteBuff.wrap(ByteBuffer.allocate(2 * len));
+ b.position(pos).limit(pos + len);
+ return b;
+ }
+
+ private interface Call {
+ void run() throws IOException;
+ }
+
+ private void expectedAssert(Call r) throws IOException {
+ try {
+ r.run();
+ fail();
+ } catch (AssertionError e) {
+ // Ignore
+ }
+ }
+
+
+ @Test
+ public void testArrayIO() throws IOException {
+ int cap = 9 * 1024 * 1024, bufferSize = ByteBufferArray.getBufferSize(cap);
+ ByteBufferArray array = new ByteBufferArray(cap, ALLOC);
+ testReadAndWrite(array, 0, 512, (byte) 2);
+ testReadAndWrite(array, cap - 512, 512, (byte) 3);
+ testReadAndWrite(array, 4 * 1024 * 1024, 5 * 1024 * 1024, (byte) 4);
+ testReadAndWrite(array, 256, 256, (byte) 5);
+ testReadAndWrite(array, 257, 513, (byte) 6);
+ testReadAndWrite(array, 0, cap, (byte) 7);
+ testReadAndWrite(array, cap, 0, (byte) 8);
+ testReadAndWrite(array, cap - 1, 1, (byte) 9);
+ testReadAndWrite(array, cap - 2, 2, (byte) 10);
+
+ expectedAssert(() -> testReadAndWrite(array, cap - 2, 3, (byte) 11));
+ expectedAssert(() -> testReadAndWrite(array, cap + 1, 0, (byte) 12));
+ expectedAssert(() -> testReadAndWrite(array, 0, cap + 1, (byte) 12));
+ expectedAssert(() -> testReadAndWrite(array, -1, 0, (byte) 13));
+ expectedAssert(() -> testReadAndWrite(array, 0, -23, (byte) 14));
+ expectedAssert(() -> testReadAndWrite(array, 0, 0, (byte) 15));
+ expectedAssert(() -> testReadAndWrite(array, 4096, cap - 4096 + 1, (byte) 16));
+
+ testAsSubByteBuff(array, 0, cap, true);
+ testAsSubByteBuff(array, 0, 0, false);
+ testAsSubByteBuff(array, 0, 1, false);
+ testAsSubByteBuff(array, 0, bufferSize - 1, false);
+ testAsSubByteBuff(array, 0, bufferSize, false);
+ testAsSubByteBuff(array, 0, bufferSize + 1, true);
+ testAsSubByteBuff(array, 0, 2 * bufferSize, true);
+ testAsSubByteBuff(array, 0, 5 * bufferSize, true);
+ testAsSubByteBuff(array, cap - bufferSize - 1, bufferSize, true);
+ testAsSubByteBuff(array, cap - bufferSize, bufferSize, false);
+ testAsSubByteBuff(array, cap - bufferSize, 0, false);
+ testAsSubByteBuff(array, cap - bufferSize, 1, false);
+ testAsSubByteBuff(array, cap - bufferSize, bufferSize - 1, false);
+ testAsSubByteBuff(array, cap - 2 * bufferSize, 2 * bufferSize, true);
+ testAsSubByteBuff(array, cap - 2 * bufferSize, bufferSize + 1, true);
+ testAsSubByteBuff(array, cap - 2 * bufferSize, bufferSize - 1, false);
+ testAsSubByteBuff(array, cap - 2 * bufferSize, 0, false);
+
+ expectedAssert(() -> testAsSubByteBuff(array, 0, cap + 1, false));
+ expectedAssert(() -> testAsSubByteBuff(array, 0, -1, false));
+ expectedAssert(() -> testAsSubByteBuff(array, -1, -1, false));
+ expectedAssert(() -> testAsSubByteBuff(array, cap - bufferSize, bufferSize + 1, false));
+ expectedAssert(() -> testAsSubByteBuff(array, 2 * bufferSize, cap - 2 * bufferSize + 1, false));
+ }
+
+ private void testReadAndWrite(ByteBufferArray array, int off, int dataSize, byte val) {
+ ByteBuff src = createByteBuff(dataSize);
+ int pos = src.position(), lim = src.limit();
+ fill(src, val);
+ assertEquals(src.remaining(), dataSize);
+ try {
+ assertEquals(dataSize, array.write(off, src));
+ assertEquals(0, src.remaining());
+ } finally {
+ src.position(pos).limit(lim);
+ }
+
+ ByteBuff dst = createByteBuff(dataSize);
+ pos = dst.position();
+ lim = dst.limit();
+ try {
+ assertEquals(dataSize, array.read(off, dst));
+ assertEquals(0, dst.remaining());
+ } finally {
+ dst.position(pos).limit(lim);
}
+ assertByteBuffEquals(src, dst);
}
- private static class DummyByteBufferArray extends ByteBufferArray {
+ private void testAsSubByteBuff(ByteBufferArray array, int off, int len, boolean isMulti) {
+ ByteBuff ret = array.asSubByteBuff(off, len);
+ if (isMulti) {
+ assertTrue(ret instanceof MultiByteBuff);
+ } else {
+ assertTrue(ret instanceof SingleByteBuff);
+ }
+ assertTrue(!ret.hasArray());
+ assertEquals(len, ret.remaining());
- public DummyByteBufferArray(long capacity, ByteBufferAllocator allocator) throws IOException {
- super(capacity, allocator);
+ ByteBuff tmp = createByteBuff(len);
+ int pos = tmp.position(), lim = tmp.limit();
+ try {
+ assertEquals(len, array.read(off, tmp));
+ assertEquals(0, tmp.remaining());
+ } finally {
+ tmp.position(pos).limit(lim);
}
- @Override
- int getThreadCount() {
- return 16;
+ assertByteBuffEquals(ret, tmp);
+ }
+
+ private void assertByteBuffEquals(ByteBuff a, ByteBuff b) {
+ assertEquals(a.remaining(), b.remaining());
+ for (int i = a.position(), j = b.position(); i < a.limit(); i++, j++) {
+ assertEquals(a.get(i), b.get(j));
}
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
index 3b832fe..fa8b184 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
@@ -30,39 +30,37 @@ import org.apache.hadoop.hbase.util.ByteBufferAllocator;
import org.apache.hadoop.hbase.util.ByteBufferArray;
/**
- * IO engine that stores data in memory using an array of ByteBuffers
- * {@link ByteBufferArray}.
- *
- *<h2>How it Works</h2>
- * First, see {@link ByteBufferArray} and how it gives a view across multiple ByteBuffers managed
- * by it internally. This class does the physical BB create and the write and read to the
- * underlying BBs. So we will create N BBs based on the total BC capacity specified on create
- * of the ByteBufferArray. So say we have 10 GB of off heap BucketCache, we will create 2560 such
- * BBs inside our ByteBufferArray.
- *
- * <p>Now the way BucketCache works is that the entire 10 GB is split into diff sized buckets: by
- * default from 5 KB to 513 KB. Within each bucket of a particular size, there are
- * usually more than one bucket 'block'. The way it is calculate in bucketcache is that the total
- * bucketcache size is divided by 4 (hard-coded currently) * max size option. So using defaults,
- * buckets will be is 4 * 513kb (the biggest default value) = 2052kb. A bucket of 2052kb at offset
- * zero will serve out bucket 'blocks' of 5kb, the next bucket will do the next size up and so on
- * up to the maximum (default) of 513kb).
- *
- * <p>When we write blocks to the bucketcache, we will see which bucket size group it best fits.
- * So a 4 KB block size goes to the 5 KB size group. Each of the block writes, writes within its
- * appropriate bucket. Though the bucket is '4kb' in size, it will occupy one of the
- * 5 KB bucket 'blocks' (even if actual size of the bucket is less). Bucket 'blocks' will not span
- * buckets.
- *
- * <p>But you can see the physical memory under the bucket 'blocks' can be split across the
- * underlying backing BBs from ByteBufferArray. All is split into 4 MB sized BBs.
- *
- * <p>Each Bucket knows its offset in the entire space of BC and when block is written the offset
+ * IO engine that stores data in memory using an array of ByteBuffers {@link ByteBufferArray}.
+ * <p>
+ * <h2>How it Works</h2> First, see {@link ByteBufferArray} and how it gives a view across multiple
+ * ByteBuffers managed by it internally. This class does the physical BB create and the write and
+ * read to the underlying BBs. So we will create N BBs based on the total BC capacity specified on
+ * create of the ByteBufferArray. So say we have 10 GB of off heap BucketCache, we will create 2560
+ * such BBs inside our ByteBufferArray. <br>
+ * <p>
+ * Now the way BucketCache works is that the entire 10 GB is split into diff sized buckets: by
+ * default from 5 KB to 513 KB. Within each bucket of a particular size, there are usually more than
+ * one bucket 'block'. The way it is calculated in bucketcache is that the total bucketcache size is
+ * divided by 4 (hard-coded currently) * max size option. So using defaults, buckets will be 4 *
+ * 513kb (the biggest default value) = 2052kb. A bucket of 2052kb at offset zero will serve out
+ * bucket 'blocks' of 5kb, the next bucket will do the next size up and so on up to the maximum
+ * (default) of 513kb). <br>
+ * <p>
+ * When we write blocks to the bucketcache, we will see which bucket size group it best fits. So a 4
+ * KB block size goes to the 5 KB size group. Each of the block writes, writes within its
+ * appropriate bucket. Though the bucket is '4kb' in size, it will occupy one of the 5 KB bucket
+ * 'blocks' (even if actual size of the bucket is less). Bucket 'blocks' will not span buckets. <br>
+ * <p>
+ * But you can see the physical memory under the bucket 'blocks' can be split across the underlying
+ * backing BBs from ByteBufferArray. All is split into 4 MB sized BBs. <br>
+ * <p>
+ * Each Bucket knows its offset in the entire space of BC and when block is written the offset
* arrives at ByteBufferArray and it figures which BB to write to. It may so happen that the entire
* block to be written does not fit a particular backing ByteBufferArray so the remainder goes to
- * another BB. See {@link ByteBufferArray#putMultiple(long, int, byte[])}.
-
-So said all these, when we read a block it may be possible that the bytes of that blocks is physically placed in 2 adjucent BBs. In such case also, we avoid any copy need by having the MBB...
+ * another BB. See {@link ByteBufferArray#write(long, ByteBuff)}. <br>
+ * Given all this, when we read a block it is possible that the bytes of that block are
+ * physically placed in 2 adjacent BBs. In such a case, too, we avoid any copying by having the
+ * MBB...
*/
@InterfaceAudience.Private
public class ByteBufferIOEngine implements IOEngine {
@@ -74,15 +72,9 @@ public class ByteBufferIOEngine implements IOEngine {
* @param capacity
* @throws IOException ideally here no exception to be thrown from the allocator
*/
- public ByteBufferIOEngine(long capacity)
- throws IOException {
+ public ByteBufferIOEngine(long capacity) throws IOException {
this.capacity = capacity;
- ByteBufferAllocator allocator = new ByteBufferAllocator() {
- @Override
- public ByteBuffer allocate(long size) throws IOException {
- return ByteBuffer.allocateDirect((int) size);
- }
- };
+ ByteBufferAllocator allocator = (size) -> ByteBuffer.allocateDirect((int) size);
bufferArray = new ByteBufferArray(capacity, allocator);
}
@@ -121,27 +113,29 @@ public class ByteBufferIOEngine implements IOEngine {
}
/**
- * Transfers data from the given byte buffer to the buffer array
- * @param srcBuffer the given byte buffer from which bytes are to be read
- * @param offset The offset in the ByteBufferArray of the first byte to be
- * written
+ * Transfers data from the given {@link ByteBuffer} to the buffer array. Position of source will
+ * be advanced by the {@link ByteBuffer#remaining()}.
+ * @param src the given byte buffer from which bytes are to be read.
+ * @param offset The offset in the ByteBufferArray of the first byte to be written
* @throws IOException throws IOException if writing to the array throws exception
*/
@Override
- public void write(ByteBuffer srcBuffer, long offset) throws IOException {
- assert srcBuffer.hasArray();
- bufferArray.putMultiple(offset, srcBuffer.remaining(), srcBuffer.array(),
- srcBuffer.arrayOffset());
+ public void write(ByteBuffer src, long offset) throws IOException {
+ bufferArray.write(offset, ByteBuff.wrap(src));
}
+ /**
+ * Transfers data from the given {@link ByteBuff} to the buffer array. Position of source will be
+ * advanced by the {@link ByteBuff#remaining()}.
+ * @param src the given byte buffer from which bytes are to be read.
+ * @param offset The offset in the ByteBufferArray of the first byte to be written
+ * @throws IOException throws IOException if writing to the array throws exception
+ */
@Override
- public void write(ByteBuff srcBuffer, long offset) throws IOException {
- // When caching block into BucketCache there will be single buffer backing for this HFileBlock.
- // This will work for now. But from the DFS itself if we get DBB then this may not hold true.
- assert srcBuffer.hasArray();
- bufferArray.putMultiple(offset, srcBuffer.remaining(), srcBuffer.array(),
- srcBuffer.arrayOffset());
+ public void write(ByteBuff src, long offset) throws IOException {
+ bufferArray.write(offset, src);
}
+
/**
* No operation for the sync in the memory IO engine
*/
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java
index 8b024f0..b8e29c6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java
@@ -16,16 +16,15 @@
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
+import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
+
import java.io.IOException;
-import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
-import org.apache.hadoop.hbase.nio.SingleByteBuff;
+import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* IO engine that stores data to a file on the local block device using memory mapping
@@ -33,7 +32,6 @@ import org.slf4j.LoggerFactory;
*/
@InterfaceAudience.Private
public class ExclusiveMemoryMmapIOEngine extends FileMmapIOEngine {
- static final Logger LOG = LoggerFactory.getLogger(ExclusiveMemoryMmapIOEngine.class);
public ExclusiveMemoryMmapIOEngine(String filePath, long capacity) throws IOException {
super(filePath, capacity);
@@ -42,9 +40,8 @@ public class ExclusiveMemoryMmapIOEngine extends FileMmapIOEngine {
@Override
public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
throws IOException {
- byte[] dst = new byte[length];
- bufferArray.getMultiple(offset, length, dst);
- return deserializer.deserialize(new SingleByteBuff(ByteBuffer.wrap(dst)), true,
- MemoryType.EXCLUSIVE);
+ ByteBuff dst = HEAP.allocate(length);
+ bufferArray.read(offset, dst);
+ return deserializer.deserialize(dst.position(0).limit(length), true, MemoryType.EXCLUSIVE);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
index 0710d26..f6e49cf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
@@ -143,6 +143,7 @@ public class FileIOEngine implements IOEngine {
+ " expected");
}
}
+ dstBuffer.rewind();
return deserializer.deserialize(new SingleByteBuff(dstBuffer), true, MemoryType.EXCLUSIVE);
}
@@ -210,10 +211,8 @@ public class FileIOEngine implements IOEngine {
@Override
public void write(ByteBuff srcBuffer, long offset) throws IOException {
- // When caching block into BucketCache there will be single buffer backing for this HFileBlock.
- assert srcBuffer.hasArray();
- write(ByteBuffer.wrap(srcBuffer.array(), srcBuffer.arrayOffset(),
- srcBuffer.remaining()), offset);
+ ByteBuffer dup = srcBuffer.asSubByteBuffer(srcBuffer.remaining()).duplicate();
+ write(dup, offset);
}
private void accessFile(FileAccessor accessor, ByteBuffer buffer,
@@ -229,8 +228,7 @@ public class FileIOEngine implements IOEngine {
int accessLen = 0;
if (endFileNum > accessFileNum) {
// short the limit;
- buffer.limit((int) (buffer.limit() - remainingAccessDataLen
- + sizePerFile - accessOffset));
+ buffer.limit((int) (buffer.limit() - remainingAccessDataLen + sizePerFile - accessOffset));
}
try {
accessLen = accessor.access(fileChannel, buffer, accessOffset);
@@ -307,7 +305,7 @@ public class FileIOEngine implements IOEngine {
}
}
- private static interface FileAccessor {
+ private interface FileAccessor {
int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
throws IOException;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapIOEngine.java
index 9580efe..bd17fd5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapIOEngine.java
@@ -112,17 +112,12 @@ public abstract class FileMmapIOEngine implements IOEngine {
*/
@Override
public void write(ByteBuffer srcBuffer, long offset) throws IOException {
- assert srcBuffer.hasArray();
- bufferArray.putMultiple(offset, srcBuffer.remaining(), srcBuffer.array(),
- srcBuffer.arrayOffset());
+ bufferArray.write(offset, ByteBuff.wrap(srcBuffer));
}
@Override
public void write(ByteBuff srcBuffer, long offset) throws IOException {
- // This singleByteBuff can be considered to be array backed
- assert srcBuffer.hasArray();
- bufferArray.putMultiple(offset, srcBuffer.remaining(), srcBuffer.array(),
- srcBuffer.arrayOffset());
+ bufferArray.write(offset, srcBuffer);
}
/**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
index bb58b4e..a06d86d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
-import static org.junit.Assert.assertTrue;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -28,6 +26,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -56,12 +55,10 @@ public class TestByteBufferIOEngine {
if (blockSize == 0) {
blockSize = 1;
}
- byte[] byteArray = new byte[blockSize];
- for (int j = 0; j < byteArray.length; ++j) {
- byteArray[j] = val;
- }
- ByteBuffer srcBuffer = ByteBuffer.wrap(byteArray);
- int offset = 0;
+
+ ByteBuff src = createByteBuffer(blockSize, val, i % 2 == 0);
+ int pos = src.position(), lim = src.limit();
+ int offset;
if (testOffsetAtStartNum > 0) {
testOffsetAtStartNum--;
offset = 0;
@@ -71,13 +68,16 @@ public class TestByteBufferIOEngine {
} else {
offset = (int) (Math.random() * (capacity - maxBlockSize));
}
- ioEngine.write(srcBuffer, offset);
+ ioEngine.write(src, offset);
+ src.position(pos).limit(lim);
+
BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
ioEngine.read(offset, blockSize, deserializer);
- ByteBuff dstBuffer = deserializer.buf;
- for (int j = 0; j < byteArray.length; ++j) {
- assertTrue(byteArray[j] == dstBuffer.get(j));
- }
+ ByteBuff dst = deserializer.buf;
+ Assert.assertEquals(src.remaining(), blockSize);
+ Assert.assertEquals(dst.remaining(), blockSize);
+ Assert.assertEquals(0, ByteBuff.compareTo(src, src.position(), src.remaining(), dst,
+ dst.position(), dst.remaining()));
}
assert testOffsetAtStartNum == 0;
assert testOffsetAtEndNum == 0;
@@ -112,6 +112,16 @@ public class TestByteBufferIOEngine {
}
}
+ static ByteBuff createByteBuffer(int len, int val, boolean useHeap) {
+ ByteBuffer b = useHeap ? ByteBuffer.allocate(2 * len) : ByteBuffer.allocateDirect(2 * len);
+ int pos = (int) (Math.random() * len);
+ b.position(pos).limit(pos + len);
+ for (int i = pos; i < pos + len; i++) {
+ b.put(i, (byte) val);
+ }
+ return ByteBuff.wrap(b);
+ }
+
@Test
public void testByteBufferIOEngineWithMBB() throws Exception {
int capacity = 32 * 1024 * 1024; // 32 MB
@@ -126,12 +136,9 @@ public class TestByteBufferIOEngine {
if (blockSize == 0) {
blockSize = 1;
}
- byte[] byteArray = new byte[blockSize];
- for (int j = 0; j < byteArray.length; ++j) {
- byteArray[j] = val;
- }
- ByteBuffer srcBuffer = ByteBuffer.wrap(byteArray);
- int offset = 0;
+ ByteBuff src = createByteBuffer(blockSize, val, i % 2 == 0);
+ int pos = src.position(), lim = src.limit();
+ int offset;
if (testOffsetAtStartNum > 0) {
testOffsetAtStartNum--;
offset = 0;
@@ -141,13 +148,16 @@ public class TestByteBufferIOEngine {
} else {
offset = (int) (Math.random() * (capacity - maxBlockSize));
}
- ioEngine.write(srcBuffer, offset);
+ ioEngine.write(src, offset);
+ src.position(pos).limit(lim);
+
BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
ioEngine.read(offset, blockSize, deserializer);
- ByteBuff dstBuffer = deserializer.buf;
- for (int j = 0; j < byteArray.length; ++j) {
- assertTrue(srcBuffer.get(j) == dstBuffer.get(j));
- }
+ ByteBuff dst = deserializer.buf;
+ Assert.assertEquals(src.remaining(), blockSize);
+ Assert.assertEquals(dst.remaining(), blockSize);
+ Assert.assertEquals(0, ByteBuff.compareTo(src, src.position(), src.remaining(), dst,
+ dst.position(), dst.remaining()));
}
assert testOffsetAtStartNum == 0;
assert testOffsetAtEndNum == 0;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestExclusiveMemoryMmapEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestExclusiveMemoryMmapEngine.java
index d0d8c8a..79d58f0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestExclusiveMemoryMmapEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestExclusiveMemoryMmapEngine.java
@@ -17,16 +17,14 @@
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
-import static org.junit.Assert.assertTrue;
-
import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.io.hfile.bucket.TestByteBufferIOEngine.BufferGrabbingDeserializer;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -34,7 +32,7 @@ import org.junit.experimental.categories.Category;
/**
* Basic test for {@link ExclusiveMemoryMmapIOEngine}
*/
-@Category({IOTests.class, SmallTests.class})
+@Category({ IOTests.class, SmallTests.class })
public class TestExclusiveMemoryMmapEngine {
@ClassRule
@@ -50,17 +48,23 @@ public class TestExclusiveMemoryMmapEngine {
for (int i = 0; i < 50; i++) {
int len = (int) Math.floor(Math.random() * 100);
long offset = (long) Math.floor(Math.random() * size % (size - len));
- byte[] data1 = new byte[len];
- for (int j = 0; j < data1.length; ++j) {
- data1[j] = (byte) (Math.random() * 255);
- }
- fileMmapEngine.write(ByteBuffer.wrap(data1), offset);
+ int val = (int) (Math.random() * 255);
+
+ // write
+ ByteBuff src = TestByteBufferIOEngine.createByteBuffer(len, val, i % 2 == 0);
+ int pos = src.position(), lim = src.limit();
+ fileMmapEngine.write(src, offset);
+ src.position(pos).limit(lim);
+
+ // read
BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
fileMmapEngine.read(offset, len, deserializer);
- ByteBuff data2 = deserializer.getDeserializedByteBuff();
- for (int j = 0; j < data1.length; ++j) {
- assertTrue(data1[j] == data2.get(j));
- }
+ ByteBuff dst = deserializer.getDeserializedByteBuff();
+
+ Assert.assertEquals(src.remaining(), len);
+ Assert.assertEquals(dst.remaining(), len);
+ Assert.assertEquals(0,
+ ByteBuff.compareTo(src, pos, len, dst, dst.position(), dst.remaining()));
}
} finally {
File file = new File(filePath);
@@ -68,6 +72,5 @@ public class TestExclusiveMemoryMmapEngine {
file.delete();
}
}
-
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
index efb8145..6b0d603 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
@@ -132,15 +133,22 @@ public class TestFileIOEngine {
fileIOEngine.closeFileChannels();
int len = 5;
long offset = 0L;
- byte[] data1 = new byte[len];
- for (int j = 0; j < data1.length; ++j) {
- data1[j] = (byte) (Math.random() * 255);
+ int val = (int) (Math.random() * 255);
+ for (int i = 0; i < 2; i++) {
+ ByteBuff src = TestByteBufferIOEngine.createByteBuffer(len, val, i % 2 == 0);
+ int pos = src.position(), lim = src.limit();
+ fileIOEngine.write(src, offset);
+ src.position(pos).limit(lim);
+
+ BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
+ fileIOEngine.read(offset, len, deserializer);
+ ByteBuff dst = deserializer.getDeserializedByteBuff();
+
+ Assert.assertEquals(src.remaining(), len);
+ Assert.assertEquals(dst.remaining(), len);
+ Assert.assertEquals(0,
+ ByteBuff.compareTo(src, pos, len, dst, dst.position(), dst.remaining()));
}
- fileIOEngine.write(ByteBuffer.wrap(data1), offset);
- BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
- fileIOEngine.read(offset, len, deserializer);
- ByteBuff data2 = deserializer.getDeserializedByteBuff();
- assertArrayEquals(data1, data2.array());
}
@Test