Posted to commits@hbase.apache.org by op...@apache.org on 2019/05/31 07:15:17 UTC

[hbase] 07/17: HBASE-22159 ByteBufferIOEngine should support write off-heap ByteBuff to the bufferArray

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch HBASE-21879
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit ce0edf5509acb87033ed4bd89eea9ba3a9f38112
Author: huzheng <op...@gmail.com>
AuthorDate: Wed Apr 3 22:29:31 2019 +0800

    HBASE-22159 ByteBufferIOEngine should support write off-heap ByteBuff to the bufferArray
---
 .../apache/hadoop/hbase/io/ByteBuffAllocator.java  |  29 +-
 .../java/org/apache/hadoop/hbase/nio/ByteBuff.java |  33 ++
 .../apache/hadoop/hbase/util/ByteBufferArray.java  | 398 ++++++++++-----------
 .../hadoop/hbase/util/TestByteBufferArray.java     | 183 +++++++---
 .../hbase/io/hfile/bucket/ByteBufferIOEngine.java  | 100 +++---
 .../hfile/bucket/ExclusiveMemoryMmapIOEngine.java  |  15 +-
 .../hadoop/hbase/io/hfile/bucket/FileIOEngine.java |  12 +-
 .../hbase/io/hfile/bucket/FileMmapIOEngine.java    |   9 +-
 .../io/hfile/bucket/TestByteBufferIOEngine.java    |  58 +--
 .../bucket/TestExclusiveMemoryMmapEngine.java      |  31 +-
 .../hbase/io/hfile/bucket/TestFileIOEngine.java    |  24 +-
 11 files changed, 485 insertions(+), 407 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
index 984d46d..51de22a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
@@ -29,7 +29,6 @@ import sun.nio.ch.DirectBuffer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.nio.ByteBuff;
-import org.apache.hadoop.hbase.nio.MultiByteBuff;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -175,7 +174,7 @@ public class ByteBuffAllocator {
     return allocateOnHeap(this.bufSize);
   }
 
-  private SingleByteBuff allocateOnHeap(int size) {
+  private static SingleByteBuff allocateOnHeap(int size) {
     return new SingleByteBuff(NONE, ByteBuffer.allocate(size));
   }
 
@@ -213,7 +212,7 @@ public class ByteBuffAllocator {
       // just allocate the ByteBuffer from on-heap.
       bbs.add(ByteBuffer.allocate(remain));
     }
-    ByteBuff bb = wrap(bbs, () -> {
+    ByteBuff bb = ByteBuff.wrap(bbs, () -> {
       for (int i = 0; i < lenFromReservoir; i++) {
         this.putbackBuffer(bbs.get(i));
       }
@@ -238,30 +237,6 @@ public class ByteBuffAllocator {
     }
   }
 
-  public static ByteBuff wrap(ByteBuffer[] buffers, Recycler recycler) {
-    if (buffers == null || buffers.length == 0) {
-      throw new IllegalArgumentException("buffers shouldn't be null or empty");
-    }
-    return buffers.length == 1 ? new SingleByteBuff(recycler, buffers[0])
-        : new MultiByteBuff(recycler, buffers);
-  }
-
-  public static ByteBuff wrap(ByteBuffer[] buffers) {
-    return wrap(buffers, NONE);
-  }
-
-  public static ByteBuff wrap(List<ByteBuffer> buffers, Recycler recycler) {
-    if (buffers == null || buffers.size() == 0) {
-      throw new IllegalArgumentException("buffers shouldn't be null or empty");
-    }
-    return buffers.size() == 1 ? new SingleByteBuff(recycler, buffers.get(0))
-        : new MultiByteBuff(recycler, buffers.toArray(new ByteBuffer[0]));
-  }
-
-  public static ByteBuff wrap(List<ByteBuffer> buffers) {
-    return wrap(buffers, NONE);
-  }
-
   /**
    * @return One free DirectByteBuffer from the pool. If no free ByteBuffer and we have not reached
    *         the maximum pool size, it will create a new one and return. In case of max pool size
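
Worth noting: the Recycler passed to ByteBuff.wrap() is the hook that returns pooled buffers to the reservoir on release. A minimal sketch of that contract, assuming (as the design suggests) that the recycler callback fires once the ByteBuff's reference count drops to zero; illustrative only, not part of the patch:

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.nio.ByteBuff;

    public class RecyclerSketch {
      public static void main(String[] args) {
        // The recycler callback runs once the wrapped ByteBuff is released,
        // which is how allocate() above hands buffers back to the pool.
        ByteBuff buf = ByteBuff.wrap(new ByteBuffer[] { ByteBuffer.allocateDirect(64) },
            () -> System.out.println("buffer returned to pool"));
        buf.release(); // prints the message above
      }
    }
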
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
index 1ee3607..9339f43 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
@@ -20,7 +20,10 @@ package org.apache.hadoop.hbase.nio;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
+import java.util.List;
 
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ObjectIntPair;
@@ -557,4 +560,34 @@ public abstract class ByteBuff implements ReferenceCounted {
     return this.getClass().getSimpleName() + "[pos=" + position() + ", lim=" + limit() +
         ", cap= " + capacity() + "]";
   }
+
+  /********************************* ByteBuff wrapper methods ***********************************/
+
+  public static ByteBuff wrap(ByteBuffer[] buffers, Recycler recycler) {
+    if (buffers == null || buffers.length == 0) {
+      throw new IllegalArgumentException("buffers shouldn't be null or empty");
+    }
+    return buffers.length == 1 ? new SingleByteBuff(recycler, buffers[0])
+        : new MultiByteBuff(recycler, buffers);
+  }
+
+  public static ByteBuff wrap(ByteBuffer[] buffers) {
+    return wrap(buffers, ByteBuffAllocator.NONE);
+  }
+
+  public static ByteBuff wrap(List<ByteBuffer> buffers, Recycler recycler) {
+    if (buffers == null || buffers.size() == 0) {
+      throw new IllegalArgumentException("buffers shouldn't be null or empty");
+    }
+    return buffers.size() == 1 ? new SingleByteBuff(recycler, buffers.get(0))
+        : new MultiByteBuff(recycler, buffers.toArray(new ByteBuffer[0]));
+  }
+
+  public static ByteBuff wrap(List<ByteBuffer> buffers) {
+    return wrap(buffers, ByteBuffAllocator.NONE);
+  }
+
+  public static ByteBuff wrap(ByteBuffer buffer) {
+    return new SingleByteBuff(ByteBuffAllocator.NONE, buffer);
+  }
 }
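
The wrap() factories move off ByteBufferAllocator and onto ByteBuff itself, so callers can build a SingleByteBuff or MultiByteBuff without going through the allocator. A minimal sketch of the relocated overloads as they appear in the hunk above (illustrative, not part of the patch):

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import org.apache.hadoop.hbase.nio.ByteBuff;

    public class WrapSketch {
      public static void main(String[] args) {
        // One ByteBuffer wraps to a SingleByteBuff.
        ByteBuff single = ByteBuff.wrap(ByteBuffer.allocate(16));
        // Several ByteBuffers wrap to a MultiByteBuff presenting one logical view.
        ByteBuff multi = ByteBuff.wrap(new ByteBuffer[] {
            ByteBuffer.allocate(16), ByteBuffer.allocateDirect(16) });
        // A List<ByteBuffer> overload exists as well.
        ByteBuff fromList = ByteBuff.wrap(Arrays.asList(ByteBuffer.allocate(8)));
        System.out.println(single.capacity() + " " + multi.capacity() + " " + fromList.capacity());
        // prints: 16 32 8
      }
    }
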
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
index d023339..e5a0b13 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
@@ -20,15 +20,14 @@ package org.apache.hadoop.hbase.util;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 
-import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -38,279 +37,248 @@ import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
- * This class manages an array of ByteBuffers with a default size 4MB. These
- * buffers are sequential and could be considered as a large buffer.It supports
- * reading/writing data from this large buffer with a position and offset
+ * This class manages an array of ByteBuffers with a default size 4MB. These buffers are sequential
+ * and could be considered as a large buffer. It supports reading/writing data from this large
+ * buffer with a position and offset.
  */
 @InterfaceAudience.Private
 public class ByteBufferArray {
   private static final Logger LOG = LoggerFactory.getLogger(ByteBufferArray.class);
 
   public static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
-  @VisibleForTesting
-  ByteBuffer buffers[];
-  private int bufferSize;
-  @VisibleForTesting
-  int bufferCount;
+  private final int bufferSize;
+  private final int bufferCount;
+  final ByteBuffer[] buffers;
 
   /**
-   * We allocate a number of byte buffers as the capacity. In order not to out
-   * of the array bounds for the last byte(see {@link ByteBufferArray#multiple}),
-   * we will allocate one additional buffer with capacity 0;
+   * We allocate a number of byte buffers to cover the given capacity.
    * @param capacity total size of the byte buffer array
    * @param allocator the ByteBufferAllocator that will create the buffers
    * @throws IOException throws IOException if there is an exception thrown by the allocator
    */
-  public ByteBufferArray(long capacity, ByteBufferAllocator allocator)
-      throws IOException {
-    this.bufferSize = DEFAULT_BUFFER_SIZE;
-    if (this.bufferSize > (capacity / 16))
-      this.bufferSize = (int) roundUp(capacity / 16, 32768);
-    this.bufferCount = (int) (roundUp(capacity, bufferSize) / bufferSize);
-    LOG.info("Allocating buffers total=" + StringUtils.byteDesc(capacity)
-        + ", sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count="
-        + bufferCount);
-    buffers = new ByteBuffer[bufferCount + 1];
-    createBuffers(allocator);
+  public ByteBufferArray(long capacity, ByteBufferAllocator allocator) throws IOException {
+    this(getBufferSize(capacity), getBufferCount(capacity),
+        Runtime.getRuntime().availableProcessors(), capacity, allocator);
   }
 
   @VisibleForTesting
-  void createBuffers(ByteBufferAllocator allocator)
-      throws IOException {
-    int threadCount = getThreadCount();
-    ExecutorService service = new ThreadPoolExecutor(threadCount, threadCount, 0L,
-        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
-    int perThreadCount = (int)Math.floor((double) (bufferCount) / threadCount);
-    int lastThreadCount = bufferCount - (perThreadCount * (threadCount - 1));
-    Future<ByteBuffer[]>[] futures = new Future[threadCount];
+  ByteBufferArray(int bufferSize, int bufferCount, int threadCount, long capacity,
+      ByteBufferAllocator alloc) throws IOException {
+    this.bufferSize = bufferSize;
+    this.bufferCount = bufferCount;
+    LOG.info("Allocating buffers total={}, sizePerBuffer={}, count={}",
+      StringUtils.byteDesc(capacity), StringUtils.byteDesc(bufferSize), bufferCount);
+    this.buffers = new ByteBuffer[bufferCount];
+    createBuffers(threadCount, alloc);
+  }
+
+  private void createBuffers(int threadCount, ByteBufferAllocator alloc) throws IOException {
+    ExecutorService pool = Executors.newFixedThreadPool(threadCount);
+    int perThreadCount = bufferCount / threadCount;
+    int reminder = bufferCount % threadCount;
     try {
+      List<Future<ByteBuffer[]>> futures = new ArrayList<>(threadCount);
+      // Dispatch the creation task to each thread.
       for (int i = 0; i < threadCount; i++) {
-        // Last thread will have to deal with a different number of buffers
-        int buffersToCreate = (i == threadCount - 1) ? lastThreadCount : perThreadCount;
-        futures[i] = service.submit(
-          new BufferCreatorCallable(bufferSize, buffersToCreate, allocator));
+        final int chunkSize = perThreadCount + ((i == threadCount - 1) ? reminder : 0);
+        futures.add(pool.submit(() -> {
+          ByteBuffer[] chunk = new ByteBuffer[chunkSize];
+          for (int k = 0; k < chunkSize; k++) {
+            chunk[k] = alloc.allocate(bufferSize);
+          }
+          return chunk;
+        }));
       }
+      // Append the buffers created by each thread.
       int bufferIndex = 0;
-      for (Future<ByteBuffer[]> future : futures) {
-        try {
-          ByteBuffer[] buffers = future.get();
-          for (ByteBuffer buffer : buffers) {
-            this.buffers[bufferIndex++] = buffer;
+      try {
+        for (Future<ByteBuffer[]> f : futures) {
+          for (ByteBuffer b : f.get()) {
+            this.buffers[bufferIndex++] = b;
           }
-        } catch (InterruptedException | ExecutionException e) {
-          LOG.error("Buffer creation interrupted", e);
-          throw new IOException(e);
         }
+        assert bufferIndex == bufferCount;
+      } catch (Exception e) {
+        LOG.error("Buffer creation interrupted", e);
+        throw new IOException(e);
       }
     } finally {
-      service.shutdownNow();
+      pool.shutdownNow();
     }
-    // always create on heap empty dummy buffer at last
-    this.buffers[bufferCount] = ByteBuffer.allocate(0);
   }
 
   @VisibleForTesting
-  int getThreadCount() {
-    return Runtime.getRuntime().availableProcessors();
-  }
-
-  /**
-   * A callable that creates buffers of the specified length either onheap/offheap using the
-   * {@link ByteBufferAllocator}
-   */
-  private static class BufferCreatorCallable implements Callable<ByteBuffer[]> {
-    private final int bufferCapacity;
-    private final int bufferCount;
-    private final ByteBufferAllocator allocator;
-
-    BufferCreatorCallable(int bufferCapacity, int bufferCount, ByteBufferAllocator allocator) {
-      this.bufferCapacity = bufferCapacity;
-      this.bufferCount = bufferCount;
-      this.allocator = allocator;
+  static int getBufferSize(long capacity) {
+    int bufferSize = DEFAULT_BUFFER_SIZE;
+    if (bufferSize > (capacity / 16)) {
+      bufferSize = (int) roundUp(capacity / 16, 32768);
     }
+    return bufferSize;
+  }
 
-    @Override
-    public ByteBuffer[] call() throws Exception {
-      ByteBuffer[] buffers = new ByteBuffer[this.bufferCount];
-      for (int i = 0; i < this.bufferCount; i++) {
-        buffers[i] = allocator.allocate(this.bufferCapacity);
-      }
-      return buffers;
-    }
+  private static int getBufferCount(long capacity) {
+    int bufferSize = getBufferSize(capacity);
+    return (int) (roundUp(capacity, bufferSize) / bufferSize);
   }
 
-  private long roundUp(long n, long to) {
+  private static long roundUp(long n, long to) {
     return ((n + to - 1) / to) * to;
   }
 
   /**
-   * Transfers bytes from this buffer array into the given destination array
-   * @param start start position in the ByteBufferArray
-   * @param len The maximum number of bytes to be written to the given array
-   * @param dstArray The array into which bytes are to be written
+   * Transfers bytes from this buffers array into the given destination {@link ByteBuff}
+   * @param offset start position in this big logical array.
+   * @param dst the destination ByteBuff. Notice that its position will be advanced.
    * @return number of bytes read
    */
-  public int getMultiple(long start, int len, byte[] dstArray) {
-    return getMultiple(start, len, dstArray, 0);
+  public int read(long offset, ByteBuff dst) {
+    return internalTransfer(offset, dst, READER);
   }
 
   /**
-   * Transfers bytes from this buffer array into the given destination array
-   * @param start start offset of this buffer array
-   * @param len The maximum number of bytes to be written to the given array
-   * @param dstArray The array into which bytes are to be written
-   * @param dstOffset The offset within the given array of the first byte to be
-   *          written
-   * @return number of bytes read
+   * Transfers bytes from the given source {@link ByteBuff} into this buffer array
+   * @param offset start offset in this big logical array.
+   * @param src the source ByteBuff. Notice that its position will be advanced.
+   * @return number of bytes written
    */
-  public int getMultiple(long start, int len, byte[] dstArray, int dstOffset) {
-    multiple(start, len, dstArray, dstOffset, GET_MULTIPLE_VISTOR);
-    return len;
+  public int write(long offset, ByteBuff src) {
+    return internalTransfer(offset, src, WRITER);
   }
 
-  private final static Visitor GET_MULTIPLE_VISTOR = new Visitor() {
-    @Override
-    public void visit(ByteBuffer bb, int pos, byte[] array, int arrayIdx, int len) {
-      ByteBufferUtils.copyFromBufferToArray(array, bb, pos, arrayIdx, len);
-    }
+  /**
+   * Transfer bytes from source {@link ByteBuff} to destination {@link ByteBuffer}. Position of both
+   * source and destination will be advanced.
+   */
+  private static final BiConsumer<ByteBuffer, ByteBuff> WRITER = (dst, src) -> {
+    int off = src.position(), len = dst.remaining();
+    src.get(dst, off, len);
+    src.position(off + len);
   };
 
   /**
-   * Transfers bytes from the given source array into this buffer array
-   * @param start start offset of this buffer array
-   * @param len The maximum number of bytes to be read from the given array
-   * @param srcArray The array from which bytes are to be read
+   * Transfer bytes from source {@link ByteBuffer} to destination {@link ByteBuff}. Position of both
+   * source and destination will be advanced.
    */
-  public void putMultiple(long start, int len, byte[] srcArray) {
-    putMultiple(start, len, srcArray, 0);
-  }
+  private static final BiConsumer<ByteBuffer, ByteBuff> READER = (src, dst) -> {
+    int off = dst.position(), len = src.remaining(), srcOff = src.position();
+    dst.put(off, ByteBuff.wrap(src), srcOff, len);
+    src.position(srcOff + len);
+    dst.position(off + len);
+  };
 
   /**
-   * Transfers bytes from the given source array into this buffer array
-   * @param start start offset of this buffer array
-   * @param len The maximum number of bytes to be read from the given array
-   * @param srcArray The array from which bytes are to be read
-   * @param srcOffset The offset within the given array of the first byte to be
-   *          read
+   * Transfers all remaining bytes from b into the buffers array starting at offset, or transfers
+   * bytes from the buffers array at offset into b until b is filled. Notice that the position of
+   * ByteBuff b will be advanced.
+   * @param offset where we start in the big logical array.
+   * @param b the ByteBuff to transfer from or to
+   * @param transfer the transfer interface.
+   * @return the number of bytes transferred.
    */
-  public void putMultiple(long start, int len, byte[] srcArray, int srcOffset) {
-    multiple(start, len, srcArray, srcOffset, PUT_MULTIPLE_VISITOR);
-  }
-
-  private final static Visitor PUT_MULTIPLE_VISITOR = new Visitor() {
-    @Override
-    public void visit(ByteBuffer bb, int pos, byte[] array, int arrayIdx, int len) {
-      ByteBufferUtils.copyFromArrayToBuffer(bb, pos, array, arrayIdx, len);
+  private int internalTransfer(long offset, ByteBuff b, BiConsumer<ByteBuffer, ByteBuff> transfer) {
+    int expectedTransferLen = b.remaining();
+    if (expectedTransferLen == 0) {
+      return 0;
     }
-  };
-
-  private interface Visitor {
-    /**
-     * Visit the given byte buffer, if it is a read action, we will transfer the
-     * bytes from the buffer to the destination array, else if it is a write
-     * action, we will transfer the bytes from the source array to the buffer
-     * @param bb byte buffer
-     * @param pos Start position in ByteBuffer
-     * @param array a source or destination byte array
-     * @param arrayOffset offset of the byte array
-     * @param len read/write length
-     */
-    void visit(ByteBuffer bb, int pos, byte[] array, int arrayOffset, int len);
+    BufferIterator it = new BufferIterator(offset, expectedTransferLen);
+    while (it.hasNext()) {
+      ByteBuffer a = it.next();
+      transfer.accept(a, b);
+      assert !a.hasRemaining();
+    }
+    assert expectedTransferLen == it.getSum() : "Expected transfer length (=" + expectedTransferLen
+        + ") don't match the actual transfer length(=" + it.getSum() + ")";
+    return expectedTransferLen;
   }
 
   /**
-   * Access(read or write) this buffer array with a position and length as the
-   * given array. Here we will only lock one buffer even if it may be need visit
-   * several buffers. The consistency is guaranteed by the caller.
-   * @param start start offset of this buffer array
-   * @param len The maximum number of bytes to be accessed
-   * @param array The array from/to which bytes are to be read/written
-   * @param arrayOffset The offset within the given array of the first byte to
-   *          be read or written
-   * @param visitor implement of how to visit the byte buffer
+   * Creates a ByteBuff from a given array of ByteBuffers from the given offset to the length
+   * specified. For example, if there are 4 buffers forming an array each with length 10 and if we
+   * call asSubByteBuff(5, 10) then we will create an MBB consisting of two BBs: the first will be
+   * a BB from 'position' 5 with 'length' 5 and the 2nd BB will be from 'position' 0 with 'length' 5.
+   * @param offset the position in the whole array which is composited by multiple byte buffers.
+   * @param len the length of bytes
+   * @return a ByteBuff formed from the underlying ByteBuffers
    */
-  void multiple(long start, int len, byte[] array, int arrayOffset, Visitor visitor) {
-    assert len >= 0;
-    long end = start + len;
-    int startBuffer = (int) (start / bufferSize), startOffset = (int) (start % bufferSize);
-    int endBuffer = (int) (end / bufferSize), endOffset = (int) (end % bufferSize);
-    assert array.length >= len + arrayOffset;
-    assert startBuffer >= 0 && startBuffer < bufferCount;
-    assert (endBuffer >= 0 && endBuffer < bufferCount)
-        || (endBuffer == bufferCount && endOffset == 0);
-    if (startBuffer >= buffers.length || startBuffer < 0) {
-      String msg = "Failed multiple, start=" + start + ",startBuffer="
-          + startBuffer + ",bufferSize=" + bufferSize;
-      LOG.error(msg);
-      throw new RuntimeException(msg);
+  public ByteBuff asSubByteBuff(long offset, final int len) {
+    BufferIterator it = new BufferIterator(offset, len);
+    ByteBuffer[] mbb = new ByteBuffer[it.getBufferCount()];
+    for (int i = 0; i < mbb.length; i++) {
+      assert it.hasNext();
+      mbb[i] = it.next();
     }
-    int srcIndex = 0, cnt = -1;
-    for (int i = startBuffer; i <= endBuffer; ++i) {
-      ByteBuffer bb = buffers[i].duplicate();
-      int pos = 0;
-      if (i == startBuffer) {
-        cnt = bufferSize - startOffset;
-        if (cnt > len) cnt = len;
-        pos = startOffset;
-      } else if (i == endBuffer) {
-        cnt = endOffset;
-      } else {
-        cnt = bufferSize;
-      }
-      visitor.visit(bb, pos, array, srcIndex + arrayOffset, cnt);
-      srcIndex += cnt;
-    }
-    assert srcIndex == len;
+    assert it.getSum() == len;
+    return ByteBuff.wrap(mbb);
   }
 
   /**
-   * Creates a ByteBuff from a given array of ByteBuffers from the given offset to the
-   * length specified. For eg, if there are 4 buffers forming an array each with length 10 and
-   * if we call asSubBuffer(5, 10) then we will create an MBB consisting of two BBs
-   * and the first one be a BB from 'position' 5 to a 'length' 5 and the 2nd BB will be from
-   * 'position' 0 to 'length' 5.
-   * @param offset
-   * @param len
-   * @return a ByteBuff formed from the underlying ByteBuffers
+   * Iterator to fetch ByteBuffers from offset with given length in this big logical array.
    */
-  public ByteBuff asSubByteBuff(long offset, int len) {
-    assert len >= 0;
-    long end = offset + len;
-    int startBuffer = (int) (offset / bufferSize), startBufferOffset = (int) (offset % bufferSize);
-    int endBuffer = (int) (end / bufferSize), endBufferOffset = (int) (end % bufferSize);
-    // Last buffer in the array is a dummy one with 0 capacity. Avoid sending back that
-    if (endBuffer == this.bufferCount) {
-      endBuffer--;
-      endBufferOffset = bufferSize;
+  private class BufferIterator implements Iterator<ByteBuffer> {
+    private final int len;
+    private int startBuffer, startOffset, endBuffer, endOffset;
+    private int curIndex, sum = 0;
+
+    private int index(long pos) {
+      return (int) (pos / bufferSize);
+    }
+
+    private int offset(long pos) {
+      return (int) (pos % bufferSize);
+    }
+
+    public BufferIterator(long offset, int len) {
+      assert len >= 0 && offset >= 0;
+      this.len = len;
+
+      this.startBuffer = index(offset);
+      this.startOffset = offset(offset);
+
+      this.endBuffer = index(offset + len);
+      this.endOffset = offset(offset + len);
+      if (startBuffer < endBuffer && endOffset == 0) {
+        endBuffer--;
+        endOffset = bufferSize;
+      }
+      assert startBuffer >= 0 && startBuffer < bufferCount;
+      assert endBuffer >= 0 && endBuffer < bufferCount;
+
+      // initialize the index to the first buffer index.
+      this.curIndex = startBuffer;
     }
-    assert startBuffer >= 0 && startBuffer < bufferCount;
-    assert (endBuffer >= 0 && endBuffer < bufferCount)
-        || (endBuffer == bufferCount && endBufferOffset == 0);
-    if (startBuffer >= buffers.length || startBuffer < 0) {
-      String msg = "Failed subArray, start=" + offset + ",startBuffer=" + startBuffer
-          + ",bufferSize=" + bufferSize;
-      LOG.error(msg);
-      throw new RuntimeException(msg);
+
+    @Override
+    public boolean hasNext() {
+      return this.curIndex <= endBuffer;
     }
-    int srcIndex = 0, cnt = -1;
-    ByteBuffer[] mbb = new ByteBuffer[endBuffer - startBuffer + 1];
-    for (int i = startBuffer, j = 0; i <= endBuffer; ++i, j++) {
-      ByteBuffer bb = buffers[i].duplicate();
-      if (i == startBuffer) {
-        cnt = bufferSize - startBufferOffset;
-        if (cnt > len) cnt = len;
-        bb.limit(startBufferOffset + cnt).position(startBufferOffset);
-      } else if (i == endBuffer) {
-        cnt = endBufferOffset;
-        bb.position(0).limit(cnt);
+
+    /**
+     * The returned ByteBuffer is an sliced one, it won't affect the position or limit of the
+     * original one.
+     */
+    @Override
+    public ByteBuffer next() {
+      ByteBuffer bb = buffers[curIndex].duplicate();
+      if (curIndex == startBuffer) {
+        bb.position(startOffset).limit(Math.min(bufferSize, startOffset + len));
+      } else if (curIndex == endBuffer) {
+        bb.position(0).limit(endOffset);
       } else {
-        cnt = bufferSize;
-        bb.position(0).limit(cnt);
+        bb.position(0).limit(bufferSize);
       }
-      mbb[j] = bb.slice();
-      srcIndex += cnt;
+      curIndex++;
+      sum += bb.remaining();
+      // Make sure that its pos is zero; this is important because MBB will count from zero for
+      // all nio ByteBuffers.
+      return bb.slice();
+    }
+
+    int getSum() {
+      return sum;
+    }
+
+    int getBufferCount() {
+      return this.endBuffer - this.startBuffer + 1;
     }
-    assert srcIndex == len;
-    return ByteBuffAllocator.wrap(mbb);
   }
 }
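
Taken together, read() and write() move whole ByteBuffs through the logical array (advancing the ByteBuff's position), while asSubByteBuff() returns a zero-copy view that may span several backing buffers. A minimal usage sketch, assuming a direct allocator like the one used in the tests below (illustrative, not part of the patch):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.nio.ByteBuff;
    import org.apache.hadoop.hbase.util.ByteBufferArray;

    public class ByteBufferArraySketch {
      public static void main(String[] args) throws Exception {
        // An 8 MB logical array; at this capacity each backing buffer is 512 KB
        // (capacity / 16), so there are 16 backing ByteBuffers.
        ByteBufferArray array = new ByteBufferArray(8L * 1024 * 1024,
            size -> ByteBuffer.allocateDirect((int) size));
        // Write 1 KB straddling a boundary between two backing buffers.
        long offset = 4L * 1024 * 1024 - 512;
        ByteBuff src = ByteBuff.wrap(ByteBuffer.allocateDirect(1024));
        array.write(offset, src);             // src.remaining() is now 0
        // Read it back into a heap ByteBuff.
        ByteBuff dst = ByteBuff.wrap(ByteBuffer.allocate(1024));
        array.read(offset, dst);              // dst.remaining() is now 0
        // Zero-copy view of the same range: a MultiByteBuff of two slices.
        ByteBuff view = array.asSubByteBuff(offset, 1024);
        System.out.println(view.remaining()); // 1024
      }
    }
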
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java
index 3fc1c23..0534924 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java
@@ -20,34 +20,37 @@ package org.apache.hadoop.hbase.util;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Random;
+
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.MultiByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category({MiscTests.class, SmallTests.class})
+@Category({ MiscTests.class, SmallTests.class })
 public class TestByteBufferArray {
 
+  private static final Random RANDOM = new Random(System.currentTimeMillis());
+
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestByteBufferArray.class);
 
+  private static final ByteBufferAllocator ALLOC = (size) -> ByteBuffer.allocateDirect((int) size);
+
   @Test
   public void testAsSubBufferWhenEndOffsetLandInLastBuffer() throws Exception {
     int capacity = 4 * 1024 * 1024;
-    ByteBufferAllocator allocator = new ByteBufferAllocator() {
-      @Override
-      public ByteBuffer allocate(long size) throws IOException {
-        return ByteBuffer.allocateDirect((int) size);
-      }
-    };
-    ByteBufferArray array = new ByteBufferArray(capacity, allocator);
+    ByteBufferArray array = new ByteBufferArray(capacity, ALLOC);
     ByteBuff subBuf = array.asSubByteBuff(0, capacity);
     subBuf.position(capacity - 1);// Position to the last byte
     assertTrue(subBuf.hasRemaining());
@@ -59,54 +62,148 @@ public class TestByteBufferArray {
   @Test
   public void testByteBufferCreation() throws Exception {
     int capacity = 470 * 1021 * 1023;
-    ByteBufferAllocator allocator = new ByteBufferAllocator() {
-      @Override
-      public ByteBuffer allocate(long size) throws IOException {
-        return ByteBuffer.allocateDirect((int) size);
-      }
-    };
-    ByteBufferArray array = new ByteBufferArray(capacity, allocator);
-    assertEquals(119, array.buffers.length);
+    ByteBufferArray array = new ByteBufferArray(capacity, ALLOC);
+    assertEquals(118, array.buffers.length);
     for (int i = 0; i < array.buffers.length; i++) {
-      if (i == array.buffers.length - 1) {
-        assertEquals(0, array.buffers[i].capacity());
-      } else {
-        assertEquals(ByteBufferArray.DEFAULT_BUFFER_SIZE, array.buffers[i].capacity());
-      }
+      assertEquals(ByteBufferArray.DEFAULT_BUFFER_SIZE, array.buffers[i].capacity());
     }
   }
 
   @Test
   public void testByteBufferCreation1() throws Exception {
-    ByteBufferAllocator allocator = new ByteBufferAllocator() {
-      @Override
-      public ByteBuffer allocate(long size) throws IOException {
-        return ByteBuffer.allocateDirect((int) size);
-      }
-    };
-    ByteBufferArray array = new DummyByteBufferArray(7 * 1024 * 1024, allocator);
-    // overwrite
-    array.bufferCount = 25;
-    array.buffers = new ByteBuffer[array.bufferCount + 1];
-    array.createBuffers(allocator);
+    long cap = 7 * 1024L * 1024L;
+    int bufferSize = ByteBufferArray.getBufferSize(cap), bufferCount = 25;
+    ByteBufferArray array = new ByteBufferArray(bufferSize, bufferCount, 16, cap, ALLOC);
     for (int i = 0; i < array.buffers.length; i++) {
-      if (i == array.buffers.length - 1) {
-        assertEquals(0, array.buffers[i].capacity());
-      } else {
-        assertEquals(458752, array.buffers[i].capacity());
-      }
+      assertEquals(458752, array.buffers[i].capacity());
+    }
+  }
+
+  private static void fill(ByteBuff buf, byte val) {
+    for (int i = buf.position(); i < buf.limit(); i++) {
+      buf.put(i, val);
+    }
+  }
+
+  private ByteBuff createByteBuff(int len) {
+    assert len >= 0;
+    int pos = len == 0 ? 0 : RANDOM.nextInt(len);
+    ByteBuff b = ByteBuff.wrap(ByteBuffer.allocate(2 * len));
+    b.position(pos).limit(pos + len);
+    return b;
+  }
+
+  private interface Call {
+    void run() throws IOException;
+  }
+
+  private void expectedAssert(Call r) throws IOException {
+    try {
+      r.run();
+      fail();
+    } catch (AssertionError e) {
+      // Ignore
+    }
+  }
+
+  @Test
+  public void testArrayIO() throws IOException {
+    int cap = 9 * 1024 * 1024, bufferSize = ByteBufferArray.getBufferSize(cap);
+    ByteBufferArray array = new ByteBufferArray(cap, ALLOC);
+    testReadAndWrite(array, 0, 512, (byte) 2);
+    testReadAndWrite(array, cap - 512, 512, (byte) 3);
+    testReadAndWrite(array, 4 * 1024 * 1024, 5 * 1024 * 1024, (byte) 4);
+    testReadAndWrite(array, 256, 256, (byte) 5);
+    testReadAndWrite(array, 257, 513, (byte) 6);
+    testReadAndWrite(array, 0, cap, (byte) 7);
+    testReadAndWrite(array, cap, 0, (byte) 8);
+    testReadAndWrite(array, cap - 1, 1, (byte) 9);
+    testReadAndWrite(array, cap - 2, 2, (byte) 10);
+
+    expectedAssert(() -> testReadAndWrite(array, cap - 2, 3, (byte) 11));
+    expectedAssert(() -> testReadAndWrite(array, cap + 1, 0, (byte) 12));
+    expectedAssert(() -> testReadAndWrite(array, 0, cap + 1, (byte) 12));
+    expectedAssert(() -> testReadAndWrite(array, -1, 0, (byte) 13));
+    expectedAssert(() -> testReadAndWrite(array, 0, -23, (byte) 14));
+    expectedAssert(() -> testReadAndWrite(array, 0, 0, (byte) 15));
+    expectedAssert(() -> testReadAndWrite(array, 4096, cap - 4096 + 1, (byte) 16));
+
+    testAsSubByteBuff(array, 0, cap, true);
+    testAsSubByteBuff(array, 0, 0, false);
+    testAsSubByteBuff(array, 0, 1, false);
+    testAsSubByteBuff(array, 0, bufferSize - 1, false);
+    testAsSubByteBuff(array, 0, bufferSize, false);
+    testAsSubByteBuff(array, 0, bufferSize + 1, true);
+    testAsSubByteBuff(array, 0, 2 * bufferSize, true);
+    testAsSubByteBuff(array, 0, 5 * bufferSize, true);
+    testAsSubByteBuff(array, cap - bufferSize - 1, bufferSize, true);
+    testAsSubByteBuff(array, cap - bufferSize, bufferSize, false);
+    testAsSubByteBuff(array, cap - bufferSize, 0, false);
+    testAsSubByteBuff(array, cap - bufferSize, 1, false);
+    testAsSubByteBuff(array, cap - bufferSize, bufferSize - 1, false);
+    testAsSubByteBuff(array, cap - 2 * bufferSize, 2 * bufferSize, true);
+    testAsSubByteBuff(array, cap - 2 * bufferSize, bufferSize + 1, true);
+    testAsSubByteBuff(array, cap - 2 * bufferSize, bufferSize - 1, false);
+    testAsSubByteBuff(array, cap - 2 * bufferSize, 0, false);
+
+    expectedAssert(() -> testAsSubByteBuff(array, 0, cap + 1, false));
+    expectedAssert(() -> testAsSubByteBuff(array, 0, -1, false));
+    expectedAssert(() -> testAsSubByteBuff(array, -1, -1, false));
+    expectedAssert(() -> testAsSubByteBuff(array, cap - bufferSize, bufferSize + 1, false));
+    expectedAssert(() -> testAsSubByteBuff(array, 2 * bufferSize, cap - 2 * bufferSize + 1, false));
+  }
+
+  private void testReadAndWrite(ByteBufferArray array, int off, int dataSize, byte val) {
+    ByteBuff src = createByteBuff(dataSize);
+    int pos = src.position(), lim = src.limit();
+    fill(src, val);
+    assertEquals(src.remaining(), dataSize);
+    try {
+      assertEquals(dataSize, array.write(off, src));
+      assertEquals(0, src.remaining());
+    } finally {
+      src.position(pos).limit(lim);
+    }
+
+    ByteBuff dst = createByteBuff(dataSize);
+    pos = dst.position();
+    lim = dst.limit();
+    try {
+      assertEquals(dataSize, array.read(off, dst));
+      assertEquals(0, dst.remaining());
+    } finally {
+      dst.position(pos).limit(lim);
     }
+    assertByteBuffEquals(src, dst);
   }
 
-  private static class DummyByteBufferArray extends ByteBufferArray {
+  private void testAsSubByteBuff(ByteBufferArray array, int off, int len, boolean isMulti) {
+    ByteBuff ret = array.asSubByteBuff(off, len);
+    if (isMulti) {
+      assertTrue(ret instanceof MultiByteBuff);
+    } else {
+      assertTrue(ret instanceof SingleByteBuff);
+    }
+    assertFalse(ret.hasArray());
+    assertEquals(len, ret.remaining());
 
-    public DummyByteBufferArray(long capacity, ByteBufferAllocator allocator) throws IOException {
-      super(capacity, allocator);
+    ByteBuff tmp = createByteBuff(len);
+    int pos = tmp.position(), lim = tmp.limit();
+    try {
+      assertEquals(len, array.read(off, tmp));
+      assertEquals(0, tmp.remaining());
+    } finally {
+      tmp.position(pos).limit(lim);
     }
 
-    @Override
-    int getThreadCount() {
-      return 16;
+    assertByteBuffEquals(ret, tmp);
+  }
+
+  private void assertByteBuffEquals(ByteBuff a, ByteBuff b) {
+    assertEquals(a.remaining(), b.remaining());
+    for (int i = a.position(), j = b.position(); i < a.limit(); i++, j++) {
+      assertEquals(a.get(i), b.get(j));
     }
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
index 3b832fe..fa8b184 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
@@ -30,39 +30,37 @@ import org.apache.hadoop.hbase.util.ByteBufferAllocator;
 import org.apache.hadoop.hbase.util.ByteBufferArray;
 
 /**
- * IO engine that stores data in memory using an array of ByteBuffers
- * {@link ByteBufferArray}.
- *
- *<h2>How it Works</h2>
- * First, see {@link ByteBufferArray} and how it gives a view across multiple ByteBuffers managed
- * by it internally. This class does the physical BB create and the write and read to the
- * underlying BBs. So we will create N BBs based on the total BC capacity specified on create
- * of the ByteBufferArray. So say we have 10 GB of off heap BucketCache, we will create 2560 such
- * BBs inside our ByteBufferArray.
- * 
- * <p>Now the way BucketCache works is that the entire 10 GB is split into diff sized buckets: by
- * default from 5 KB to 513 KB. Within each bucket of a particular size, there are
- * usually more than one bucket 'block'. The way it is calculate in bucketcache is that the total
- * bucketcache size is divided by 4 (hard-coded currently) * max size option. So using defaults,
- * buckets will be is 4 * 513kb (the biggest default value) = 2052kb. A bucket of 2052kb at offset
- * zero will serve out bucket 'blocks' of 5kb, the next bucket will do the next size up and so on
- * up to the maximum (default) of 513kb).
- * 
- * <p>When we write blocks to the bucketcache, we will see which bucket size group it best fits.
- * So a 4 KB block size goes to the 5 KB size group. Each of the block writes, writes within its
- * appropriate bucket. Though the bucket is '4kb' in size, it will occupy one of the 
- * 5 KB bucket 'blocks' (even if actual size of the bucket is less). Bucket 'blocks' will not span
- * buckets.
- * 
- * <p>But you can see the physical memory under the bucket 'blocks' can be split across the
- * underlying backing BBs from ByteBufferArray. All is split into 4 MB sized BBs.
- * 
- * <p>Each Bucket knows its offset in the entire space of BC and when block is written the offset
+ * IO engine that stores data in memory using an array of ByteBuffers {@link ByteBufferArray}.
+ * <p>
+ * <h2>How it Works</h2> First, see {@link ByteBufferArray} and how it gives a view across multiple
+ * ByteBuffers managed by it internally. This class does the physical BB creation and the write and
+ * read to the underlying BBs. So we will create N BBs based on the total BC capacity specified on
+ * creation of the ByteBufferArray. So say we have 10 GB of off-heap BucketCache, we will create 2560
+ * such BBs inside our ByteBufferArray. <br>
+ * <p>
+ * Now the way BucketCache works is that the entire 10 GB is split into buckets of different sizes:
+ * by default from 5 KB to 513 KB. Within each bucket of a particular size, there is usually more
+ * than one bucket 'block'. The way it is calculated in bucketcache is that the total bucketcache
+ * size is divided by 4 (hard-coded currently) * max size option. So using defaults, buckets will
+ * be 4 * 513 KB (the biggest default value) = 2052 KB. A bucket of 2052 KB at offset zero will
+ * serve out bucket 'blocks' of 5 KB, the next bucket will do the next size up, and so on up to the
+ * maximum (default) of 513 KB. <br>
+ * <p>
+ * When we write blocks to the bucketcache, we will see which bucket size group each best fits. So
+ * a 4 KB block goes to the 5 KB size group. Each block write happens within its appropriate
+ * bucket. Though the block is '4 KB' in size, it will occupy one of the 5 KB bucket 'blocks' (even
+ * if the actual size of the block is less). Bucket 'blocks' will not span buckets. <br>
+ * <p>
+ * But you can see that the physical memory under the bucket 'blocks' can be split across the underlying
+ * backing BBs from ByteBufferArray. All is split into 4 MB sized BBs. <br>
+ * <p>
+ * Each Bucket knows its offset in the entire space of BC and when a block is written the offset
  * arrives at ByteBufferArray and it figures which BB to write to. It may so happen that the entire
  * block to be written does not fit a particular backing ByteBufferArray so the remainder goes to
- * another BB. See {@link ByteBufferArray#putMultiple(long, int, byte[])}.
-
-So said all these, when we read a block it may be possible that the bytes of that blocks is physically placed in 2 adjucent BBs.  In such case also, we avoid any copy need by having the MBB...
+ * another BB. See {@link ByteBufferArray#write(long, ByteBuff)}. <br>
+ * Having said all this, when we read a block it is possible that the bytes of that block are
+ * physically placed in 2 adjacent BBs. In such a case too, we avoid any need to copy by using the
+ * MBB...
  */
 @InterfaceAudience.Private
 public class ByteBufferIOEngine implements IOEngine {
@@ -74,15 +72,9 @@ public class ByteBufferIOEngine implements IOEngine {
    * @param capacity
    * @throws IOException ideally here no exception to be thrown from the allocator
    */
-  public ByteBufferIOEngine(long capacity)
-      throws IOException {
+  public ByteBufferIOEngine(long capacity) throws IOException {
     this.capacity = capacity;
-    ByteBufferAllocator allocator = new ByteBufferAllocator() {
-      @Override
-      public ByteBuffer allocate(long size) throws IOException {
-        return ByteBuffer.allocateDirect((int) size);
-      }
-    };
+    ByteBufferAllocator allocator = (size) -> ByteBuffer.allocateDirect((int) size);
     bufferArray = new ByteBufferArray(capacity, allocator);
   }
 
@@ -121,27 +113,29 @@ public class ByteBufferIOEngine implements IOEngine {
   }
 
   /**
-   * Transfers data from the given byte buffer to the buffer array
-   * @param srcBuffer the given byte buffer from which bytes are to be read
-   * @param offset The offset in the ByteBufferArray of the first byte to be
-   *          written
+   * Transfers data from the given {@link ByteBuffer} to the buffer array. The position of the
+   * source buffer will be advanced by its {@link ByteBuffer#remaining()}.
+   * @param src the given byte buffer from which bytes are to be read.
+   * @param offset The offset in the ByteBufferArray of the first byte to be written
    * @throws IOException throws IOException if writing to the array throws exception
    */
   @Override
-  public void write(ByteBuffer srcBuffer, long offset) throws IOException {
-    assert srcBuffer.hasArray();
-    bufferArray.putMultiple(offset, srcBuffer.remaining(), srcBuffer.array(),
-        srcBuffer.arrayOffset());
+  public void write(ByteBuffer src, long offset) throws IOException {
+    bufferArray.write(offset, ByteBuff.wrap(src));
   }
 
+  /**
+   * Transfers data from the given {@link ByteBuff} to the buffer array. The position of the
+   * source buffer will be advanced by its {@link ByteBuff#remaining()}.
+   * @param src the given byte buffer from which bytes are to be read.
+   * @param offset The offset in the ByteBufferArray of the first byte to be written
+   * @throws IOException throws IOException if writing to the array throws exception
+   */
   @Override
-  public void write(ByteBuff srcBuffer, long offset) throws IOException {
-    // When caching block into BucketCache there will be single buffer backing for this HFileBlock.
-    // This will work for now. But from the DFS itself if we get DBB then this may not hold true.
-    assert srcBuffer.hasArray();
-    bufferArray.putMultiple(offset, srcBuffer.remaining(), srcBuffer.array(),
-        srcBuffer.arrayOffset());
+  public void write(ByteBuff src, long offset) throws IOException {
+    bufferArray.write(offset, src);
   }
+
   /**
    * No operation for the sync in the memory IO engine
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java
index 8b024f0..b8e29c6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ExclusiveMemoryMmapIOEngine.java
@@ -16,16 +16,15 @@
  */
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
+import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;
+
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
 import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
-import org.apache.hadoop.hbase.nio.SingleByteBuff;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * IO engine that stores data to a file on the local block device using memory mapping
@@ -33,7 +32,6 @@ import org.slf4j.LoggerFactory;
  */
 @InterfaceAudience.Private
 public class ExclusiveMemoryMmapIOEngine extends FileMmapIOEngine {
-  static final Logger LOG = LoggerFactory.getLogger(ExclusiveMemoryMmapIOEngine.class);
 
   public ExclusiveMemoryMmapIOEngine(String filePath, long capacity) throws IOException {
     super(filePath, capacity);
@@ -42,9 +40,8 @@ public class ExclusiveMemoryMmapIOEngine extends FileMmapIOEngine {
   @Override
   public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
       throws IOException {
-    byte[] dst = new byte[length];
-    bufferArray.getMultiple(offset, length, dst);
-    return deserializer.deserialize(new SingleByteBuff(ByteBuffer.wrap(dst)), true,
-      MemoryType.EXCLUSIVE);
+    ByteBuff dst = HEAP.allocate(length);
+    bufferArray.read(offset, dst);
+    return deserializer.deserialize(dst.position(0).limit(length), true, MemoryType.EXCLUSIVE);
   }
 }
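
The read path now allocates its destination through the heap ByteBuffAllocator and rewinds it before deserializing, because ByteBufferArray.read() leaves the destination's position at its limit. The same pattern in isolation, a sketch assuming a freshly created bufferArray rather than the engine's mmap-backed one:

    import static org.apache.hadoop.hbase.io.ByteBuffAllocator.HEAP;

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.nio.ByteBuff;
    import org.apache.hadoop.hbase.util.ByteBufferArray;

    public class MmapReadSketch {
      public static void main(String[] args) throws Exception {
        ByteBufferArray bufferArray = new ByteBufferArray(4L * 1024 * 1024,
            size -> ByteBuffer.allocateDirect((int) size));
        int length = 1024;
        ByteBuff dst = HEAP.allocate(length);
        bufferArray.read(0, dst);             // advances dst's position to its limit
        dst.position(0).limit(length);        // rewind before handing to the deserializer
        System.out.println(dst.remaining());  // 1024
      }
    }
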
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
index 0710d26..f6e49cf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
@@ -143,6 +143,7 @@ public class FileIOEngine implements IOEngine {
             + " expected");
       }
     }
+    dstBuffer.rewind();
     return deserializer.deserialize(new SingleByteBuff(dstBuffer), true, MemoryType.EXCLUSIVE);
   }
 
@@ -210,10 +211,8 @@ public class FileIOEngine implements IOEngine {
 
   @Override
   public void write(ByteBuff srcBuffer, long offset) throws IOException {
-    // When caching block into BucketCache there will be single buffer backing for this HFileBlock.
-    assert srcBuffer.hasArray();
-    write(ByteBuffer.wrap(srcBuffer.array(), srcBuffer.arrayOffset(),
-            srcBuffer.remaining()), offset);
+    ByteBuffer dup = srcBuffer.asSubByteBuffer(srcBuffer.remaining()).duplicate();
+    write(dup, offset);
   }
 
   private void accessFile(FileAccessor accessor, ByteBuffer buffer,
@@ -229,8 +228,7 @@ public class FileIOEngine implements IOEngine {
       int accessLen = 0;
       if (endFileNum > accessFileNum) {
         // short the limit;
-        buffer.limit((int) (buffer.limit() - remainingAccessDataLen
-            + sizePerFile - accessOffset));
+        buffer.limit((int) (buffer.limit() - remainingAccessDataLen + sizePerFile - accessOffset));
       }
       try {
         accessLen = accessor.access(fileChannel, buffer, accessOffset);
@@ -307,7 +305,7 @@ public class FileIOEngine implements IOEngine {
     }
   }
 
-  private static interface FileAccessor {
+  private interface FileAccessor {
     int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
         throws IOException;
   }
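
The new ByteBuff overload above funnels into the existing ByteBuffer path through a view of the remaining bytes; duplicate() gives that view independent cursors, so the file-channel access cannot disturb the caller's ByteBuff. A sketch of the idea, assuming a ByteBuff backed by a single ByteBuffer (illustrative, not part of the patch):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.nio.ByteBuff;

    public class DuplicateSketch {
      public static void main(String[] args) {
        ByteBuff src = ByteBuff.wrap(ByteBuffer.allocateDirect(4096));
        // The view shares the same memory, but duplicate() decouples its
        // position/limit from src's backing buffer.
        ByteBuffer dup = src.asSubByteBuffer(src.remaining()).duplicate();
        dup.position(dup.limit());            // simulate the channel consuming it
        System.out.println(src.remaining());  // still 4096
      }
    }
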
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapIOEngine.java
index 9580efe..bd17fd5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapIOEngine.java
@@ -112,17 +112,12 @@ public abstract class FileMmapIOEngine implements IOEngine {
    */
   @Override
   public void write(ByteBuffer srcBuffer, long offset) throws IOException {
-    assert srcBuffer.hasArray();
-    bufferArray.putMultiple(offset, srcBuffer.remaining(), srcBuffer.array(),
-      srcBuffer.arrayOffset());
+    bufferArray.write(offset, ByteBuff.wrap(srcBuffer));
   }
 
   @Override
   public void write(ByteBuff srcBuffer, long offset) throws IOException {
-    // This singleByteBuff can be considered to be array backed
-    assert srcBuffer.hasArray();
-    bufferArray.putMultiple(offset, srcBuffer.remaining(), srcBuffer.array(),
-      srcBuffer.arrayOffset());
+    bufferArray.write(offset, srcBuffer);
   }
 
   /**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
index bb58b4e..a06d86d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
-import static org.junit.Assert.assertTrue;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -28,6 +26,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -56,12 +55,10 @@ public class TestByteBufferIOEngine {
       if (blockSize == 0) {
         blockSize = 1;
       }
-      byte[] byteArray = new byte[blockSize];
-      for (int j = 0; j < byteArray.length; ++j) {
-        byteArray[j] = val;
-      }
-      ByteBuffer srcBuffer = ByteBuffer.wrap(byteArray);
-      int offset = 0;
+
+      ByteBuff src = createByteBuffer(blockSize, val, i % 2 == 0);
+      int pos = src.position(), lim = src.limit();
+      int offset;
       if (testOffsetAtStartNum > 0) {
         testOffsetAtStartNum--;
         offset = 0;
@@ -71,13 +68,16 @@ public class TestByteBufferIOEngine {
       } else {
         offset = (int) (Math.random() * (capacity - maxBlockSize));
       }
-      ioEngine.write(srcBuffer, offset);
+      ioEngine.write(src, offset);
+      src.position(pos).limit(lim);
+
       BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
       ioEngine.read(offset, blockSize, deserializer);
-      ByteBuff dstBuffer = deserializer.buf;
-      for (int j = 0; j < byteArray.length; ++j) {
-        assertTrue(byteArray[j] == dstBuffer.get(j));
-      }
+      ByteBuff dst = deserializer.buf;
+      Assert.assertEquals(src.remaining(), blockSize);
+      Assert.assertEquals(dst.remaining(), blockSize);
+      Assert.assertEquals(0, ByteBuff.compareTo(src, src.position(), src.remaining(), dst,
+        dst.position(), dst.remaining()));
     }
     assert testOffsetAtStartNum == 0;
     assert testOffsetAtEndNum == 0;
@@ -112,6 +112,16 @@ public class TestByteBufferIOEngine {
     }
   }
 
+  static ByteBuff createByteBuffer(int len, int val, boolean useHeap) {
+    ByteBuffer b = useHeap ? ByteBuffer.allocate(2 * len) : ByteBuffer.allocateDirect(2 * len);
+    int pos = (int) (Math.random() * len);
+    b.position(pos).limit(pos + len);
+    for (int i = pos; i < pos + len; i++) {
+      b.put(i, (byte) val);
+    }
+    return ByteBuff.wrap(b);
+  }
+
   @Test
   public void testByteBufferIOEngineWithMBB() throws Exception {
     int capacity = 32 * 1024 * 1024; // 32 MB
@@ -126,12 +136,9 @@ public class TestByteBufferIOEngine {
       if (blockSize == 0) {
         blockSize = 1;
       }
-      byte[] byteArray = new byte[blockSize];
-      for (int j = 0; j < byteArray.length; ++j) {
-        byteArray[j] = val;
-      }
-      ByteBuffer srcBuffer = ByteBuffer.wrap(byteArray);
-      int offset = 0;
+      ByteBuff src = createByteBuffer(blockSize, val, i % 2 == 0);
+      int pos = src.position(), lim = src.limit();
+      int offset;
       if (testOffsetAtStartNum > 0) {
         testOffsetAtStartNum--;
         offset = 0;
@@ -141,13 +148,16 @@ public class TestByteBufferIOEngine {
       } else {
         offset = (int) (Math.random() * (capacity - maxBlockSize));
       }
-      ioEngine.write(srcBuffer, offset);
+      ioEngine.write(src, offset);
+      src.position(pos).limit(lim);
+
       BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
       ioEngine.read(offset, blockSize, deserializer);
-      ByteBuff dstBuffer = deserializer.buf;
-      for (int j = 0; j < byteArray.length; ++j) {
-        assertTrue(srcBuffer.get(j) == dstBuffer.get(j));
-      }
+      ByteBuff dst = deserializer.buf;
+      Assert.assertEquals(src.remaining(), blockSize);
+      Assert.assertEquals(dst.remaining(), blockSize);
+      Assert.assertEquals(0, ByteBuff.compareTo(src, src.position(), src.remaining(), dst,
+        dst.position(), dst.remaining()));
     }
     assert testOffsetAtStartNum == 0;
     assert testOffsetAtEndNum == 0;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestExclusiveMemoryMmapEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestExclusiveMemoryMmapEngine.java
index d0d8c8a..79d58f0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestExclusiveMemoryMmapEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestExclusiveMemoryMmapEngine.java
@@ -17,16 +17,14 @@
  */
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
-import static org.junit.Assert.assertTrue;
-
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.io.hfile.bucket.TestByteBufferIOEngine.BufferGrabbingDeserializer;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -34,7 +32,7 @@ import org.junit.experimental.categories.Category;
 /**
  * Basic test for {@link ExclusiveMemoryMmapIOEngine}
  */
-@Category({IOTests.class, SmallTests.class})
+@Category({ IOTests.class, SmallTests.class })
 public class TestExclusiveMemoryMmapEngine {
 
   @ClassRule
@@ -50,17 +48,23 @@ public class TestExclusiveMemoryMmapEngine {
       for (int i = 0; i < 50; i++) {
         int len = (int) Math.floor(Math.random() * 100);
         long offset = (long) Math.floor(Math.random() * size % (size - len));
-        byte[] data1 = new byte[len];
-        for (int j = 0; j < data1.length; ++j) {
-          data1[j] = (byte) (Math.random() * 255);
-        }
-        fileMmapEngine.write(ByteBuffer.wrap(data1), offset);
+        int val = (int) (Math.random() * 255);
+
+        // write
+        ByteBuff src = TestByteBufferIOEngine.createByteBuffer(len, val, i % 2 == 0);
+        int pos = src.position(), lim = src.limit();
+        fileMmapEngine.write(src, offset);
+        src.position(pos).limit(lim);
+
+        // read
         BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
         fileMmapEngine.read(offset, len, deserializer);
-        ByteBuff data2 = deserializer.getDeserializedByteBuff();
-        for (int j = 0; j < data1.length; ++j) {
-          assertTrue(data1[j] == data2.get(j));
-        }
+        ByteBuff dst = deserializer.getDeserializedByteBuff();
+
+        Assert.assertEquals(src.remaining(), len);
+        Assert.assertEquals(dst.remaining(), len);
+        Assert.assertEquals(0,
+          ByteBuff.compareTo(src, pos, len, dst, dst.position(), dst.remaining()));
       }
     } finally {
       File file = new File(filePath);
@@ -68,6 +72,5 @@ public class TestExclusiveMemoryMmapEngine {
         file.delete();
       }
     }
-
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
index efb8145..6b0d603 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -132,15 +133,22 @@ public class TestFileIOEngine {
     fileIOEngine.closeFileChannels();
     int len = 5;
     long offset = 0L;
-    byte[] data1 = new byte[len];
-    for (int j = 0; j < data1.length; ++j) {
-      data1[j] = (byte) (Math.random() * 255);
+    int val = (int) (Math.random() * 255);
+    for (int i = 0; i < 2; i++) {
+      ByteBuff src = TestByteBufferIOEngine.createByteBuffer(len, val, i % 2 == 0);
+      int pos = src.position(), lim = src.limit();
+      fileIOEngine.write(src, offset);
+      src.position(pos).limit(lim);
+
+      BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
+      fileIOEngine.read(offset, len, deserializer);
+      ByteBuff dst = deserializer.getDeserializedByteBuff();
+
+      Assert.assertEquals(src.remaining(), len);
+      Assert.assertEquals(dst.remaining(), len);
+      Assert.assertEquals(0,
+        ByteBuff.compareTo(src, pos, len, dst, dst.position(), dst.remaining()));
     }
-    fileIOEngine.write(ByteBuffer.wrap(data1), offset);
-    BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
-    fileIOEngine.read(offset, len, deserializer);
-    ByteBuff data2 = deserializer.getDeserializedByteBuff();
-    assertArrayEquals(data1, data2.array());
   }
 
   @Test