Posted to commits@hbase.apache.org by en...@apache.org on 2016/02/04 22:26:42 UTC

hbase git commit: HBASE-15177 Reduce garbage created under high load

Repository: hbase
Updated Branches:
  refs/heads/branch-1 908e5a662 -> 73d677882


HBASE-15177 Reduce garbage created under high load

Conflicts:
	hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/73d67788
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/73d67788
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/73d67788

Branch: refs/heads/branch-1
Commit: 73d67788206c3f60773d861375f5e6934a284418
Parents: 908e5a6
Author: Enis Soztutar <en...@apache.org>
Authored: Thu Feb 4 11:07:36 2016 -0800
Committer: Enis Soztutar <en...@apache.org>
Committed: Thu Feb 4 13:26:22 2016 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/ScannerCallable.java    | 14 +--
 .../hadoop/hbase/ipc/AsyncRpcChannel.java       |  7 +-
 .../org/apache/hadoop/hbase/ipc/IPCUtil.java    | 20 ++---
 .../hbase/ipc/PayloadCarryingRpcController.java |  7 +-
 .../apache/hadoop/hbase/ipc/RpcClientImpl.java  |  6 +-
 .../hadoop/hbase/protobuf/ProtobufUtil.java     | 19 +++-
 .../hadoop/hbase/client/TestClientScanner.java  |  2 +-
 .../apache/hadoop/hbase/ipc/TestIPCUtil.java    |  4 +-
 .../hadoop/hbase/io/ByteBufferInputStream.java  | 14 ++-
 .../hadoop/hbase/util/ByteBufferUtils.java      | 25 ++++++
 .../org/apache/hadoop/hbase/util/Threads.java   |  2 +-
 .../apache/hadoop/hbase/util/UnsafeAccess.java  | 95 +++++++++++++++++++-
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  | 55 +++++++-----
 .../AnnotationReadingPriorityFunction.java      |  9 +-
 .../hadoop/hbase/regionserver/HRegion.java      |  6 +-
 .../hbase/regionserver/RSRpcServices.java       | 15 +++-
 16 files changed, 231 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
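
Taken together, the patch attacks allocation on the hot RPC path from three directions: the scan client reuses one RpcController per scan instead of allocating one per RPC, the server reads requests, SASL tokens and cell blocks from their original ByteBuffers instead of copying them into byte[] arrays, and protobuf messages are parsed through a single reused CodedInputStream. A minimal illustration of the central in-place-read idea (a sketch, not code from the patch):

    import java.io.InputStream;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.io.ByteBufferInputStream;

    public class InPlaceReadSketch {
      public static void main(String[] args) throws Exception {
        ByteBuffer bb = ByteBuffer.wrap(new byte[] { 1, 2, 3 });
        // Old pattern: new ByteArrayInputStream(bb.array(), 0, bb.limit()) requires a
        // backing heap array, and a full copy when the buffer is direct.
        // New pattern: stream over the buffer in place, heap or direct, no copy.
        InputStream is = new ByteBufferInputStream(bb);
        System.out.println(is.read()); // prints 1
      }
    }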


http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 5100314..8912e58 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -194,6 +194,13 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
     if (Thread.interrupted()) {
       throw new InterruptedIOException();
     }
+
+    if (controller == null) {
+      controller = controllerFactory.newController();
+      controller.setPriority(getTableName());
+      controller.setCallTimeout(callTimeout);
+    }
+
     if (closed) {
       if (scannerId != -1) {
         close();
@@ -212,9 +219,6 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
               RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
                 this.scanMetrics != null, renew);
           ScanResponse response = null;
-          controller = controllerFactory.newController();
-          controller.setPriority(getTableName());
-          controller.setCallTimeout(callTimeout);
           try {
             response = getStub().scan(controller, request);
             // Client and RS maintain a nextCallSeq number during the scan. Every next() call
@@ -374,7 +378,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
       ScanRequest request =
           RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
       try {
-        getStub().scan(null, request);
+        getStub().scan(controller, request);
       } catch (ServiceException se) {
         throw ProtobufUtil.getRemoteException(se);
       }
@@ -391,7 +395,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
         getLocation().getRegionInfo().getRegionName(),
         this.scan, 0, false);
     try {
-      ScanResponse response = getStub().scan(null, request);
+      ScanResponse response = getStub().scan(controller, request);
       long id = response.getScannerId();
       if (logScannerActivity) {
         LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
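
Net effect of the ScannerCallable hunks: one PayloadCarryingRpcController now serves every RPC of a scan (openScanner, each next() and close()) instead of a fresh controller per next() call, and close()/openScanner() now carry the scan's priority and timeout instead of passing a null controller. A minimal sketch of the reuse pattern (hypothetical table name and timeout):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
    import org.apache.hadoop.hbase.ipc.RpcControllerFactory;

    public class ScanControllerSketch {
      public static void main(String[] args) {
        RpcControllerFactory factory = new RpcControllerFactory(new Configuration());
        // One controller for the lifetime of the scan; previously one per next() RPC.
        PayloadCarryingRpcController controller = factory.newController();
        controller.setPriority(TableName.valueOf("mytable")); // system tables get elevated QOS
        controller.setCallTimeout(60000);
        // Every openScanner/next/close call of the scan now reuses this one instance.
      }
    }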

http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
index 44e8322..9fe2cf6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
@@ -421,7 +421,7 @@ public class AsyncRpcChannel {
         requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build());
       }
       // Only pass priority if one was explicitly set.
-      if (call.controller.getPriority() != 0) {
+      if (call.controller.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) {
         requestHeaderBuilder.setPriority(call.controller.getPriority());
       }
 
@@ -669,6 +669,7 @@ public class AsyncRpcChannel {
   private void handleSaslConnectionFailure(final int currRetries, final Throwable ex,
       final UserGroupInformation user) throws IOException, InterruptedException {
     user.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
       public Void run() throws IOException, InterruptedException {
         if (shouldAuthenticateOverKrb()) {
           if (currRetries < MAX_SASL_RETRIES) {
@@ -711,12 +712,12 @@ public class AsyncRpcChannel {
   public int getConnectionHashCode() {
     return ConnectionId.hashCode(ticket, serviceName, address);
   }
-  
+
   @Override
   public int hashCode() {
     return getConnectionHashCode();
   }
-     
+
   @Override
   public boolean equals(Object obj) {
     if (obj instanceof AsyncRpcChannel) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index 734227c..22c5cc1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import java.io.ByteArrayInputStream;
 import java.io.DataInput;
 import java.io.IOException;
 import java.io.InputStream;
@@ -36,6 +35,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
+import org.apache.hadoop.hbase.io.ByteBufferInputStream;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -180,19 +180,18 @@ public class IPCUtil {
   public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
       final byte [] cellBlock)
   throws IOException {
-    return createCellScanner(codec, compressor, cellBlock, 0, cellBlock.length);
+    return createCellScanner(codec, compressor, ByteBuffer.wrap(cellBlock));
   }
 
   /**
    * @param codec
-   * @param cellBlock
-   * @param offset
-   * @param length
+   * @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be
+   * position()'ed at the start of the cell block and limit()'ed at the end.
    * @return CellScanner to work against the content of <code>cellBlock</code>
    * @throws IOException
    */
   public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
-      final byte [] cellBlock, final int offset, final int length)
+      final ByteBuffer cellBlock)
   throws IOException {
     // If compressed, decompress it first before passing it on else we will leak compression
     // resources if the stream is not closed properly after we let it out.
@@ -202,18 +201,17 @@ public class IPCUtil {
       if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
       Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
       CompressionInputStream cis =
-        compressor.createInputStream(new ByteArrayInputStream(cellBlock, offset, length),
-        poolDecompressor);
+        compressor.createInputStream(new ByteBufferInputStream(cellBlock), poolDecompressor);
       ByteBufferOutputStream bbos = null;
       try {
         // TODO: This is ugly.  The buffer will be resized on us if we guess wrong.
         // TODO: Reuse buffers.
-        bbos = new ByteBufferOutputStream((length - offset) *
+        bbos = new ByteBufferOutputStream(cellBlock.remaining() *
           this.cellBlockDecompressionMultiplier);
         IOUtils.copy(cis, bbos);
         bbos.close();
         ByteBuffer bb = bbos.getByteBuffer();
-        is = new ByteArrayInputStream(bb.array(), 0, bb.limit());
+        is = new ByteBufferInputStream(bb);
       } finally {
         if (is != null) is.close();
         if (bbos != null) bbos.close();
@@ -221,7 +219,7 @@ public class IPCUtil {
         CodecPool.returnDecompressor(poolDecompressor);
       }
     } else {
-      is = new ByteArrayInputStream(cellBlock, offset, length);
+      is = new ByteBufferInputStream(cellBlock);
     }
     return codec.getDecoder(is);
   }
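
With the byte[]/offset/length overload reduced to a thin wrapper, a cell block round-trips without ever materializing a standalone array. A minimal sketch, assuming the KeyValue codec and no compressor (mirroring the TestIPCUtil change below):

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellScanner;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.codec.KeyValueCodec;
    import org.apache.hadoop.hbase.ipc.IPCUtil;
    import org.apache.hadoop.hbase.util.Bytes;

    public class CellBlockSketch {
      public static void main(String[] args) throws Exception {
        IPCUtil util = new IPCUtil(new Configuration());
        Cell[] cells = { new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"),
            Bytes.toBytes("q"), Bytes.toBytes("v")) };
        ByteBuffer block = util.buildCellBlock(new KeyValueCodec(), null,
            CellUtil.createCellScanner(Arrays.asList(cells).iterator()));
        // New signature: the buffer's position()/limit() delimit the block; no array copy.
        CellScanner back = util.createCellScanner(new KeyValueCodec(), null, block);
        while (back.advance()) {
          System.out.println(back.current());
        }
      }
    }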

http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
index 70f30f9..82634e5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java
@@ -35,14 +35,14 @@ import org.apache.hadoop.hbase.TableName;
 @InterfaceAudience.Private
 public class PayloadCarryingRpcController
     extends TimeLimitedRpcController implements CellScannable {
+
+  public static final int PRIORITY_UNSET = -1;
   /**
   * Priority to set on this request.  Set it here in the controller so it is available when
   * composing the request.  This is the ordained way of setting priorities going forward.  We
   * will be undoing the old annotation-based mechanism.
    */
-  // Currently only multi call makes use of this.  Eventually this should be only way to set
-  // priority.
-  private int priority = HConstants.NORMAL_QOS;
+  private int priority = PRIORITY_UNSET;
 
   /**
    * They are optionally set on construction, cleared after we make the call, and then optionally
@@ -67,6 +67,7 @@ public class PayloadCarryingRpcController
   /**
    * @return One-shot cell scanner (you cannot back it up and restart)
    */
+  @Override
   public CellScanner cellScanner() {
     return cellScanner;
   }
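
The new sentinel distinguishes "no priority chosen" from "priority zero", so a request header only carries priorities the caller set explicitly, and the server can otherwise fall back to its own QOS computation. A minimal sketch of the client-side decision both RpcClientImpl and AsyncRpcChannel now make:

    import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;

    final class PrioritySketch {
      // Should the request header carry a priority field at all?
      static boolean shouldSendPriority(PayloadCarryingRpcController controller) {
        // PRIORITY_UNSET (-1) means the caller never set one; 0 is now a real priority.
        return controller.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET;
      }
    }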

http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index 82ff5a9..e1821e5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -899,8 +899,10 @@ public class RpcClientImpl extends AbstractRpcClient {
         cellBlockBuilder.setLength(cellBlock.limit());
         builder.setCellBlockMeta(cellBlockBuilder.build());
       }
-      // Only pass priority if there one.  Let zero be same as no priority.
-      if (priority != 0) builder.setPriority(priority);
+      // Only pass priority if there is one set.
+      if (priority != PayloadCarryingRpcController.PRIORITY_UNSET) {
+        builder.setPriority(priority);
+      }
       RequestHeader header = builder.build();
 
       setupIOstreams();

http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index c5c8b88..bdca5c1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -2479,13 +2479,13 @@ public final class ProtobufUtil {
    */
   public static String getRegionEncodedName(
       final RegionSpecifier regionSpecifier) throws DoNotRetryIOException {
-    byte[] value = regionSpecifier.getValue().toByteArray();
+    ByteString value = regionSpecifier.getValue();
     RegionSpecifierType type = regionSpecifier.getType();
     switch (type) {
       case REGION_NAME:
-        return HRegionInfo.encodeRegionName(value);
+        return HRegionInfo.encodeRegionName(value.toByteArray());
       case ENCODED_REGION_NAME:
-        return Bytes.toString(value);
+        return value.toStringUtf8();
       default:
         throw new DoNotRetryIOException(
           "Unsupported region specifier type: " + type);
@@ -3211,6 +3211,19 @@ public final class ProtobufUtil {
     codedInput.checkLastTagWas(0);
   }
 
+  public static void mergeFrom(Message.Builder builder, CodedInputStream codedInput, int length)
+      throws IOException {
+    codedInput.resetSizeCounter();
+    int prevLimit = codedInput.setSizeLimit(length);
+
+    int limit = codedInput.pushLimit(length);
+    builder.mergeFrom(codedInput);
+    codedInput.popLimit(limit);
+
+    codedInput.checkLastTagWas(0);
+    codedInput.setSizeLimit(prevLimit);
+  }
+
   public static ReplicationLoadSink toReplicationLoadSink(
       ClusterStatusProtos.ReplicationLoadSink cls) {
     return new ReplicationLoadSink(cls.getAgeOfLastAppliedOp(), cls.getTimeStampsOfLastAppliedOp());
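
In getRegionEncodedName, the ByteString is now unpacked lazily: REGION_NAME still needs a byte[], but ENCODED_REGION_NAME decodes straight to a String via toStringUtf8(), skipping the intermediate array copy. A small sketch of the difference (using hbase:meta's well-known encoded name):

    import com.google.protobuf.ByteString;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ByteStringSketch {
      public static void main(String[] args) {
        ByteString value = ByteString.copyFromUtf8("1588230740");
        // Old path: copy out a byte[], then build the String from it.
        String viaArray = Bytes.toString(value.toByteArray());
        // New path: decode directly from the ByteString's internal storage.
        String direct = value.toStringUtf8();
        System.out.println(viaArray.equals(direct)); // true
      }
    }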

http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
index 44a742f..a6c8685 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
@@ -513,7 +513,7 @@ public class TestClientScanner {
       anyBoolean(), anyInt())).thenReturn(new RegionLocations(null, null, null));
 
     try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"),
-      clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
+      clusterConn, rpcFactory, new RpcControllerFactory(conf), pool, Integer.MAX_VALUE)) {
       Iterator<Result> iter = scanner.iterator();
       while (iter.hasNext()) {
         iter.next();

http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
index 5b30482..bb580c8 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
@@ -57,7 +57,7 @@ public class TestIPCUtil {
   public void before() {
     this.util = new IPCUtil(new Configuration());
   }
-  
+
   @Test
   public void testBuildCellBlock() throws IOException {
     doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), null);
@@ -78,7 +78,7 @@ public class TestIPCUtil {
     CellScanner cellScanner = sized? getSizedCellScanner(cells):
       CellUtil.createCellScanner(Arrays.asList(cells).iterator());
     ByteBuffer bb = util.buildCellBlock(codec, compressor, cellScanner);
-    cellScanner = util.createCellScanner(codec, compressor, bb.array(), 0, bb.limit());
+    cellScanner = util.createCellScanner(codec, compressor, bb);
     int i = 0;
     while (cellScanner.advance()) {
       i++;

http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java
index 1530ccd..8aee07b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java
@@ -21,6 +21,7 @@ import java.io.InputStream;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
 
 /**
  * Not thread safe!
@@ -33,7 +34,7 @@ public class ByteBufferInputStream extends InputStream {
   private ByteBuffer buf;
 
   public ByteBufferInputStream(ByteBuffer buf) {
-      this.buf = buf;
+    this.buf = buf;
   }
 
   /**
@@ -42,6 +43,7 @@ public class ByteBufferInputStream extends InputStream {
    * because the end of the stream has been reached, the value <code>-1</code> is returned.
    * @return the next byte of data, or <code>-1</code> if the end of the stream has been reached.
    */
+  @Override
   public int read() {
     if (this.buf.hasRemaining()) {
       return (this.buf.get() & 0xff);
@@ -58,7 +60,8 @@ public class ByteBufferInputStream extends InputStream {
    * @return the total number of bytes actually read into the buffer, or <code>-1</code> if not even
    *         1 byte can be read because the end of the stream has been reached.
    */
-  public int read(byte b[], int off, int len) {
+  @Override
+  public int read(byte[] b, int off, int len) {
     int avail = available();
     if (avail <= 0) {
       return -1;
@@ -71,7 +74,8 @@ public class ByteBufferInputStream extends InputStream {
       return 0;
     }
 
-    this.buf.get(b, off, len);
+    ByteBufferUtils.copyFromBufferToArray(b, this.buf, this.buf.position(), off, len);
+    this.buf.position(this.buf.position() + len); // we should advance the buffer position
     return len;
   }
 
@@ -82,6 +86,7 @@ public class ByteBufferInputStream extends InputStream {
    * @param n the number of bytes to be skipped.
    * @return the actual number of bytes skipped.
    */
+  @Override
   public long skip(long n) {
     long k = Math.min(n, available());
     if (k < 0) {
@@ -95,7 +100,8 @@ public class ByteBufferInputStream extends InputStream {
    * @return  the number of remaining bytes that can be read (or skipped
    *          over) from this input stream.
    */
+  @Override
   public int available() {
     return this.buf.remaining();
   }
-}
+}
\ No newline at end of file
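
Because bulk reads now go through copyFromBufferToArray (with an explicit position bump afterwards, since that helper copies at absolute offsets), the stream also works over direct buffers, where buf.array() would throw. A quick sketch:

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.io.ByteBufferInputStream;

    public class DirectReadSketch {
      public static void main(String[] args) {
        ByteBuffer direct = ByteBuffer.allocateDirect(4);
        direct.put(new byte[] { 10, 20, 30, 40 }).flip();
        ByteBufferInputStream in = new ByteBufferInputStream(direct);
        byte[] out = new byte[4];
        int n = in.read(out, 0, out.length); // no backing array needed
        System.out.println(n + " bytes, first=" + out[0]); // 4 bytes, first=10
      }
    }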

http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
index ef3d368..d822248 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
@@ -41,6 +41,8 @@ public final class ByteBufferUtils {
   private final static int VALUE_MASK = 0x7f;
   private final static int NEXT_BIT_SHIFT = 7;
   private final static int NEXT_BIT_MASK = 1 << 7;
+  private static final boolean UNSAFE_AVAIL = UnsafeAccess.isAvailable();
+  private static final boolean UNSAFE_UNALIGNED = UnsafeAccess.unaligned();
 
   private ByteBufferUtils() {
   }
@@ -509,4 +511,27 @@ public final class ByteBufferUtils {
     }
     return len1 - len2;
   }
+
+  /**
+   * Copies the specified number of bytes from the given absolute offset of the 'in'
+   * ByteBuffer into the 'out' array.
+   * @param out the destination byte array
+   * @param in the source ByteBuffer
+   * @param sourceOffset absolute offset in the source buffer to copy from
+   * @param destinationOffset offset in the destination array to write at
+   * @param length the number of bytes to copy
+   */
+  public static void copyFromBufferToArray(byte[] out, ByteBuffer in, int sourceOffset,
+      int destinationOffset, int length) {
+    if (in.hasArray()) {
+      System.arraycopy(in.array(), sourceOffset + in.arrayOffset(), out, destinationOffset, length);
+    } else if (UNSAFE_AVAIL) {
+      UnsafeAccess.copy(in, sourceOffset, out, destinationOffset, length);
+    } else {
+      int oldPos = in.position();
+      in.position(sourceOffset);
+      in.get(out, destinationOffset, length);
+      in.position(oldPos);
+    }
+  }
 }
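
copyFromBufferToArray picks the cheapest available path: System.arraycopy for heap buffers, Unsafe for direct buffers when available, and a position-save/restore fallback otherwise. In every case the source buffer's position and limit are left unchanged. Usage sketch:

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.util.ByteBufferUtils;

    public class CopyOutSketch {
      public static void main(String[] args) {
        ByteBuffer src = ByteBuffer.allocateDirect(16);
        for (byte i = 0; i < 16; i++) {
          src.put(i);
        }
        byte[] dst = new byte[8];
        // Absolute copy of bytes [4, 12) of src into dst[0..8); src's cursor is untouched.
        ByteBufferUtils.copyFromBufferToArray(dst, src, 4, 0, 8);
        System.out.println(dst[0]); // 4
      }
    }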

http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
index ef8fe43..5c2bc12 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
@@ -43,7 +43,7 @@ public class Threads {
   private static final Log LOG = LogFactory.getLog(Threads.class);
   private static final AtomicInteger poolNumber = new AtomicInteger(1);
 
-  private static UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER =
+  public static final UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER =
     new UncaughtExceptionHandler() {
     @Override
     public void uncaughtException(Thread t, Throwable e) {
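
Widening LOGGING_EXCEPTION_HANDLER to public lets the RPC server's reader pool and Responder thread (below) share one handler instead of letting uncaught exceptions kill threads silently. For example:

    import org.apache.hadoop.hbase.util.Threads;

    public class HandlerSketch {
      public static void main(String[] args) {
        Thread t = new Thread(new Runnable() {
          @Override
          public void run() {
            throw new RuntimeException("boom"); // logged by the handler, not lost
          }
        }, "worker");
        t.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER);
        t.start();
      }
    }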

http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java
index 1a0b0e9..47a7f58 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.util;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
@@ -29,6 +30,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 
 import sun.misc.Unsafe;
+import sun.nio.ch.DirectBuffer;
 
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
@@ -39,11 +41,15 @@ public final class UnsafeAccess {
   private static final Log LOG = LogFactory.getLog(UnsafeAccess.class);
 
   public static final Unsafe theUnsafe;
+  private static boolean unaligned = false;
 
   /** The offset to the first element in a byte array. */
   public static final int BYTE_ARRAY_BASE_OFFSET;
-  private static boolean unaligned = false;
 
+  // This number limits the number of bytes to copy per call to Unsafe's
+  // copyMemory method. A limit is imposed to allow for safepoint polling
+  // during a large copy
+  static final long UNSAFE_COPY_THRESHOLD = 1024L * 1024L;
   static {
     theUnsafe = (Unsafe) AccessController.doPrivileged(new PrivilegedAction<Object>() {
       @Override
@@ -89,6 +95,89 @@ public final class UnsafeAccess {
     return unaligned;
   }
 
-  public static final boolean littleEndian = ByteOrder.nativeOrder()
-      .equals(ByteOrder.LITTLE_ENDIAN);
+
+  // APIs to copy data. These do direct memory copies and are much faster than
+  // copying byte by byte through the ByteBuffer API.
+  /**
+   * Copies {@code length} bytes from the given offset of the source array into the
+   * destination buffer at {@code destOffset}.
+   * @param src the source byte array
+   * @param srcOffset offset in the source array to copy from
+   * @param dest the destination ByteBuffer (heap or direct)
+   * @param destOffset absolute offset in the destination buffer to write at
+   * @param length the number of bytes to copy
+   */
+  public static void copy(byte[] src, int srcOffset, ByteBuffer dest, int destOffset, int length) {
+    long destAddress = destOffset;
+    Object destBase = null;
+    if (dest.isDirect()) {
+      destAddress = destAddress + ((DirectBuffer) dest).address();
+    } else {
+      destAddress = destAddress + BYTE_ARRAY_BASE_OFFSET + dest.arrayOffset();
+      destBase = dest.array();
+    }
+    long srcAddress = srcOffset + BYTE_ARRAY_BASE_OFFSET;
+    unsafeCopy(src, srcAddress, destBase, destAddress, length);
+  }
+
+  private static void unsafeCopy(Object src, long srcAddr, Object dst, long destAddr, long len) {
+    while (len > 0) {
+      long size = (len > UNSAFE_COPY_THRESHOLD) ? UNSAFE_COPY_THRESHOLD : len;
+      theUnsafe.copyMemory(src, srcAddr, dst, destAddr, size); // copy at most one chunk per call
+      len -= size;
+      srcAddr += size;
+      destAddr += size;
+    }
+  }
+
+  /**
+   * Copies specified number of bytes from given offset of {@code src} ByteBuffer to the
+   * {@code dest} array.
+   *
+   * @param src the source ByteBuffer (heap or direct)
+   * @param srcOffset absolute offset in the source buffer to copy from
+   * @param dest the destination byte array
+   * @param destOffset offset in the destination array to write at
+   * @param length the number of bytes to copy
+   */
+  public static void copy(ByteBuffer src, int srcOffset, byte[] dest, int destOffset,
+      int length) {
+    long srcAddress = srcOffset;
+    Object srcBase = null;
+    if (src.isDirect()) {
+      srcAddress = srcAddress + ((DirectBuffer) src).address();
+    } else {
+      srcAddress = srcAddress + BYTE_ARRAY_BASE_OFFSET + src.arrayOffset();
+      srcBase = src.array();
+    }
+    long destAddress = destOffset + BYTE_ARRAY_BASE_OFFSET;
+    unsafeCopy(srcBase, srcAddress, dest, destAddress, length);
+  }
+
+  /**
+   * Copies specified number of bytes from given offset of {@code src} buffer into the {@code dest}
+   * buffer.
+   *
+   * @param src the source ByteBuffer (heap or direct)
+   * @param srcOffset absolute offset in the source buffer to copy from
+   * @param dest the destination ByteBuffer (heap or direct)
+   * @param destOffset absolute offset in the destination buffer to write at
+   * @param length the number of bytes to copy
+   */
+  public static void copy(ByteBuffer src, int srcOffset, ByteBuffer dest, int destOffset,
+      int length) {
+    long srcAddress, destAddress;
+    Object srcBase = null, destBase = null;
+    if (src.isDirect()) {
+      srcAddress = srcOffset + ((DirectBuffer) src).address();
+    } else {
+      srcAddress = srcOffset + src.arrayOffset() + BYTE_ARRAY_BASE_OFFSET;
+      srcBase = src.array();
+    }
+    if (dest.isDirect()) {
+      destAddress = destOffset + ((DirectBuffer) dest).address();
+    } else {
+      destAddress = destOffset + BYTE_ARRAY_BASE_OFFSET + dest.arrayOffset();
+      destBase = dest.array();
+    }
+    unsafeCopy(srcBase, srcAddress, destBase, destAddress, length);
+  }
 }
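
UNSAFE_COPY_THRESHOLD mirrors the chunking the JDK itself applies in java.nio.Bits: Unsafe.copyMemory does not poll for safepoints, so a single huge copy could stall GC for all threads, and bounding each call at 1 MB keeps such pauses short. The three new copy() overloads handle heap and direct buffers alike; a guarded usage sketch:

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.util.UnsafeAccess;

    public class UnsafeCopySketch {
      public static void main(String[] args) {
        byte[] heap = new byte[] { 1, 2, 3, 4 };
        ByteBuffer direct = ByteBuffer.allocateDirect(4);
        if (UnsafeAccess.isAvailable()) { // Unsafe may be absent on some JVMs
          UnsafeAccess.copy(heap, 0, direct, 0, heap.length); // heap array -> direct buffer
          byte[] back = new byte[4];
          UnsafeAccess.copy(direct, 0, back, 0, back.length); // direct buffer -> heap array
          System.out.println(back[3]); // 4
        }
      }
    }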

http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 2859ea0..6bf623d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -85,6 +85,8 @@ import org.apache.hadoop.hbase.client.VersionInfoUtil;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
+import org.apache.hadoop.hbase.io.ByteBufferInputStream;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
@@ -112,6 +114,7 @@ import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Counter;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
@@ -485,10 +488,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       return this.size;
     }
 
+    @Override
     public long getResponseCellSize() {
       return responseCellSize;
     }
 
+    @Override
     public void incrementResponseCellSize(long cellSize) {
       responseCellSize += cellSize;
     }
@@ -571,7 +576,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       readPool = Executors.newFixedThreadPool(readThreads,
         new ThreadFactoryBuilder().setNameFormat(
           "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
-          ",port=" + port).setDaemon(true).build());
+          ",port=" + port).setDaemon(true)
+        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
       for (int i = 0; i < readThreads; ++i) {
         Reader reader = new Reader();
         readers[i] = reader;
@@ -848,7 +854,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         throw ieo;
       } catch (Exception e) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug(getName() + ": Caught exception while reading:" + e.getMessage());
+          LOG.debug(getName() + ": Caught exception while reading:", e);
         }
         count = -1; //so that the (count < 0) block is executed
       }
@@ -894,6 +900,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     Responder() throws IOException {
       this.setName("RpcServer.responder");
       this.setDaemon(true);
+      this.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER);
       writeSelector = Selector.open(); // create a selector
     }
 
@@ -1311,17 +1318,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       return authorizedUgi;
     }
 
-    private void saslReadAndProcess(byte[] saslToken) throws IOException,
+    private void saslReadAndProcess(ByteBuffer saslToken) throws IOException,
         InterruptedException {
       if (saslContextEstablished) {
         if (LOG.isTraceEnabled())
-          LOG.trace("Have read input token of size " + saslToken.length
+          LOG.trace("Have read input token of size " + saslToken.limit()
               + " for processing by saslServer.unwrap()");
 
         if (!useWrap) {
           processOneRpc(saslToken);
         } else {
-          byte [] plaintextData = saslServer.unwrap(saslToken, 0, saslToken.length);
+          byte[] b = saslToken.array();
+          byte [] plaintextData = saslServer.unwrap(b, saslToken.position(), saslToken.limit());
           processUnwrappedData(plaintextData);
         }
       } else {
@@ -1370,10 +1378,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
             }
           }
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Have read input token of size " + saslToken.length
+            LOG.debug("Have read input token of size " + saslToken.limit()
                 + " for processing by saslServer.evaluateResponse()");
           }
-          replyToken = saslServer.evaluateResponse(saslToken);
+          replyToken = saslServer.evaluateResponse(saslToken.array());
         } catch (IOException e) {
           IOException sendToClient = e;
           Throwable cause = e;
@@ -1569,6 +1577,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
           throw new IllegalArgumentException("Unexpected data length "
               + dataLength + "!! from " + getHostAddress());
         }
+
+        // TODO: check dataLength against some limit so that the client cannot OOM the server
         data = ByteBuffer.allocate(dataLength);
 
         // Increment the rpc count. This counter will be decreased when we write
@@ -1598,9 +1608,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         }
 
         if (useSasl) {
-          saslReadAndProcess(data.array());
+          saslReadAndProcess(data);
         } else {
-          processOneRpc(data.array());
+          processOneRpc(data);
         }
 
       } finally {
@@ -1629,8 +1639,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     }
 
     // Reads the connection header following version
-    private void processConnectionHeader(byte[] buf) throws IOException {
-      this.connectionHeader = ConnectionHeader.parseFrom(buf);
+    private void processConnectionHeader(ByteBuffer buf) throws IOException {
+      this.connectionHeader = ConnectionHeader.parseFrom(
+        new ByteBufferInputStream(buf));
       String serviceName = connectionHeader.getServiceName();
       if (serviceName == null) throw new EmptyServiceNameException();
       this.service = getService(services, serviceName);
@@ -1744,13 +1755,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         if (unwrappedData.remaining() == 0) {
           unwrappedDataLengthBuffer.clear();
           unwrappedData.flip();
-          processOneRpc(unwrappedData.array());
+          processOneRpc(unwrappedData);
           unwrappedData = null;
         }
       }
     }
 
-    private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
+
+    private void processOneRpc(ByteBuffer buf) throws IOException, InterruptedException {
       if (connectionHeaderRead) {
         processRequest(buf);
       } else {
@@ -1772,16 +1784,16 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
      * @throws IOException
      * @throws InterruptedException
      */
-    protected void processRequest(byte[] buf) throws IOException, InterruptedException {
-      long totalRequestSize = buf.length;
+    protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException {
+      long totalRequestSize = buf.limit();
       int offset = 0;
       // Here we read in the header.  We avoid having pb
       // do its default 4k allocation for CodedInputStream.  We force it to use backing array.
-      CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length);
+      CodedInputStream cis = CodedInputStream.newInstance(buf.array(), offset, buf.limit());
       int headerSize = cis.readRawVarint32();
       offset = cis.getTotalBytesRead();
       Message.Builder builder = RequestHeader.newBuilder();
-      ProtobufUtil.mergeFrom(builder, buf, offset, headerSize);
+      ProtobufUtil.mergeFrom(builder, cis, headerSize);
       RequestHeader header = (RequestHeader) builder.build();
       offset += headerSize;
       int id = header.getCallId();
@@ -1812,19 +1824,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
           md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
           if (md == null) throw new UnsupportedOperationException(header.getMethodName());
           builder = this.service.getRequestPrototype(md).newBuilderForType();
-          // To read the varint, I need an inputstream; might as well be a CIS.
-          cis = CodedInputStream.newInstance(buf, offset, buf.length);
+          cis.resetSizeCounter();
           int paramSize = cis.readRawVarint32();
           offset += cis.getTotalBytesRead();
           if (builder != null) {
-            ProtobufUtil.mergeFrom(builder, buf, offset, paramSize);
+            ProtobufUtil.mergeFrom(builder, cis, paramSize);
             param = builder.build();
           }
           offset += paramSize;
         }
         if (header.hasCellBlockMeta()) {
-          cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec,
-            buf, offset, buf.length);
+          buf.position(offset);
+          cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec, buf);
         }
       } catch (Throwable t) {
         InetSocketAddress address = getListenerAddress();
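
This rewrite is the core server-side win: one CodedInputStream over the request buffer parses the varint-prefixed header and param back to back through the new ProtobufUtil.mergeFrom overload, and the trailing cell block is scanned straight out of the same ByteBuffer. A condensed sketch of the flow, with hypothetical builder parameters standing in for the request prototypes:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import com.google.protobuf.CodedInputStream;
    import com.google.protobuf.Message;
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil;

    public class RequestParseSketch {
      // Parses <varint len><header><varint len><param><cell block> from one heap buffer.
      static void parse(ByteBuffer buf, Message.Builder headerBuilder,
          Message.Builder paramBuilder) throws IOException {
        // One stream over the backing array; no per-message CodedInputStream allocation.
        CodedInputStream cis = CodedInputStream.newInstance(buf.array(), 0, buf.limit());
        int headerSize = cis.readRawVarint32();
        ProtobufUtil.mergeFrom(headerBuilder, cis, headerSize); // bounded by pushLimit
        cis.resetSizeCounter(); // protobuf's 64MB read limit now applies per message
        int paramSize = cis.readRawVarint32();
        ProtobufUtil.mergeFrom(paramBuilder, cis, paramSize);
        // Remaining bytes are the cell block: buf.position(offset) and then
        // ipcUtil.createCellScanner(codec, compressionCodec, buf) reads it in place.
      }
    }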

http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java
index cfdbce0..8438378 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoReque
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
@@ -217,10 +216,10 @@ public class AnnotationReadingPriorityFunction implements PriorityFunction {
     if (param == null) {
       return HConstants.NORMAL_QOS;
     }
-    if (param instanceof MultiRequest) {
-      // The multi call has its priority set in the header.  All calls should work this way but
-      // only this one has been converted so far.  No priority == NORMAL_QOS.
-      return header.hasPriority()? header.getPriority(): HConstants.NORMAL_QOS;
+
+    // Trust the client-set priorities if set
+    if (header.hasPriority()) {
+      return header.getPriority();
     }
 
     String cls = param.getClass().getName();

http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 3b3f030..7605fd0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5171,6 +5171,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   * @param readLock true if a reader (non-exclusive) lock is requested, false for a
   *                 writer (exclusive) lock
    */
+  @Override
   public RowLock getRowLock(byte[] row, boolean readLock) throws IOException {
     // Make sure the row is inside of this region before getting the lock for it.
     checkRow(row, "row lock");
@@ -5578,8 +5579,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
       // Here we separate all scanners into two lists - scanner that provide data required
       // by the filter to operate (scanners list) and all others (joinedScanners list).
-      List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
-      List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
+      List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(scan.getFamilyMap().size());
+      List<KeyValueScanner> joinedScanners
+        = new ArrayList<KeyValueScanner>(scan.getFamilyMap().size());
       if (additionalScanners != null) {
         scanners.addAll(additionalScanners);
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/73d67788/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index f136071..f95446f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1050,8 +1050,19 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    */
   Region getRegion(
       final RegionSpecifier regionSpecifier) throws IOException {
-    return regionServer.getRegionByEncodedName(regionSpecifier.getValue().toByteArray(),
-        ProtobufUtil.getRegionEncodedName(regionSpecifier));
+    ByteString value = regionSpecifier.getValue();
+    RegionSpecifierType type = regionSpecifier.getType();
+    switch (type) {
+      case REGION_NAME:
+        byte[] regionName = value.toByteArray();
+        String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
+        return regionServer.getRegionByEncodedName(regionName, encodedRegionName);
+      case ENCODED_REGION_NAME:
+        return regionServer.getRegionByEncodedName(value.toStringUtf8());
+      default:
+        throw new DoNotRetryIOException(
+          "Unsupported region specifier type: " + type);
+    }
   }
 
   @VisibleForTesting