You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2019/01/12 00:54:43 UTC

[hbase] 01/04: HBASE-20716 Unsafe access cleanup

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 21cfff857dd6f241d345bebe74b31901e4b0efa5
Author: Sahil Aggarwal <sa...@gmail.com>
AuthorDate: Tue Oct 9 22:41:36 2018 +0530

    HBASE-20716 Unsafe access cleanup
    
    Changes the byte[] conversion done in Bytes and ByteBufferUtils.
    Instead of checking whether unaligned Unsafe access is available on
    every call, choose the best converter once at startup.
    
    Conflicts:
    	hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FuzzyRowFilter.java
    	hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
    	hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
    
    Amending-Author: Andrew Purtell <ap...@apache.org>
---
 .../apache/hadoop/hbase/filter/FuzzyRowFilter.java |  33 +-
 .../apache/hadoop/hbase/util/ByteBufferUtils.java  | 543 ++++++++++++++++++---
 .../java/org/apache/hadoop/hbase/util/Bytes.java   | 213 +++++---
 .../org/apache/hadoop/hbase/util/UnsafeAccess.java | 267 ++++++++++
 4 files changed, 918 insertions(+), 138 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FuzzyRowFilter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FuzzyRowFilter.java
index d93d234..a2bcfb8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FuzzyRowFilter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/FuzzyRowFilter.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.UnsafeAccess;
 import org.apache.hadoop.hbase.util.UnsafeAvailChecker;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -342,19 +341,13 @@ public class FuzzyRowFilter extends FilterBase {
     }
     length = Math.min(length, fuzzyKeyBytes.length);
     int numWords = length / Bytes.SIZEOF_LONG;
-    int offsetAdj = offset + UnsafeAccess.BYTE_ARRAY_BASE_OFFSET;
 
     int j = numWords << 3; // numWords * SIZEOF_LONG;
 
     for (int i = 0; i < j; i += Bytes.SIZEOF_LONG) {
-
-      long fuzzyBytes =
-          UnsafeAccess.theUnsafe.getLong(fuzzyKeyBytes, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
-              + (long) i);
-      long fuzzyMeta =
-          UnsafeAccess.theUnsafe.getLong(fuzzyKeyMeta, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
-              + (long) i);
-      long rowValue = UnsafeAccess.theUnsafe.getLong(row, offsetAdj + (long) i);
+      long fuzzyBytes = Bytes.toLong(fuzzyKeyBytes, i);
+      long fuzzyMeta = Bytes.toLong(fuzzyKeyMeta, i);
+      long rowValue = Bytes.toLong(row, offset + i);
       if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
         // We always return NEXT_EXISTS
         return SatisfiesCode.NEXT_EXISTS;
@@ -364,13 +357,9 @@ public class FuzzyRowFilter extends FilterBase {
     int off = j;
 
     if (length - off >= Bytes.SIZEOF_INT) {
-      int fuzzyBytes =
-          UnsafeAccess.theUnsafe.getInt(fuzzyKeyBytes, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
-              + (long) off);
-      int fuzzyMeta =
-          UnsafeAccess.theUnsafe.getInt(fuzzyKeyMeta, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
-              + (long) off);
-      int rowValue = UnsafeAccess.theUnsafe.getInt(row, offsetAdj + (long) off);
+      int fuzzyBytes = Bytes.toInt(fuzzyKeyBytes, off);
+      int fuzzyMeta = Bytes.toInt(fuzzyKeyMeta, off);
+      int rowValue = Bytes.toInt(row, offset + off);
       if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
         // We always return NEXT_EXISTS
         return SatisfiesCode.NEXT_EXISTS;
@@ -379,13 +368,9 @@ public class FuzzyRowFilter extends FilterBase {
     }
 
     if (length - off >= Bytes.SIZEOF_SHORT) {
-      short fuzzyBytes =
-          UnsafeAccess.theUnsafe.getShort(fuzzyKeyBytes, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
-              + (long) off);
-      short fuzzyMeta =
-          UnsafeAccess.theUnsafe.getShort(fuzzyKeyMeta, UnsafeAccess.BYTE_ARRAY_BASE_OFFSET
-              + (long) off);
-      short rowValue = UnsafeAccess.theUnsafe.getShort(row, offsetAdj + (long) off);
+      short fuzzyBytes = Bytes.toShort(fuzzyKeyBytes, off);
+      short fuzzyMeta = Bytes.toShort(fuzzyKeyMeta, off);
+      short rowValue = Bytes.toShort(row, offset + off);
       if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
         // We always return NEXT_EXISTS
         // even if it does not (in this case getNextForFuzzyRule
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
index b5b1d96..36fdec5 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
@@ -17,6 +17,7 @@
 package org.apache.hadoop.hbase.util;
 
 import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -28,6 +29,10 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.WritableUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
+import sun.nio.ch.DirectBuffer;
+
 /**
  * Utility functions for working with byte buffers, such as reading/writing
  * variable-length long numbers.
@@ -36,17 +41,277 @@ import org.apache.hadoop.io.WritableUtils;
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public final class ByteBufferUtils {
-
   // "Compressed integer" serialization helper constants.
-  private final static int VALUE_MASK = 0x7f;
-  private final static int NEXT_BIT_SHIFT = 7;
-  private final static int NEXT_BIT_MASK = 1 << 7;
-  private static final boolean UNSAFE_AVAIL = UnsafeAvailChecker.isAvailable();
+  public final static int VALUE_MASK = 0x7f;
+  public final static int NEXT_BIT_SHIFT = 7;
+  public final static int NEXT_BIT_MASK = 1 << 7;
+  @VisibleForTesting
+  static final boolean UNSAFE_AVAIL = UnsafeAvailChecker.isAvailable();
+  public static final boolean UNSAFE_UNALIGNED = UnsafeAvailChecker.unaligned();
 
   private ByteBufferUtils() {
   }
 
-  /**
+  static abstract class Comparer {
+    abstract int compareTo(byte [] buf1, int o1, int l1, ByteBuffer buf2, int o2, int l2);
+    abstract int compareTo(ByteBuffer buf1, int o1, int l1, ByteBuffer buf2, int o2, int l2);
+  }
+
+  static abstract class Converter {
+    abstract short toShort(ByteBuffer buffer, int offset);
+    abstract int toInt(ByteBuffer buffer);
+    abstract int toInt(ByteBuffer buffer, int offset);
+    abstract long toLong(ByteBuffer buffer, int offset);
+    abstract void putInt(ByteBuffer buffer, int val);
+    abstract int putInt(ByteBuffer buffer, int index, int val);
+    abstract void putShort(ByteBuffer buffer, short val);
+    abstract int putShort(ByteBuffer buffer, int index, short val);
+    abstract void putLong(ByteBuffer buffer, long val);
+    abstract int putLong(ByteBuffer buffer, int index, long val);
+  }
+
+  static class ComparerHolder {
+    static final String UNSAFE_COMPARER_NAME = ComparerHolder.class.getName() + "$UnsafeComparer";
+
+    static final Comparer BEST_COMPARER = getBestComparer();
+
+    static Comparer getBestComparer() {
+      try {
+        Class<?> theClass = Class.forName(UNSAFE_COMPARER_NAME);
+
+        @SuppressWarnings("unchecked")
+        Comparer comparer = (Comparer) theClass.getConstructor().newInstance();
+        return comparer;
+      } catch (Throwable t) { // ensure we really catch *everything*
+        return PureJavaComparer.INSTANCE;
+      }
+    }
+
+    static final class PureJavaComparer extends Comparer {
+      static final PureJavaComparer INSTANCE = new PureJavaComparer();
+
+      private PureJavaComparer() {}
+
+      @Override
+      public int compareTo(byte [] buf1, int o1, int l1, ByteBuffer buf2, int o2, int l2) {
+        int end1 = o1 + l1;
+        int end2 = o2 + l2;
+        for (int i = o1, j = o2; i < end1 && j < end2; i++, j++) {
+          int a = buf1[i] & 0xFF;
+          int b = buf2.get(j) & 0xFF;
+          if (a != b) {
+            return a - b;
+          }
+        }
+        return l1 - l2;
+      }
+
+      @Override
+      public int compareTo(ByteBuffer buf1, int o1, int l1, ByteBuffer buf2, int o2, int l2) {
+        int end1 = o1 + l1;
+        int end2 = o2 + l2;
+        for (int i = o1, j = o2; i < end1 && j < end2; i++, j++) {
+          int a = buf1.get(i) & 0xFF;
+          int b = buf2.get(j) & 0xFF;
+          if (a != b) {
+            return a - b;
+          }
+        }
+        return l1 - l2;
+      }
+    }
+
+    static final class UnsafeComparer extends Comparer {
+
+      public UnsafeComparer() {}
+
+      static {
+        if(!UNSAFE_UNALIGNED) {
+          throw new Error();
+        }
+      }
+
+      @Override
+      public int compareTo(byte[] buf1, int o1, int l1, ByteBuffer buf2, int o2, int l2) {
+        long offset2Adj;
+        Object refObj2 = null;
+        if (buf2.isDirect()) {
+          offset2Adj = o2 + ((DirectBuffer)buf2).address();
+        } else {
+          offset2Adj = o2 + buf2.arrayOffset() + UnsafeAccess.BYTE_ARRAY_BASE_OFFSET;
+          refObj2 = buf2.array();
+        }
+        return compareToUnsafe(buf1, o1 + UnsafeAccess.BYTE_ARRAY_BASE_OFFSET, l1,
+                refObj2, offset2Adj, l2);
+      }
+
+      @Override
+      public int compareTo(ByteBuffer buf1, int o1, int l1, ByteBuffer buf2, int o2, int l2) {
+        long offset1Adj, offset2Adj;
+        Object refObj1 = null, refObj2 = null;
+        if (buf1.isDirect()) {
+          offset1Adj = o1 + ((DirectBuffer) buf1).address();
+        } else {
+          offset1Adj = o1 + buf1.arrayOffset() + UnsafeAccess.BYTE_ARRAY_BASE_OFFSET;
+          refObj1 = buf1.array();
+        }
+        if (buf2.isDirect()) {
+          offset2Adj = o2 + ((DirectBuffer) buf2).address();
+        } else {
+          offset2Adj = o2 + buf2.arrayOffset() + UnsafeAccess.BYTE_ARRAY_BASE_OFFSET;
+          refObj2 = buf2.array();
+        }
+        return compareToUnsafe(refObj1, offset1Adj, l1, refObj2, offset2Adj, l2);
+      }
+    }
+  }
+
+
+  static class ConverterHolder {
+    static final String UNSAFE_CONVERTER_NAME =
+            ConverterHolder.class.getName() + "$UnsafeConverter";
+    static final Converter BEST_CONVERTER = getBestConverter();
+
+    static Converter getBestConverter() {
+      try {
+        Class<?> theClass = Class.forName(UNSAFE_CONVERTER_NAME);
+
+        // UnsafeConverter extends Converter, so the cast below is safe
+        @SuppressWarnings("unchecked")
+        Converter converter = (Converter) theClass.getConstructor().newInstance();
+        return converter;
+      } catch (Throwable t) { // ensure we really catch *everything*
+        return PureJavaConverter.INSTANCE;
+      }
+    }
+
+    static final class PureJavaConverter extends Converter {
+      static final PureJavaConverter INSTANCE = new PureJavaConverter();
+
+      private PureJavaConverter() {}
+
+      @Override
+      short toShort(ByteBuffer buffer, int offset) {
+        return buffer.getShort(offset);
+      }
+
+      @Override
+      int toInt(ByteBuffer buffer) {
+        return buffer.getInt();
+      }
+
+      @Override
+      int toInt(ByteBuffer buffer, int offset) {
+        return buffer.getInt(offset);
+      }
+
+      @Override
+      long toLong(ByteBuffer buffer, int offset) {
+        return buffer.getLong(offset);
+      }
+
+      @Override
+      void putInt(ByteBuffer buffer, int val) {
+        buffer.putInt(val);
+      }
+
+      @Override
+      int putInt(ByteBuffer buffer, int index, int val) {
+        buffer.putInt(index, val);
+        return index + Bytes.SIZEOF_INT;
+      }
+
+      @Override
+      void putShort(ByteBuffer buffer, short val) {
+        buffer.putShort(val);
+      }
+
+      @Override
+      int putShort(ByteBuffer buffer, int index, short val) {
+        buffer.putShort(index, val);
+        return index + Bytes.SIZEOF_SHORT;
+      }
+
+      @Override
+      void putLong(ByteBuffer buffer, long val) {
+        buffer.putLong(val);
+      }
+
+      @Override
+      int putLong(ByteBuffer buffer, int index, long val) {
+        buffer.putLong(index, val);
+        return index + Bytes.SIZEOF_LONG;
+      }
+    }
+
+    static final class UnsafeConverter extends Converter {
+
+      public UnsafeConverter() {}
+
+      static {
+        if(!UNSAFE_UNALIGNED) {
+          throw new Error();
+        }
+      }
+
+      @Override
+      short toShort(ByteBuffer buffer, int offset) {
+        return UnsafeAccess.toShort(buffer, offset);
+      }
+
+      @Override
+      int toInt(ByteBuffer buffer) {
+        int i = UnsafeAccess.toInt(buffer, buffer.position());
+        buffer.position(buffer.position() + Bytes.SIZEOF_INT);
+        return i;
+      }
+
+      @Override
+      int toInt(ByteBuffer buffer, int offset) {
+        return UnsafeAccess.toInt(buffer, offset);
+      }
+
+      @Override
+      long toLong(ByteBuffer buffer, int offset) {
+        return UnsafeAccess.toLong(buffer, offset);
+      }
+
+      @Override
+      void putInt(ByteBuffer buffer, int val) {
+        int newPos = UnsafeAccess.putInt(buffer, buffer.position(), val);
+        buffer.position(newPos);
+      }
+
+      @Override
+      int putInt(ByteBuffer buffer, int index, int val) {
+        return UnsafeAccess.putInt(buffer, index, val);
+      }
+
+      @Override
+      void putShort(ByteBuffer buffer, short val) {
+        int newPos = UnsafeAccess.putShort(buffer, buffer.position(), val);
+        buffer.position(newPos);
+      }
+
+      @Override
+      int putShort(ByteBuffer buffer, int index, short val) {
+        return UnsafeAccess.putShort(buffer, index, val);
+      }
+
+      @Override
+      void putLong(ByteBuffer buffer, long val) {
+        int newPos = UnsafeAccess.putLong(buffer, buffer.position(), val);
+        buffer.position(newPos);
+      }
+
+      @Override
+      int putLong(ByteBuffer buffer, int index, long val) {
+        return UnsafeAccess.putLong(buffer, index, val);
+      }
+    }
+  }
+
+    /**
    * Similar to {@link WritableUtils#writeVLong(java.io.DataOutput, long)},
    * but writes to a {@link ByteBuffer}.
    */
@@ -137,6 +402,14 @@ public final class ByteBufferUtils {
      }
    }
 
+   public static byte toByte(ByteBuffer buffer, int offset) {
+     if (UNSAFE_AVAIL) {
+       return UnsafeAccess.toByte(buffer, offset);
+     } else {
+       return buffer.get(offset);
+     }
+   }
+
   /**
    * Copy the data to the output stream and update position in buffer.
    * @param out the stream to write bytes to
@@ -161,11 +434,10 @@ public final class ByteBufferUtils {
   public static void copyBufferToStream(OutputStream out, ByteBuffer in,
       int offset, int length) throws IOException {
     if (in.hasArray()) {
-      out.write(in.array(), in.arrayOffset() + offset,
-          length);
+      out.write(in.array(), in.arrayOffset() + offset, length);
     } else {
       for (int i = 0; i < length; ++i) {
-        out.write(in.get(offset + i));
+        out.write(toByte(in, offset + i));
       }
     }
   }
@@ -190,27 +462,27 @@ public final class ByteBufferUtils {
       return 8;
     }
 
-    if (value < (1l << (4 * 8))) {
+    if (value < (1L << (4 * 8))) {
       // no more than 4 bytes
-      if (value < (1l << (2 * 8))) {
-        if (value < (1l << (1 * 8))) {
+      if (value < (1L << (2 * 8))) {
+        if (value < (1L << (1 * 8))) {
           return 1;
         }
         return 2;
       }
-      if (value < (1l << (3 * 8))) {
+      if (value < (1L << (3 * 8))) {
         return 3;
       }
       return 4;
     }
     // more than 4 bytes
-    if (value < (1l << (6 * 8))) {
-      if (value < (1l << (5 * 8))) {
+    if (value < (1L << (6 * 8))) {
+      if (value < (1L << (5 * 8))) {
         return 5;
       }
       return 6;
     }
-    if (value < (1l << (7 * 8))) {
+    if (value < (1L << (7 * 8))) {
       return 7;
     }
     return 8;
@@ -282,7 +554,7 @@ public final class ByteBufferUtils {
       throws IOException {
     long tmpLong = 0;
     for (int i = 0; i < fitInBytes; ++i) {
-      tmpLong |= (in.read() & 0xffl) << (8 * i);
+      tmpLong |= (in.read() & 0xffL) << (8 * i);
     }
     return tmpLong;
   }
@@ -295,7 +567,7 @@ public final class ByteBufferUtils {
   public static long readLong(ByteBuffer in, final int fitInBytes) {
     long tmpLength = 0;
     for (int i = 0; i < fitInBytes; ++i) {
-      tmpLength |= (in.get() & 0xffl) << (8l * i);
+      tmpLength |= (in.get() & 0xffL) << (8L * i);
     }
     return tmpLength;
   }
@@ -358,8 +630,8 @@ public final class ByteBufferUtils {
   /**
    * Copy from one buffer to another from given offset. This will be absolute positional copying and
    * won't affect the position of any of the buffers.
-   * @param out
    * @param in
+   * @param out
    * @param sourceOffset
    * @param destinationOffset
    * @param length
@@ -369,6 +641,8 @@ public final class ByteBufferUtils {
     if (in.hasArray() && out.hasArray()) {
       System.arraycopy(in.array(), sourceOffset + in.arrayOffset(), out.array(), out.arrayOffset()
           + destinationOffset, length);
+    } else if (UNSAFE_AVAIL) {
+      UnsafeAccess.copy(in, sourceOffset, out, destinationOffset, length);
     } else {
       for (int i = 0; i < length; ++i) {
         out.put((destinationOffset + i), in.get(sourceOffset + i));
@@ -377,27 +651,6 @@ public final class ByteBufferUtils {
   }
 
   /**
-   * Find length of common prefix of two parts in the buffer
-   * @param buffer Where parts are located.
-   * @param offsetLeft Offset of the first part.
-   * @param offsetRight Offset of the second part.
-   * @param limit Maximal length of common prefix.
-   * @return Length of prefix.
-   */
-  public static int findCommonPrefix(ByteBuffer buffer, int offsetLeft,
-      int offsetRight, int limit) {
-    int prefix = 0;
-
-    for (; prefix < limit; ++prefix) {
-      if (buffer.get(offsetLeft + prefix) != buffer.get(offsetRight + prefix)) {
-        break;
-      }
-    }
-
-    return prefix;
-  }
-
-  /**
    * Find length of common prefix in two arrays.
    * @param left Array to be compared.
    * @param leftOffset Offset in left array.
@@ -421,6 +674,28 @@ public final class ByteBufferUtils {
   }
 
   /**
+   * Find length of common prefix in two ByteBuffers.
+   * @param left ByteBuffer to be compared.
+   * @param leftOffset Offset in left ByteBuffer.
+   * @param leftLength Length of left ByteBuffer.
+   * @param right ByteBuffer to be compared.
+   * @param rightOffset Offset in right ByteBuffer.
+   * @param rightLength Length of right ByteBuffer.
+   */
+  public static int findCommonPrefix(ByteBuffer left, int leftOffset, int leftLength,
+      ByteBuffer right, int rightOffset, int rightLength) {
+    int length = Math.min(leftLength, rightLength);
+    int result = 0;
+
+    while (result < length && ByteBufferUtils.toByte(left, leftOffset + result) == ByteBufferUtils
+        .toByte(right, rightOffset + result)) {
+      result++;
+    }
+
+    return result;
+  }
+
+  /**
    * Check whether two parts in the same buffer are equal.
    * @param buffer In which buffer there are parts
    * @param offsetLeft Beginning of first part.
@@ -494,26 +769,181 @@ public final class ByteBufferUtils {
     return output;
   }
 
-  public static int compareTo(ByteBuffer buf1, int o1, int len1, ByteBuffer buf2, int o2, int len2) {
-    if (buf1.hasArray() && buf2.hasArray()) {
-      return Bytes.compareTo(buf1.array(), buf1.arrayOffset() + o1, len1, buf2.array(),
-          buf2.arrayOffset() + o2, len2);
+  public static int compareTo(ByteBuffer buf1, int o1, int l1, ByteBuffer buf2, int o2, int l2) {
+    return ComparerHolder.BEST_COMPARER.compareTo(buf1, o1, l1, buf2, o2, l2);
+  }
+
+  public static boolean equals(ByteBuffer buf1, int o1, int l1, byte[] buf2, int o2, int l2) {
+    if ((l1 == 0) || (l2 == 0)) {
+      // both 0 length, return true, or else false
+      return l1 == l2;
     }
-    int end1 = o1 + len1;
-    int end2 = o2 + len2;
-    for (int i = o1, j = o2; i < end1 && j < end2; i++, j++) {
-      int a = buf1.get(i) & 0xFF;
-      int b = buf2.get(j) & 0xFF;
-      if (a != b) {
-        return a - b;
+    // Since we're often comparing adjacent sorted data,
+    // it's usual to have equal arrays except for the very last byte
+    // so check that first
+    if (toByte(buf1, o1 + l1 - 1) != buf2[o2 + l2 - 1]) return false;
+    return compareTo(buf1, o1, l1, buf2, o2, l2) == 0;
+  }
+
+  // The below two methods show up in lots of places. Versions of them in commons util and in
+  // Cassandra. In guava too? They are copied from ByteBufferUtils. They are here as static
+  // privates. Seems to make code smaller and make Hotspot happier (comes of compares and study
+  // of compiled code via  jitwatch).
+
+  public static int compareTo(byte [] buf1, int o1, int l1, ByteBuffer buf2, int o2, int l2) {
+    return ComparerHolder.BEST_COMPARER.compareTo(buf1, o1, l1, buf2, o2, l2);
+  }
+
+  public static int compareTo(ByteBuffer buf1, int o1, int l1, byte[] buf2, int o2, int l2) {
+    return compareTo(buf2, o2, l2, buf1, o1, l1)*-1;
+  }
+
+  static int compareToUnsafe(Object obj1, long o1, int l1, Object obj2, long o2, int l2) {
+    final int stride = 8;
+    final int minLength = Math.min(l1, l2);
+    int strideLimit = minLength & ~(stride - 1);
+    int i;
+
+    /*
+     * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a time is no slower than
+     * comparing 4 bytes at a time even on 32-bit. On the other hand, it is substantially faster on
+     * 64-bit.
+     */
+    for (i = 0; i < strideLimit; i += stride) {
+      long lw = UnsafeAccess.theUnsafe.getLong(obj1, o1 + (long) i);
+      long rw = UnsafeAccess.theUnsafe.getLong(obj2, o2 + (long) i);
+      if (lw != rw) {
+        if (!UnsafeAccess.LITTLE_ENDIAN) {
+          return ((lw + Long.MIN_VALUE) < (rw + Long.MIN_VALUE)) ? -1 : 1;
+        }
+
+        /*
+         * We want to compare only the first index where left[index] != right[index]. This
+         * corresponds to the least significant nonzero byte in lw ^ rw, since lw and rw are
+         * little-endian. Long.numberOfTrailingZeros(diff) tells us the least significant
+         * nonzero bit, and zeroing out the first three bits of L.nTZ gives us the shift to get
+         * that least significant nonzero byte. This comparison logic is based on UnsignedBytes
+         * from guava v21
+         */
+        int n = Long.numberOfTrailingZeros(lw ^ rw) & ~0x7;
+        return ((int) ((lw >>> n) & 0xFF)) - ((int) ((rw >>> n) & 0xFF));
+      }
+    }
+
+    // The epilogue to cover the last (minLength % stride) elements.
+    for (; i < minLength; i++) {
+      int il = (UnsafeAccess.theUnsafe.getByte(obj1, o1 + i) & 0xFF);
+      int ir = (UnsafeAccess.theUnsafe.getByte(obj2, o2 + i) & 0xFF);
+      if (il != ir) {
+        return il - ir;
       }
     }
-    return len1 - len2;
+    return l1 - l2;
+  }
+
+  /**
+   * Reads a short value at the given buffer's offset.
+   * @param buffer
+   * @param offset
+   * @return short value at offset
+   */
+  public static short toShort(ByteBuffer buffer, int offset) {
+    return ConverterHolder.BEST_CONVERTER.toShort(buffer, offset);
+  }
+
+  /**
+   * Reads an int value at the given buffer's current position. Also advances the buffer's position
+   */
+  public static int toInt(ByteBuffer buffer) {
+    return ConverterHolder.BEST_CONVERTER.toInt(buffer);
+  }
+
+  /**
+   * Reads an int value at the given buffer's offset.
+   * @param buffer
+   * @param offset
+   * @return int value at offset
+   */
+  public static int toInt(ByteBuffer buffer, int offset) {
+    return ConverterHolder.BEST_CONVERTER.toInt(buffer, offset);
+  }
+
+  /**
+   * Reads a long value at the given buffer's offset.
+   * @param buffer
+   * @param offset
+   * @return long value at offset
+   */
+  public static long toLong(ByteBuffer buffer, int offset) {
+    return ConverterHolder.BEST_CONVERTER.toLong(buffer, offset);
+  }
+
+  /**
+   * Put an int value out to the given ByteBuffer's current position in big-endian format.
+   * This also advances the position in buffer by int size.
+   * @param buffer the ByteBuffer to write to
+   * @param val int to write out
+   */
+  public static void putInt(ByteBuffer buffer, int val) {
+    ConverterHolder.BEST_CONVERTER.putInt(buffer, val);
+  }
+
+  public static int putInt(ByteBuffer buffer, int index, int val) {
+    return ConverterHolder.BEST_CONVERTER.putInt(buffer, index, val);
+  }
+
+  /**
+   * Put a short value out to the given ByteBuffer's current position in big-endian format.
+   * This also advances the position in buffer by short size.
+   * @param buffer the ByteBuffer to write to
+   * @param val short to write out
+   */
+  public static void putShort(ByteBuffer buffer, short val) {
+    ConverterHolder.BEST_CONVERTER.putShort(buffer, val);
+  }
+
+  public static int putShort(ByteBuffer buffer, int index, short val) {
+    return ConverterHolder.BEST_CONVERTER.putShort(buffer, index, val);
+  }
+
+  /**
+   * Put a long value out to the given ByteBuffer's current position in big-endian format.
+   * This also advances the position in buffer by long size.
+   * @param buffer the ByteBuffer to write to
+   * @param val long to write out
+   */
+  public static void putLong(ByteBuffer buffer, long val) {
+    ConverterHolder.BEST_CONVERTER.putLong(buffer, val);
+  }
+
+  public static int putLong(ByteBuffer buffer, int index, long val) {
+    return ConverterHolder.BEST_CONVERTER.putLong(buffer, index, val);
+  }
+
+  /**
+   * Copies bytes from given array's offset to length part into the given buffer. Puts the bytes
+   * to buffer's given position. This doesn't affect the position of buffer.
+   * @param out
+   * @param in
+   * @param inOffset
+   * @param length
+   */
+  public static void copyFromArrayToBuffer(ByteBuffer out, int outOffset, byte[] in, int inOffset,
+      int length) {
+    if (out.hasArray()) {
+      System.arraycopy(in, inOffset, out.array(), out.arrayOffset() + outOffset, length);
+    } else if (UNSAFE_AVAIL) {
+      UnsafeAccess.copy(in, inOffset, out, outOffset, length);
+    } else {
+      ByteBuffer outDup = out.duplicate();
+      outDup.position(outOffset);
+      outDup.put(in, inOffset, length);
+    }
   }
 
   /**
    * Copies specified number of bytes from given offset of 'in' ByteBuffer to
-   * the array.
+   * the array. This doesn't affect the position of buffer.
    * @param out
    * @param in
    * @param sourceOffset
@@ -527,10 +957,9 @@ public final class ByteBufferUtils {
     } else if (UNSAFE_AVAIL) {
       UnsafeAccess.copy(in, sourceOffset, out, destinationOffset, length);
     } else {
-      int oldPos = in.position();
-      in.position(sourceOffset);
-      in.get(out, destinationOffset, length);
-      in.position(oldPos);
+      ByteBuffer inDup = in.duplicate();
+      inDup.position(sourceOffset);
+      inDup.get(out, destinationOffset, length);
     }
   }
 }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
index 93dcafe..8e55b63 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
@@ -752,16 +752,7 @@ public class Bytes implements Comparable<Bytes> {
     if (length != SIZEOF_LONG || offset + length > bytes.length) {
       throw explainWrongLengthOrOffset(bytes, offset, length, SIZEOF_LONG);
     }
-    if (UNSAFE_UNALIGNED) {
-      return toLongUnsafe(bytes, offset);
-    } else {
-      long l = 0;
-      for(int i = offset; i < offset + length; i++) {
-        l <<= 8;
-        l ^= bytes[i] & 0xFF;
-      }
-      return l;
-    }
+    return ConverterHolder.BEST_CONVERTER.toLong(bytes, offset, length);
   }
 
   private static IllegalArgumentException
@@ -793,16 +784,7 @@ public class Bytes implements Comparable<Bytes> {
       throw new IllegalArgumentException("Not enough room to put a long at"
           + " offset " + offset + " in a " + bytes.length + " byte array");
     }
-    if (UNSAFE_UNALIGNED) {
-      return putLongUnsafe(bytes, offset, val);
-    } else {
-      for(int i = offset + 7; i > offset; i--) {
-        bytes[i] = (byte) val;
-        val >>>= 8;
-      }
-      bytes[offset] = (byte) val;
-      return offset + SIZEOF_LONG;
-    }
+    return ConverterHolder.BEST_CONVERTER.putLong(bytes, offset, val);
   }
 
   /**
@@ -948,16 +930,7 @@ public class Bytes implements Comparable<Bytes> {
     if (length != SIZEOF_INT || offset + length > bytes.length) {
       throw explainWrongLengthOrOffset(bytes, offset, length, SIZEOF_INT);
     }
-    if (UNSAFE_UNALIGNED) {
-      return toIntUnsafe(bytes, offset);
-    } else {
-      int n = 0;
-      for(int i = offset; i < (offset + length); i++) {
-        n <<= 8;
-        n ^= bytes[i] & 0xFF;
-      }
-      return n;
-    }
+    return ConverterHolder.BEST_CONVERTER.toInt(bytes, offset, length);
   }
 
   /**
@@ -1044,16 +1017,7 @@ public class Bytes implements Comparable<Bytes> {
       throw new IllegalArgumentException("Not enough room to put an int at"
           + " offset " + offset + " in a " + bytes.length + " byte array");
     }
-    if (UNSAFE_UNALIGNED) {
-      return putIntUnsafe(bytes, offset, val);
-    } else {
-      for(int i= offset + 3; i > offset; i--) {
-        bytes[i] = (byte) val;
-        val >>>= 8;
-      }
-      bytes[offset] = (byte) val;
-      return offset + SIZEOF_INT;
-    }
+    return ConverterHolder.BEST_CONVERTER.putInt(bytes, offset, val);
   }
 
   /**
@@ -1118,15 +1082,7 @@ public class Bytes implements Comparable<Bytes> {
     if (length != SIZEOF_SHORT || offset + length > bytes.length) {
       throw explainWrongLengthOrOffset(bytes, offset, length, SIZEOF_SHORT);
     }
-    if (UNSAFE_UNALIGNED) {
-      return toShortUnsafe(bytes, offset);
-    } else {
-      short n = 0;
-      n = (short) ((n ^ bytes[offset]) & 0xFF);
-      n = (short) (n << 8);
-      n = (short) ((n ^ bytes[offset+1]) & 0xFF);
-      return n;
-   }
+    return ConverterHolder.BEST_CONVERTER.toShort(bytes, offset, length);
   }
 
   /**
@@ -1156,14 +1112,7 @@ public class Bytes implements Comparable<Bytes> {
       throw new IllegalArgumentException("Not enough room to put a short at"
           + " offset " + offset + " in a " + bytes.length + " byte array");
     }
-    if (UNSAFE_UNALIGNED) {
-      return putShortUnsafe(bytes, offset, val);
-    } else {
-      bytes[offset+1] = (byte) val;
-      val >>= 8;
-      bytes[offset] = (byte) val;
-      return offset + SIZEOF_SHORT;
-    }
+    return ConverterHolder.BEST_CONVERTER.putShort(bytes, offset, val);
   }
 
   /**
@@ -1396,11 +1345,161 @@ public class Bytes implements Comparable<Bytes> {
     );
   }
 
+  static abstract class Converter {
+    abstract long toLong(byte[] bytes, int offset, int length);
+    abstract int putLong(byte[] bytes, int offset, long val);
+
+    abstract int toInt(byte[] bytes, int offset, final int length);
+    abstract int putInt(byte[] bytes, int offset, int val);
+
+    abstract short toShort(byte[] bytes, int offset, final int length);
+    abstract int putShort(byte[] bytes, int offset, short val);
+
+  }
+
   @VisibleForTesting
   static Comparer<byte[]> lexicographicalComparerJavaImpl() {
     return LexicographicalComparerHolder.PureJavaComparer.INSTANCE;
   }
 
+  static class ConverterHolder {
+    static final String UNSAFE_CONVERTER_NAME =
+            ConverterHolder.class.getName() + "$UnsafeConverter";
+
+    static final Converter BEST_CONVERTER = getBestConverter();
+    /**
+     * Returns the Unsafe-using Converter, or falls back to the pure-Java
+     * implementation if unable to do so.
+     */
+    static Converter getBestConverter() {
+      try {
+        Class<?> theClass = Class.forName(UNSAFE_CONVERTER_NAME);
+
+        // UnsafeConverter extends Converter, so this cast is safe
+        @SuppressWarnings("unchecked")
+        Converter converter = (Converter) theClass.getConstructor().newInstance();
+        return converter;
+      } catch (Throwable t) { // ensure we really catch *everything*
+        return PureJavaConverter.INSTANCE;
+      }
+    }
+
+    protected static final class PureJavaConverter extends Converter {
+      static final PureJavaConverter INSTANCE = new PureJavaConverter();
+
+      private PureJavaConverter() {}
+
+      @Override
+      long toLong(byte[] bytes, int offset, int length) {
+        long l = 0;
+        for(int i = offset; i < offset + length; i++) {
+          l <<= 8;
+          l ^= bytes[i] & 0xFF;
+        }
+        return l;
+      }
+
+      @Override
+      int putLong(byte[] bytes, int offset, long val) {
+        for(int i = offset + 7; i > offset; i--) {
+          bytes[i] = (byte) val;
+          val >>>= 8;
+        }
+        bytes[offset] = (byte) val;
+        return offset + SIZEOF_LONG;
+      }
+
+      @Override
+      int toInt(byte[] bytes, int offset, int length) {
+        int n = 0;
+        for(int i = offset; i < (offset + length); i++) {
+          n <<= 8;
+          n ^= bytes[i] & 0xFF;
+        }
+        return n;
+      }
+
+      @Override
+      int putInt(byte[] bytes, int offset, int val) {
+        for(int i= offset + 3; i > offset; i--) {
+          bytes[i] = (byte) val;
+          val >>>= 8;
+        }
+        bytes[offset] = (byte) val;
+        return offset + SIZEOF_INT;
+      }
+
+      @Override
+      short toShort(byte[] bytes, int offset, int length) {
+        short n = 0;
+        n = (short) ((n ^ bytes[offset]) & 0xFF);
+        n = (short) (n << 8);
+        n ^= (short) (bytes[offset+1] & 0xFF);
+        return n;
+      }
+
+      @Override
+      int putShort(byte[] bytes, int offset, short val) {
+        bytes[offset+1] = (byte) val;
+        val >>= 8;
+        bytes[offset] = (byte) val;
+        return offset + SIZEOF_SHORT;
+      }
+    }
+
+    protected static final class UnsafeConverter extends Converter {
+
+      static final Unsafe theUnsafe;
+
+      public UnsafeConverter() {}
+
+      static {
+        if (UNSAFE_UNALIGNED) {
+          theUnsafe = UnsafeAccess.theUnsafe;
+        } else {
+          // It doesn't matter what we throw;
+          // it's swallowed in getBestConverter().
+          throw new Error();
+        }
+
+        // sanity check - this should never fail
+        if (theUnsafe.arrayIndexScale(byte[].class) != 1) {
+          throw new AssertionError();
+        }
+      }
+
+      @Override
+      long toLong(byte[] bytes, int offset, int length) {
+        return UnsafeAccess.toLong(bytes, offset);
+      }
+
+      @Override
+      int putLong(byte[] bytes, int offset, long val) {
+        return UnsafeAccess.putLong(bytes, offset, val);
+      }
+
+      @Override
+      int toInt(byte[] bytes, int offset, int length) {
+        return UnsafeAccess.toInt(bytes, offset);
+      }
+
+      @Override
+      int putInt(byte[] bytes, int offset, int val) {
+        return UnsafeAccess.putInt(bytes, offset, val);
+      }
+
+      @Override
+      short toShort(byte[] bytes, int offset, int length) {
+        return UnsafeAccess.toShort(bytes, offset);
+      }
+
+      @Override
+      int putShort(byte[] bytes, int offset, short val) {
+        return UnsafeAccess.putShort(bytes, offset, val);
+      }
+    }
+  }
+
   /**
    * Provides a lexicographical comparer implementation; either a Java
    * implementation or a faster implementation based on {@link Unsafe}.
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java
index 9ebff7d..275c750 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.util;
 
 import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
 
@@ -43,6 +44,9 @@ public final class UnsafeAccess {
   /** The offset to the first element in a byte array. */
   public static final int BYTE_ARRAY_BASE_OFFSET;
 
+  public static final boolean LITTLE_ENDIAN =
+    ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
+
   // This number limits the number of bytes to copy per call to Unsafe's
   // copyMemory method. A limit is imposed to allow for safepoint polling
   // during a large copy
@@ -155,4 +159,267 @@ public final class UnsafeAccess {
     }
     unsafeCopy(srcBase, srcAddress, destBase, destAddress, length);
   }
+
+  // APIs to read primitive data from a byte[] via Unsafe
+  /**
+   * Converts a byte array to a short value considering it was written in big-endian format.
+   * @param bytes byte array
+   * @param offset offset into array
+   * @return the short value
+   */
+  public static short toShort(byte[] bytes, int offset) {
+    if (LITTLE_ENDIAN) {
+      return Short.reverseBytes(theUnsafe.getShort(bytes, offset + BYTE_ARRAY_BASE_OFFSET));
+    } else {
+      return theUnsafe.getShort(bytes, offset + BYTE_ARRAY_BASE_OFFSET);
+    }
+  }
+
+  /**
+   * Converts a byte array to an int value considering it was written in big-endian format.
+   * @param bytes byte array
+   * @param offset offset into array
+   * @return the int value
+   */
+  public static int toInt(byte[] bytes, int offset) {
+    if (LITTLE_ENDIAN) {
+      return Integer.reverseBytes(theUnsafe.getInt(bytes, offset + BYTE_ARRAY_BASE_OFFSET));
+    } else {
+      return theUnsafe.getInt(bytes, offset + BYTE_ARRAY_BASE_OFFSET);
+    }
+  }
+
+  /**
+   * Converts a byte array to a long value considering it was written in big-endian format.
+   * @param bytes byte array
+   * @param offset offset into array
+   * @return the long value
+   */
+  public static long toLong(byte[] bytes, int offset) {
+    if (LITTLE_ENDIAN) {
+      return Long.reverseBytes(theUnsafe.getLong(bytes, offset + BYTE_ARRAY_BASE_OFFSET));
+    } else {
+      return theUnsafe.getLong(bytes, offset + BYTE_ARRAY_BASE_OFFSET);
+    }
+  }
+
+  // APIs to write primitive data to a byte[] via Unsafe
+  /**
+   * Put a short value out to the specified byte array position in big-endian format.
+   * @param bytes the byte array
+   * @param offset position in the array
+   * @param val short to write out
+   * @return incremented offset
+   */
+  public static int putShort(byte[] bytes, int offset, short val) {
+    if (LITTLE_ENDIAN) {
+      val = Short.reverseBytes(val);
+    }
+    theUnsafe.putShort(bytes, offset + BYTE_ARRAY_BASE_OFFSET, val);
+    return offset + Bytes.SIZEOF_SHORT;
+  }
+
+  /**
+   * Put an int value out to the specified byte array position in big-endian format.
+   * @param bytes the byte array
+   * @param offset position in the array
+   * @param val int to write out
+   * @return incremented offset
+   */
+  public static int putInt(byte[] bytes, int offset, int val) {
+    if (LITTLE_ENDIAN) {
+      val = Integer.reverseBytes(val);
+    }
+    theUnsafe.putInt(bytes, offset + BYTE_ARRAY_BASE_OFFSET, val);
+    return offset + Bytes.SIZEOF_INT;
+  }
+
+  /**
+   * Put a long value out to the specified byte array position in big-endian format.
+   * @param bytes the byte array
+   * @param offset position in the array
+   * @param val long to write out
+   * @return incremented offset
+   */
+  public static int putLong(byte[] bytes, int offset, long val) {
+    if (LITTLE_ENDIAN) {
+      val = Long.reverseBytes(val);
+    }
+    theUnsafe.putLong(bytes, offset + BYTE_ARRAY_BASE_OFFSET, val);
+    return offset + Bytes.SIZEOF_LONG;
+  }
+
+  // APIs to read primitive data from a ByteBuffer via Unsafe
+  /**
+   * Reads a short value at the given buffer's offset considering it was written in big-endian
+   * format.
+   *
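+   * @param buf the buffer to read from (direct or array-backed)
+   * @param offset absolute offset in the buffer; the buffer's position is not consulted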
+   * @return short value at offset
+   */
+  public static short toShort(ByteBuffer buf, int offset) {
+    if (LITTLE_ENDIAN) {
+      return Short.reverseBytes(getAsShort(buf, offset));
+    }
+    return getAsShort(buf, offset);
+  }
+
+  /**
+   * Reads bytes at the given offset as a short value.
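+   * @param buf the buffer to read from (direct or array-backed)
+   * @param offset absolute offset in the buffer; the buffer's position is not consulted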
+   * @return short value at offset
+   */
+  static short getAsShort(ByteBuffer buf, int offset) {
+    if (buf.isDirect()) {
+      return theUnsafe.getShort(((DirectBuffer) buf).address() + offset);
+    }
+    return theUnsafe.getShort(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset);
+  }
+
+  /**
+   * Reads an int value at the given buffer's offset considering it was written in big-endian
+   * format.
+   *
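+   * @param buf the buffer to read from (direct or array-backed)
+   * @param offset absolute offset in the buffer; the buffer's position is not consulted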
+   * @return int value at offset
+   */
+  public static int toInt(ByteBuffer buf, int offset) {
+    if (LITTLE_ENDIAN) {
+      return Integer.reverseBytes(getAsInt(buf, offset));
+    }
+    return getAsInt(buf, offset);
+  }
+
+  /**
+   * Reads bytes at the given offset as an int value.
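+   * @param buf the buffer to read from (direct or array-backed)
+   * @param offset absolute offset in the buffer; the buffer's position is not consulted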
+   * @return int value at offset
+   */
+  static int getAsInt(ByteBuffer buf, int offset) {
+    if (buf.isDirect()) {
+      return theUnsafe.getInt(((DirectBuffer) buf).address() + offset);
+    }
+    return theUnsafe.getInt(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset);
+  }
+
+  /**
+   * Reads a long value at the given buffer's offset considering it was written in big-endian
+   * format.
+   *
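+   * @param buf the buffer to read from (direct or array-backed)
+   * @param offset absolute offset in the buffer; the buffer's position is not consulted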
+   * @return long value at offset
+   */
+  public static long toLong(ByteBuffer buf, int offset) {
+    if (LITTLE_ENDIAN) {
+      return Long.reverseBytes(getAsLong(buf, offset));
+    }
+    return getAsLong(buf, offset);
+  }
+
+  /**
+   * Reads bytes at the given offset as a long value.
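+   * @param buf the buffer to read from (direct or array-backed)
+   * @param offset absolute offset in the buffer; the buffer's position is not consulted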
+   * @return long value at offset
+   */
+  static long getAsLong(ByteBuffer buf, int offset) {
+    if (buf.isDirect()) {
+      return theUnsafe.getLong(((DirectBuffer) buf).address() + offset);
+    }
+    return theUnsafe.getLong(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset);
+  }
+
+  /**
+   * Put an int value out to the specified ByteBuffer offset in big-endian format.
+   * @param buf the ByteBuffer to write to
+   * @param offset offset in the ByteBuffer
+   * @param val int to write out
+   * @return incremented offset
+   */
+  public static int putInt(ByteBuffer buf, int offset, int val) {
+    if (LITTLE_ENDIAN) {
+      val = Integer.reverseBytes(val);
+    }
+    if (buf.isDirect()) {
+      theUnsafe.putInt(((DirectBuffer) buf).address() + offset, val);
+    } else {
+      theUnsafe.putInt(buf.array(), offset + buf.arrayOffset() + BYTE_ARRAY_BASE_OFFSET, val);
+    }
+    return offset + Bytes.SIZEOF_INT;
+  }
+
+  // APIs to add primitives to BBs
+  /**
+   * Put a short value out to the specified BB position in big-endian format.
+   * @param buf the byte buffer
+   * @param offset position in the buffer
+   * @param val short to write out
+   * @return incremented offset
+   */
+  public static int putShort(ByteBuffer buf, int offset, short val) {
+    if (LITTLE_ENDIAN) {
+      val = Short.reverseBytes(val);
+    }
+    if (buf.isDirect()) {
+      theUnsafe.putShort(((DirectBuffer) buf).address() + offset, val);
+    } else {
+      theUnsafe.putShort(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset, val);
+    }
+    return offset + Bytes.SIZEOF_SHORT;
+  }
+
+  /**
+   * Put a long value out to the specified BB position in big-endian format.
+   * @param buf the byte buffer
+   * @param offset position in the buffer
+   * @param val long to write out
+   * @return incremented offset
+   */
+  public static int putLong(ByteBuffer buf, int offset, long val) {
+    if (LITTLE_ENDIAN) {
+      val = Long.reverseBytes(val);
+    }
+    if (buf.isDirect()) {
+      theUnsafe.putLong(((DirectBuffer) buf).address() + offset, val);
+    } else {
+      theUnsafe.putLong(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset, val);
+    }
+    return offset + Bytes.SIZEOF_LONG;
+  }
+
+  /**
+   * Put a byte value out to the specified BB position.
+   * @param buf the byte buffer
+   * @param offset position in the buffer
+   * @param b byte to write out
+   * @return incremented offset
+   */
+  public static int putByte(ByteBuffer buf, int offset, byte b) {
+    if (buf.isDirect()) {
+      theUnsafe.putByte(((DirectBuffer) buf).address() + offset, b);
+    } else {
+      theUnsafe.putByte(buf.array(),
+        BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset, b);
+    }
+    return offset + 1;
+  }
+
+  /**
+   * Returns the byte at the given offset
+   * @param buf the buffer to read
+   * @param offset the offset at which the byte has to be read
+   * @return the byte at the given offset
+   */
+  public static byte toByte(ByteBuffer buf, int offset) {
+    if (buf.isDirect()) {
+      return theUnsafe.getByte(((DirectBuffer) buf).address() + offset);
+    } else {
+      return theUnsafe.getByte(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset);
+    }
+  }
 }