You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/05/09 07:06:24 UTC

hbase git commit: HBASE-18009 Move RpcServer.Call to a separated file

Repository: hbase
Updated Branches:
  refs/heads/master 959deb0e5 -> 51d4c68b7


HBASE-18009 Move RpcServer.Call to a separated file


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/51d4c68b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/51d4c68b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/51d4c68b

Branch: refs/heads/master
Commit: 51d4c68b7cce43af1190f9195bfb08963375bc27
Parents: 959deb0
Author: zhangduo <zh...@apache.org>
Authored: Mon May 8 20:36:33 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Tue May 9 14:56:03 2017 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/ipc/CallRunner.java |   4 +-
 .../apache/hadoop/hbase/ipc/NettyRpcServer.java |  91 +---
 .../hadoop/hbase/ipc/NettyServerCall.java       |  67 +++
 .../org/apache/hadoop/hbase/ipc/RpcCall.java    |   5 -
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  | 498 +-----------------
 .../org/apache/hadoop/hbase/ipc/ServerCall.java | 527 +++++++++++++++++++
 .../hadoop/hbase/ipc/SimpleRpcServer.java       | 122 ++---
 .../hadoop/hbase/ipc/SimpleServerCall.java      |  79 +++
 .../apache/hadoop/hbase/ipc/TestCallRunner.java |   3 +-
 .../hbase/ipc/TestSimpleRpcScheduler.java       |  60 +--
 .../hadoop/hbase/security/TestSecureIPC.java    |   8 +-
 11 files changed, 774 insertions(+), 690 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/51d4c68b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index 0aabc10..f16fc50 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -75,8 +75,8 @@ public class CallRunner {
    * @deprecated As of release 2.0, this will be removed in HBase 3.0
    */
   @Deprecated
-  public RpcServer.Call getCall() {
-    return (RpcServer.Call) call;
+  public ServerCall getCall() {
+    return (ServerCall) call;
   }
 
   public void setStatus(MonitoredRPCHandler status) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/51d4c68b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
index be55378..c18b894 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
@@ -59,7 +59,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
@@ -203,13 +202,14 @@ public class NettyRpcServer extends RpcServer {
         this.hostAddress = inetSocketAddress.getAddress().getHostAddress();
       }
       this.remotePort = inetSocketAddress.getPort();
-      this.saslCall = new Call(SASL_CALLID, null, null, null, null, null, this,
-          0, null, null, 0, null);
-      this.setConnectionHeaderResponseCall = new Call(
-          CONNECTION_HEADER_RESPONSE_CALLID, null, null, null, null, null,
-          this, 0, null, null, 0, null);
-      this.authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null,
-          null, null, null, this, 0, null, null, 0, null);
+      this.saslCall = new NettyServerCall(SASL_CALLID, null, null, null, null, null, this, 0, null,
+          null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null);
+      this.setConnectionHeaderResponseCall =
+          new NettyServerCall(CONNECTION_HEADER_RESPONSE_CALLID, null, null, null, null, null, this,
+              0, null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null);
+      this.authFailedCall =
+          new NettyServerCall(AUTHORIZATION_FAILED_CALLID, null, null, null, null, null, this, 0,
+              null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null);
     }
 
     void readPreamble(ByteBuf buffer) throws IOException {
@@ -243,7 +243,7 @@ public class NettyRpcServer extends RpcServer {
           AccessDeniedException ae = new AccessDeniedException(
               "Authentication is required");
           setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
-          ((Call) authFailedCall)
+          ((NettyServerCall) authFailedCall)
               .sendResponseIfReady(ChannelFutureListener.CLOSE);
           return;
         }
@@ -269,8 +269,8 @@ public class NettyRpcServer extends RpcServer {
 
     private void doBadPreambleHandling(final String msg, final Exception e) throws IOException {
       LOG.warn(msg);
-      Call fakeCall = new Call(-1, null, null, null, null, null, this, -1,
-          null, null, 0, null);
+      NettyServerCall fakeCall = new NettyServerCall(-1, null, null, null, null, null, this, -1,
+          null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null);
       setupResponse(null, fakeCall, e, msg);
       // closes out the connection.
       fakeCall.sendResponseIfReady(ChannelFutureListener.CLOSE);
@@ -336,59 +336,17 @@ public class NettyRpcServer extends RpcServer {
     }
 
     @Override
-    public RpcServer.Call createCall(int id, final BlockingService service,
+    public ServerCall createCall(int id, final BlockingService service,
         final MethodDescriptor md, RequestHeader header, Message param,
         CellScanner cellScanner, RpcServer.Connection connection, long size,
         TraceInfo tinfo, final InetAddress remoteAddress, int timeout,
         CallCleanup reqCleanup) {
-      return new Call(id, service, md, header, param, cellScanner, connection,
-          size, tinfo, remoteAddress, timeout, reqCleanup);
+      return new NettyServerCall(id, service, md, header, param, cellScanner, connection, size,
+          tinfo, remoteAddress, System.currentTimeMillis(), timeout, reservoir, cellBlockBuilder,
+          reqCleanup);
     }
   }
 
-  /**
-   * Datastructure that holds all necessary to a method invocation and then afterward, carries the
-   * result.
-   */
-  @InterfaceStability.Evolving
-  public class Call extends RpcServer.Call {
-
-    Call(int id, final BlockingService service, final MethodDescriptor md,
-        RequestHeader header, Message param, CellScanner cellScanner,
-        RpcServer.Connection connection, long size, TraceInfo tinfo,
-        final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) {
-      super(id, service, md, header, param, cellScanner,
-          connection, size, tinfo, remoteAddress, timeout, reqCleanup);
-    }
-
-    @Override
-    public long disconnectSince() {
-      if (!getConnection().isConnectionOpen()) {
-        return System.currentTimeMillis() - timestamp;
-      } else {
-        return -1L;
-      }
-    }
-
-    NettyConnection getConnection() {
-      return (NettyConnection) this.connection;
-    }
-
-    /**
-     * If we have a response, and delay is not set, then respond immediately. Otherwise, do not
-     * respond to client. This is called by the RPC code in the context of the Handler thread.
-     */
-    @Override
-    public synchronized void sendResponseIfReady() throws IOException {
-      getConnection().channel.writeAndFlush(this);
-    }
-
-    public synchronized void sendResponseIfReady(ChannelFutureListener listener) throws IOException {
-      getConnection().channel.writeAndFlush(this).addListener(listener);
-    }
-
-  }
-
   private class Initializer extends ChannelInitializer<SocketChannel> {
 
     final int maxRequestSize;
@@ -483,7 +441,7 @@ public class NettyRpcServer extends RpcServer {
 
     @Override
     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
-      final Call call = (Call) msg;
+      final NettyServerCall call = (NettyServerCall) msg;
       ByteBuf response = Unpooled.wrappedBuffer(call.response.getBuffers());
       ctx.write(response, promise).addListener(new CallWriteListener(call));
     }
@@ -492,9 +450,9 @@ public class NettyRpcServer extends RpcServer {
 
   private class CallWriteListener implements ChannelFutureListener {
 
-    private Call call;
+    private NettyServerCall call;
 
-    CallWriteListener(Call call) {
+    CallWriteListener(NettyServerCall call) {
       this.call = call;
     }
 
@@ -527,14 +485,11 @@ public class NettyRpcServer extends RpcServer {
   }
 
   @Override
-  public Pair<Message, CellScanner> call(BlockingService service,
-      MethodDescriptor md, Message param, CellScanner cellScanner,
-      long receiveTime, MonitoredRPCHandler status, long startTime, int timeout)
-      throws IOException {
-    Call fakeCall = new Call(-1, service, md, null, param, cellScanner, null,
-        -1, null, null, timeout, null);
-    fakeCall.setReceiveTime(receiveTime);
+  public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
+      Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
+      long startTime, int timeout) throws IOException {
+    NettyServerCall fakeCall = new NettyServerCall(-1, service, md, null, param, cellScanner, null,
+        -1, null, null, receiveTime, timeout, reservoir, cellBlockBuilder, null);
     return call(fakeCall, status);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/51d4c68b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
new file mode 100644
index 0000000..a3f23dd
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import io.netty.channel.ChannelFutureListener;
+
+import java.io.IOException;
+import java.net.InetAddress;
+
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.ipc.NettyRpcServer.NettyConnection;
+import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.htrace.TraceInfo;
+
+/**
+ * Datastructure that holds all necessary to a method invocation and then afterward, carries the
+ * result.
+ */
+@InterfaceAudience.Private
+class NettyServerCall extends ServerCall {
+
+  NettyServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
+      Message param, CellScanner cellScanner, RpcServer.Connection connection, long size,
+      TraceInfo tinfo, InetAddress remoteAddress, long receiveTime, int timeout,
+      ByteBufferPool reservoir, CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
+    super(id, service, md, header, param, cellScanner, connection, size, tinfo, remoteAddress,
+        receiveTime, timeout, reservoir, cellBlockBuilder, reqCleanup);
+  }
+
+  NettyConnection getConnection() {
+    return (NettyConnection) this.connection;
+  }
+
+  /**
+   * If we have a response, and delay is not set, then respond immediately. Otherwise, do not
+   * respond to client. This is called by the RPC code in the context of the Handler thread.
+   */
+  @Override
+  public synchronized void sendResponseIfReady() throws IOException {
+    getConnection().channel.writeAndFlush(this);
+  }
+
+  public synchronized void sendResponseIfReady(ChannelFutureListener listener) throws IOException {
+    getConnection().channel.writeAndFlush(this).addListener(listener);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/51d4c68b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java
index 239ea9e..fc86594 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java
@@ -65,11 +65,6 @@ public interface RpcCall extends RpcCallContext {
   long getReceiveTime();
 
   /**
-   * Set the timestamp when the call is constructed.
-   */
-  void setReceiveTime(long receiveTime);
-
-  /**
    * @return The time when the call starts to be executed.
    */
   long getStartTime();

http://git-wip-us.apache.org/repos/asf/hbase/blob/51d4c68b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index ebae1fb..bbc329c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -62,9 +62,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.VersionInfoUtil;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
-import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
-import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.io.ByteBufferPool;
 import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
@@ -88,7 +86,6 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteInput;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
@@ -98,13 +95,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
-import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.BytesWritable;
@@ -120,7 +113,6 @@ import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.htrace.TraceInfo;
 import org.codehaus.jackson.map.ObjectMapper;
 
@@ -265,475 +257,6 @@ public abstract class RpcServer implements RpcServerInterface,
    */
   private RSRpcServices rsRpcServices;
 
-  /**
-   * Datastructure that holds all necessary to a method invocation and then afterward, carries
-   * the result.
-   */
-  @InterfaceStability.Evolving
-  @InterfaceAudience.Private
-  public abstract class Call implements RpcCall {
-    protected int id;                             // the client's call id
-    protected BlockingService service;
-    protected MethodDescriptor md;
-    protected RequestHeader header;
-    protected Message param;                      // the parameter passed
-    // Optional cell data passed outside of protobufs.
-    protected CellScanner cellScanner;
-    protected Connection connection;              // connection to client
-    protected long timestamp;      // the time received when response is null
-                                   // the time served when response is not null
-    protected int timeout;
-    protected long startTime;
-    protected long deadline;// the deadline to handle this call, if exceed we can drop it.
-
-    /**
-     * Chain of buffers to send as response.
-     */
-    protected BufferChain response;
-
-    protected long size;                          // size of current call
-    protected boolean isError;
-    protected TraceInfo tinfo;
-    protected ByteBufferListOutputStream cellBlockStream = null;
-    protected CallCleanup reqCleanup = null;
-
-    protected User user;
-    protected InetAddress remoteAddress;
-    protected RpcCallback rpcCallback;
-
-    private long responseCellSize = 0;
-    private long responseBlockSize = 0;
-    // cumulative size of serialized exceptions
-    private long exceptionSize = 0;
-    private boolean retryImmediatelySupported;
-
-    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
-        justification="Can't figure why this complaint is happening... see below")
-    Call(int id, final BlockingService service, final MethodDescriptor md,
-        RequestHeader header, Message param, CellScanner cellScanner,
-        Connection connection, long size, TraceInfo tinfo,
-        final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) {
-      this.id = id;
-      this.service = service;
-      this.md = md;
-      this.header = header;
-      this.param = param;
-      this.cellScanner = cellScanner;
-      this.connection = connection;
-      this.timestamp = System.currentTimeMillis();
-      this.response = null;
-      this.isError = false;
-      this.size = size;
-      this.tinfo = tinfo;
-      this.user = connection == null? null: connection.user; // FindBugs: NP_NULL_ON_SOME_PATH
-      this.remoteAddress = remoteAddress;
-      this.retryImmediatelySupported =
-          connection == null? null: connection.retryImmediatelySupported;
-      this.timeout = timeout;
-      this.deadline = this.timeout > 0 ? this.timestamp + this.timeout : Long.MAX_VALUE;
-      this.reqCleanup = reqCleanup;
-    }
-
-    /**
-     * Call is done. Execution happened and we returned results to client. It is
-     * now safe to cleanup.
-     */
-    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
-        justification = "Presume the lock on processing request held by caller is protection enough")
-    void done() {
-      if (this.cellBlockStream != null) {
-        // This will return back the BBs which we got from pool.
-        this.cellBlockStream.releaseResources();
-        this.cellBlockStream = null;
-      }
-      // If the call was run successfuly, we might have already returned the BB
-      // back to pool. No worries..Then inputCellBlock will be null
-      cleanup();
-    }
-
-    @Override
-    public void cleanup() {
-      if (this.reqCleanup != null) {
-        this.reqCleanup.run();
-        this.reqCleanup = null;
-      }
-    }
-
-    @Override
-    public String toString() {
-      return toShortString() + " param: " +
-        (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") +
-        " connection: " + connection.toString();
-    }
-
-    @Override
-    public RequestHeader getHeader() {
-      return this.header;
-    }
-
-    @Override
-    public int getPriority() {
-      return this.header.getPriority();
-    }
-
-    /*
-     * Short string representation without param info because param itself could be huge depends on
-     * the payload of a command
-     */
-    @Override
-    public String toShortString() {
-      String serviceName = this.connection.service != null ?
-          this.connection.service.getDescriptorForType().getName() : "null";
-      return "callId: " + this.id + " service: " + serviceName +
-          " methodName: " + ((this.md != null) ? this.md.getName() : "n/a") +
-          " size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) +
-          " connection: " + connection.toString() +
-          " deadline: " + deadline;
-    }
-
-    protected synchronized void setSaslTokenResponse(ByteBuffer response) {
-      ByteBuffer[] responseBufs = new ByteBuffer[1];
-      responseBufs[0] = response;
-      this.response = new BufferChain(responseBufs);
-    }
-
-    protected synchronized void setConnectionHeaderResponse(ByteBuffer response) {
-      ByteBuffer[] responseBufs = new ByteBuffer[1];
-      responseBufs[0] = response;
-      this.response = new BufferChain(responseBufs);
-    }
-
-    @Override
-    public synchronized void setResponse(Message m, final CellScanner cells,
-        Throwable t, String errorMsg) {
-      if (this.isError) return;
-      if (t != null) this.isError = true;
-      BufferChain bc = null;
-      try {
-        ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
-        // Call id.
-        headerBuilder.setCallId(this.id);
-        if (t != null) {
-          setExceptionResponse(t, errorMsg, headerBuilder);
-        }
-        // Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the
-        // reservoir when finished. This is hacky and the hack is not contained but benefits are
-        // high when we can avoid a big buffer allocation on each rpc.
-        List<ByteBuffer> cellBlock = null;
-        int cellBlockSize = 0;
-        if (reservoir != null) {
-          this.cellBlockStream = cellBlockBuilder.buildCellBlockStream(this.connection.codec,
-              this.connection.compressionCodec, cells, reservoir);
-          if (this.cellBlockStream != null) {
-            cellBlock = this.cellBlockStream.getByteBuffers();
-            cellBlockSize = this.cellBlockStream.size();
-          }
-        } else {
-          ByteBuffer b = cellBlockBuilder.buildCellBlock(this.connection.codec,
-              this.connection.compressionCodec, cells);
-          if (b != null) {
-            cellBlockSize = b.remaining();
-            cellBlock = new ArrayList<>(1);
-            cellBlock.add(b);
-          }
-        }
-
-        if (cellBlockSize > 0) {
-          CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
-          // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
-          cellBlockBuilder.setLength(cellBlockSize);
-          headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
-        }
-        Message header = headerBuilder.build();
-        ByteBuffer headerBuf =
-            createHeaderAndMessageBytes(m, header, cellBlockSize, cellBlock);
-        ByteBuffer[] responseBufs = null;
-        int cellBlockBufferSize = 0;
-        if (cellBlock != null) {
-          cellBlockBufferSize = cellBlock.size();
-          responseBufs = new ByteBuffer[1 + cellBlockBufferSize];
-        } else {
-          responseBufs = new ByteBuffer[1];
-        }
-        responseBufs[0] = headerBuf;
-        if (cellBlock != null) {
-          for (int i = 0; i < cellBlockBufferSize; i++) {
-            responseBufs[i + 1] = cellBlock.get(i);
-          }
-        }
-        bc = new BufferChain(responseBufs);
-        if (connection.useWrap) {
-          bc = wrapWithSasl(bc);
-        }
-      } catch (IOException e) {
-        LOG.warn("Exception while creating response " + e);
-      }
-      this.response = bc;
-      // Once a response message is created and set to this.response, this Call can be treated as
-      // done. The Responder thread will do the n/w write of this message back to client.
-      if (this.rpcCallback != null) {
-        try {
-          this.rpcCallback.run();
-        } catch (Exception e) {
-          // Don't allow any exception here to kill this handler thread.
-          LOG.warn("Exception while running the Rpc Callback.", e);
-        }
-      }
-    }
-
-    protected void setExceptionResponse(Throwable t, String errorMsg,
-        ResponseHeader.Builder headerBuilder) {
-      ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
-      exceptionBuilder.setExceptionClassName(t.getClass().getName());
-      exceptionBuilder.setStackTrace(errorMsg);
-      exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
-      if (t instanceof RegionMovedException) {
-        // Special casing for this exception.  This is only one carrying a payload.
-        // Do this instead of build a generic system for allowing exceptions carry
-        // any kind of payload.
-        RegionMovedException rme = (RegionMovedException)t;
-        exceptionBuilder.setHostname(rme.getHostname());
-        exceptionBuilder.setPort(rme.getPort());
-      }
-      // Set the exception as the result of the method invocation.
-      headerBuilder.setException(exceptionBuilder.build());
-    }
-
-    protected ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
-        int cellBlockSize, List<ByteBuffer> cellBlock) throws IOException {
-      // Organize the response as a set of bytebuffers rather than collect it all together inside
-      // one big byte array; save on allocations.
-      // for writing the header, we check if there is available space in the buffers
-      // created for the cellblock itself. If there is space for the header, we reuse
-      // the last buffer in the cellblock. This applies to the cellblock created from the
-      // pool or even the onheap cellblock buffer in case there is no pool enabled.
-      // Possible reuse would avoid creating a temporary array for storing the header every time.
-      ByteBuffer possiblePBBuf =
-          (cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null;
-      int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0,
-          resultVintSize = 0;
-      if (header != null) {
-        headerSerializedSize = header.getSerializedSize();
-        headerVintSize = CodedOutputStream.computeRawVarint32Size(headerSerializedSize);
-      }
-      if (result != null) {
-        resultSerializedSize = result.getSerializedSize();
-        resultVintSize = CodedOutputStream.computeRawVarint32Size(resultSerializedSize);
-      }
-      // calculate the total size
-      int totalSize = headerSerializedSize + headerVintSize
-          + (resultSerializedSize + resultVintSize)
-          + cellBlockSize;
-      int totalPBSize = headerSerializedSize + headerVintSize + resultSerializedSize
-          + resultVintSize + Bytes.SIZEOF_INT;
-      // Only if the last buffer has enough space for header use it. Else allocate
-      // a new buffer. Assume they are all flipped
-      if (possiblePBBuf != null
-          && possiblePBBuf.limit() + totalPBSize <= possiblePBBuf.capacity()) {
-        // duplicate the buffer. This is where the header is going to be written
-        ByteBuffer pbBuf = possiblePBBuf.duplicate();
-        // get the current limit
-        int limit = pbBuf.limit();
-        // Position such that we write the header to the end of the buffer
-        pbBuf.position(limit);
-        // limit to the header size
-        pbBuf.limit(totalPBSize + limit);
-        // mark the current position
-        pbBuf.mark();
-        writeToCOS(result, header, totalSize, pbBuf);
-        // reset the buffer back to old position
-        pbBuf.reset();
-        return pbBuf;
-      } else {
-        return createHeaderAndMessageBytes(result, header, totalSize, totalPBSize);
-      }
-    }
-
-    private void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf)
-        throws IOException {
-      ByteBufferUtils.putInt(pbBuf, totalSize);
-      // create COS that works on BB
-      CodedOutputStream cos = CodedOutputStream.newInstance(pbBuf);
-      if (header != null) {
-        cos.writeMessageNoTag(header);
-      }
-      if (result != null) {
-        cos.writeMessageNoTag(result);
-      }
-      cos.flush();
-      cos.checkNoSpaceLeft();
-    }
-
-    private ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
-        int totalSize, int totalPBSize) throws IOException {
-      ByteBuffer pbBuf = ByteBuffer.allocate(totalPBSize);
-      writeToCOS(result, header, totalSize, pbBuf);
-      pbBuf.flip();
-      return pbBuf;
-    }
-
-    protected BufferChain wrapWithSasl(BufferChain bc)
-        throws IOException {
-      if (!this.connection.useSasl) return bc;
-      // Looks like no way around this; saslserver wants a byte array.  I have to make it one.
-      // THIS IS A BIG UGLY COPY.
-      byte [] responseBytes = bc.getBytes();
-      byte [] token;
-      // synchronization may be needed since there can be multiple Handler
-      // threads using saslServer or Crypto AES to wrap responses.
-      if (connection.useCryptoAesWrap) {
-        // wrap with Crypto AES
-        synchronized (connection.cryptoAES) {
-          token = connection.cryptoAES.wrap(responseBytes, 0, responseBytes.length);
-        }
-      } else {
-        synchronized (connection.saslServer) {
-          token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
-        }
-      }
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Adding saslServer wrapped token of size " + token.length
-            + " as call response.");
-      }
-
-      ByteBuffer[] responseBufs = new ByteBuffer[2];
-      responseBufs[0] = ByteBuffer.wrap(Bytes.toBytes(token.length));
-      responseBufs[1] = ByteBuffer.wrap(token);
-      return new BufferChain(responseBufs);
-    }
-
-    @Override
-    public boolean isClientCellBlockSupported() {
-      return this.connection != null && this.connection.codec != null;
-    }
-
-    @Override
-    public long getResponseCellSize() {
-      return responseCellSize;
-    }
-
-    @Override
-    public void incrementResponseCellSize(long cellSize) {
-      responseCellSize += cellSize;
-    }
-
-    @Override
-    public long getResponseBlockSize() {
-      return responseBlockSize;
-    }
-
-    @Override
-    public void incrementResponseBlockSize(long blockSize) {
-      responseBlockSize += blockSize;
-    }
-
-    @Override
-    public long getResponseExceptionSize() {
-      return exceptionSize;
-    }
-    @Override
-    public void incrementResponseExceptionSize(long exSize) {
-      exceptionSize += exSize;
-    }
-
-    @Override
-    public long getSize() {
-      return this.size;
-    }
-
-    @Override
-    public long getDeadline() {
-      return deadline;
-    }
-
-    @Override
-    public User getRequestUser() {
-      return user;
-    }
-
-    @Override
-    public String getRequestUserName() {
-      User user = getRequestUser();
-      return user == null? null: user.getShortName();
-    }
-
-    @Override
-    public InetAddress getRemoteAddress() {
-      return remoteAddress;
-    }
-
-    @Override
-    public VersionInfo getClientVersionInfo() {
-      return connection.getVersionInfo();
-    }
-
-    @Override
-    public synchronized void setCallBack(RpcCallback callback) {
-      this.rpcCallback = callback;
-    }
-
-    @Override
-    public boolean isRetryImmediatelySupported() {
-      return retryImmediatelySupported;
-    }
-
-    @Override
-    public BlockingService getService() {
-      return service;
-    }
-
-    @Override
-    public MethodDescriptor getMethod() {
-      return md;
-    }
-
-    @Override
-    public Message getParam() {
-      return param;
-    }
-
-    @Override
-    public CellScanner getCellScanner() {
-      return cellScanner;
-    }
-
-    @Override
-    public long getReceiveTime() {
-      return timestamp;
-    }
-
-    @Override
-    public void setReceiveTime(long t) {
-      this.timestamp = t;
-    }
-
-    @Override
-    public long getStartTime() {
-      return startTime;
-    }
-
-    @Override
-    public void setStartTime(long t) {
-      this.startTime = t;
-    }
-
-    @Override
-    public int getTimeout() {
-      return timeout;
-    }
-
-    @Override
-    public int getRemotePort() {
-      return connection.getRemotePort();
-    }
-
-    @Override
-    public TraceInfo getTraceInfo() {
-      return tinfo;
-    }
-
-  }
-
   @FunctionalInterface
   protected static interface CallCleanup {
     void run();
@@ -781,15 +304,15 @@ public abstract class RpcServer implements RpcServerInterface,
     protected boolean useCryptoAesWrap = false;
     // Fake 'call' for failed authorization response
     protected static final int AUTHORIZATION_FAILED_CALLID = -1;
-    protected Call authFailedCall;
+    protected ServerCall authFailedCall;
     protected ByteArrayOutputStream authFailedResponse =
         new ByteArrayOutputStream();
     // Fake 'call' for SASL context setup
     protected static final int SASL_CALLID = -33;
-    protected Call saslCall;
+    protected ServerCall saslCall;
     // Fake 'call' for connection header response
     protected static final int CONNECTION_HEADER_RESPONSE_CALLID = -34;
-    protected Call setConnectionHeaderResponseCall;
+    protected ServerCall setConnectionHeaderResponseCall;
 
     // was authentication allowed with a fallback to simple auth
     protected boolean authenticatedWithFallback;
@@ -1366,7 +889,7 @@ public abstract class RpcServer implements RpcServerInterface,
       // This is a bit late to be doing this check - we have already read in the
       // total request.
       if ((totalRequestSize + callQueueSizeInBytes.sum()) > maxQueueSizeInBytes) {
-        final RpcServer.Call callTooBig = createCall(id, this.service, null,
+        final ServerCall callTooBig = createCall(id, this.service, null,
             null, null, null, this, totalRequestSize, null, null, 0,
             this.callCleanup);
         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
@@ -1430,7 +953,7 @@ public abstract class RpcServer implements RpcServerInterface,
           t = new DoNotRetryIOException(t);
         }
 
-        final RpcServer.Call readParamsFailedCall = createCall(id,
+        final ServerCall readParamsFailedCall = createCall(id,
             this.service, null, null, null, null, this, totalRequestSize, null,
             null, 0, this.callCleanup);
         ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
@@ -1447,7 +970,7 @@ public abstract class RpcServer implements RpcServerInterface,
       if (header.hasTimeout() && header.getTimeout() > 0) {
         timeout = Math.max(minClientRequestTimeout, header.getTimeout());
       }
-      RpcServer.Call call = createCall(id, this.service, md, header, param,
+      ServerCall call = createCall(id, this.service, md, header, param,
           cellScanner, this, totalRequestSize, traceInfo, this.addr, timeout,
           this.callCleanup);
 
@@ -1465,7 +988,7 @@ public abstract class RpcServer implements RpcServerInterface,
 
     public abstract boolean isConnectionOpen();
 
-    public abstract Call createCall(int id, final BlockingService service,
+    public abstract ServerCall createCall(int id, final BlockingService service,
         final MethodDescriptor md, RequestHeader header, Message param,
         CellScanner cellScanner, Connection connection, long size,
         TraceInfo tinfo, final InetAddress remoteAddress, int timeout,
@@ -1594,14 +1117,13 @@ public abstract class RpcServer implements RpcServerInterface,
 
   /**
    * Setup response for the RPC Call.
-   *
    * @param response buffer to serialize the response into
-   * @param call {@link Call} to which we are setting up the response
+   * @param call {@link ServerCall} to which we are setting up the response
    * @param error error message, if the call failed
    * @throws IOException
    */
-  protected void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error)
-  throws IOException {
+  protected void setupResponse(ByteArrayOutputStream response, ServerCall call, Throwable t,
+      String error) throws IOException {
     if (response != null) response.reset();
     call.setResponse(null, null, t, error);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/51d4c68b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
new file mode 100644
index 0000000..9294839
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
@@ -0,0 +1,527 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
+import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
+import org.apache.hadoop.hbase.ipc.RpcServer.Connection;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.htrace.TraceInfo;
+
+/**
+ * Data structure that holds everything necessary for a method invocation and then, afterward,
+ * carries the result.
+ */
+@InterfaceAudience.Private
+abstract class ServerCall implements RpcCall {
+
+  protected final int id;                             // the client's call id
+  protected final BlockingService service;
+  protected final MethodDescriptor md;
+  protected final RequestHeader header;
+  protected Message param;                      // the parameter passed
+  // Optional cell data passed outside of protobufs.
+  protected final CellScanner cellScanner;
+  protected final Connection connection;              // connection to client
+  protected final long receiveTime;      // the time received when response is null
+                                 // the time served when response is not null
+  protected final int timeout;
+  protected long startTime;
+  protected final long deadline;// the deadline to handle this call, if exceed we can drop it.
+
+  protected final ByteBufferPool reservoir;
+
+  protected final CellBlockBuilder cellBlockBuilder;
+
+  /**
+   * Chain of buffers to send as response.
+   */
+  protected BufferChain response;
+
+  protected final long size;                          // size of current call
+  protected boolean isError;
+  protected final TraceInfo tinfo;
+  protected ByteBufferListOutputStream cellBlockStream = null;
+  protected CallCleanup reqCleanup = null;
+
+  protected User user;
+  protected final InetAddress remoteAddress;
+  protected RpcCallback rpcCallback;
+
+  private long responseCellSize = 0;
+  private long responseBlockSize = 0;
+  // cumulative size of serialized exceptions
+  private long exceptionSize = 0;
+  private final boolean retryImmediatelySupported;
+
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
+      justification="Can't figure why this complaint is happening... see below")
+  ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
+      Message param, CellScanner cellScanner, Connection connection, long size, TraceInfo tinfo,
+      InetAddress remoteAddress, long receiveTime, int timeout, ByteBufferPool reservoir,
+      CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
+    this.id = id;
+    this.service = service;
+    this.md = md;
+    this.header = header;
+    this.param = param;
+    this.cellScanner = cellScanner;
+    this.connection = connection;
+    this.receiveTime = receiveTime;
+    this.response = null;
+    this.isError = false;
+    this.size = size;
+    this.tinfo = tinfo;
+    this.user = connection == null ? null : connection.user; // FindBugs: NP_NULL_ON_SOME_PATH
+    this.remoteAddress = remoteAddress;
+    this.retryImmediatelySupported =
+        connection == null ? false : connection.retryImmediatelySupported;
+    this.timeout = timeout;
+    this.deadline = this.timeout > 0 ? this.receiveTime + this.timeout : Long.MAX_VALUE;
+    this.reservoir = reservoir;
+    this.cellBlockBuilder = cellBlockBuilder;
+    this.reqCleanup = reqCleanup;
+  }
+
+  /**
+   * Call is done. Execution happened and we returned results to client. It is
+   * now safe to cleanup.
+   */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
+      justification = "Presume the lock on processing request held by caller is protection enough")
+  void done() {
+    if (this.cellBlockStream != null) {
+      // This will return back the BBs which we got from pool.
+      this.cellBlockStream.releaseResources();
+      this.cellBlockStream = null;
+    }
+    // If the call was run successfully, we might have already returned the BB
+    // back to pool. No worries..Then inputCellBlock will be null
+    cleanup();
+  }
+
+  @Override
+  public void cleanup() {
+    if (this.reqCleanup != null) {
+      this.reqCleanup.run();
+      this.reqCleanup = null;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return toShortString() + " param: " +
+      (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") +
+      " connection: " + connection.toString();
+  }
+
+  @Override
+  public RequestHeader getHeader() {
+    return this.header;
+  }
+
+  @Override
+  public int getPriority() {
+    return this.header.getPriority();
+  }
+
+  /*
+   * Short string representation without param info, because the param itself could be huge
+   * depending on the payload of a command.
+   */
+  @Override
+  public String toShortString() {
+    String serviceName = this.connection.service != null ?
+        this.connection.service.getDescriptorForType().getName() : "null";
+    return "callId: " + this.id + " service: " + serviceName +
+        " methodName: " + ((this.md != null) ? this.md.getName() : "n/a") +
+        " size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) +
+        " connection: " + connection.toString() +
+        " deadline: " + deadline;
+  }
+
+  protected synchronized void setSaslTokenResponse(ByteBuffer response) {
+    ByteBuffer[] responseBufs = new ByteBuffer[1];
+    responseBufs[0] = response;
+    this.response = new BufferChain(responseBufs);
+  }
+
+  protected synchronized void setConnectionHeaderResponse(ByteBuffer response) {
+    ByteBuffer[] responseBufs = new ByteBuffer[1];
+    responseBufs[0] = response;
+    this.response = new BufferChain(responseBufs);
+  }
+
+  @Override
+  public synchronized void setResponse(Message m, final CellScanner cells,
+      Throwable t, String errorMsg) {
+    if (this.isError) return;
+    if (t != null) this.isError = true;
+    BufferChain bc = null;
+    try {
+      ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
+      // Call id.
+      headerBuilder.setCallId(this.id);
+      if (t != null) {
+        setExceptionResponse(t, errorMsg, headerBuilder);
+      }
+      // Pass reservoir to buildCellBlock. Keep a reference to what it returns so we can add it
+      // back to the reservoir when finished. This is hacky and the hack is not contained but
+      // benefits are high when we can avoid a big buffer allocation on each rpc.
+      List<ByteBuffer> cellBlock = null;
+      int cellBlockSize = 0;
+      if (this.reservoir != null) {
+        this.cellBlockStream = this.cellBlockBuilder.buildCellBlockStream(this.connection.codec,
+          this.connection.compressionCodec, cells, this.reservoir);
+        if (this.cellBlockStream != null) {
+          cellBlock = this.cellBlockStream.getByteBuffers();
+          cellBlockSize = this.cellBlockStream.size();
+        }
+      } else {
+        ByteBuffer b = this.cellBlockBuilder.buildCellBlock(this.connection.codec,
+          this.connection.compressionCodec, cells);
+        if (b != null) {
+          cellBlockSize = b.remaining();
+          cellBlock = new ArrayList<>(1);
+          cellBlock.add(b);
+        }
+      }
+
+      if (cellBlockSize > 0) {
+        CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
+        // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
+        cellBlockBuilder.setLength(cellBlockSize);
+        headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
+      }
+      Message header = headerBuilder.build();
+      ByteBuffer headerBuf =
+          createHeaderAndMessageBytes(m, header, cellBlockSize, cellBlock);
+      ByteBuffer[] responseBufs = null;
+      int cellBlockBufferSize = 0;
+      if (cellBlock != null) {
+        cellBlockBufferSize = cellBlock.size();
+        responseBufs = new ByteBuffer[1 + cellBlockBufferSize];
+      } else {
+        responseBufs = new ByteBuffer[1];
+      }
+      responseBufs[0] = headerBuf;
+      if (cellBlock != null) {
+        for (int i = 0; i < cellBlockBufferSize; i++) {
+          responseBufs[i + 1] = cellBlock.get(i);
+        }
+      }
+      bc = new BufferChain(responseBufs);
+      if (connection.useWrap) {
+        bc = wrapWithSasl(bc);
+      }
+    } catch (IOException e) {
+      RpcServer.LOG.warn("Exception while creating response " + e);
+    }
+    this.response = bc;
+    // Once a response message is created and set to this.response, this Call can be treated as
+    // done. The Responder thread will do the n/w write of this message back to client.
+    if (this.rpcCallback != null) {
+      try {
+        this.rpcCallback.run();
+      } catch (Exception e) {
+        // Don't allow any exception here to kill this handler thread.
+        RpcServer.LOG.warn("Exception while running the Rpc Callback.", e);
+      }
+    }
+  }
+
+  protected void setExceptionResponse(Throwable t, String errorMsg,
+      ResponseHeader.Builder headerBuilder) {
+    ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
+    exceptionBuilder.setExceptionClassName(t.getClass().getName());
+    exceptionBuilder.setStackTrace(errorMsg);
+    exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
+    if (t instanceof RegionMovedException) {
+      // Special casing for this exception.  This is the only one carrying a payload.
+      // Do this instead of building a generic system for allowing exceptions to carry
+      // any kind of payload.
+      RegionMovedException rme = (RegionMovedException)t;
+      exceptionBuilder.setHostname(rme.getHostname());
+      exceptionBuilder.setPort(rme.getPort());
+    }
+    // Set the exception as the result of the method invocation.
+    headerBuilder.setException(exceptionBuilder.build());
+  }
+
+  protected ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
+      int cellBlockSize, List<ByteBuffer> cellBlock) throws IOException {
+    // Organize the response as a set of bytebuffers rather than collect it all together inside
+    // one big byte array; save on allocations.
+    // for writing the header, we check if there is available space in the buffers
+    // created for the cellblock itself. If there is space for the header, we reuse
+    // the last buffer in the cellblock. This applies to the cellblock created from the
+    // pool or even the onheap cellblock buffer in case there is no pool enabled.
+    // Possible reuse would avoid creating a temporary array for storing the header every time.
+    ByteBuffer possiblePBBuf =
+        (cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null;
+    int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0,
+        resultVintSize = 0;
+    if (header != null) {
+      headerSerializedSize = header.getSerializedSize();
+      headerVintSize = CodedOutputStream.computeUInt32SizeNoTag(headerSerializedSize);
+    }
+    if (result != null) {
+      resultSerializedSize = result.getSerializedSize();
+      resultVintSize = CodedOutputStream.computeUInt32SizeNoTag(resultSerializedSize);
+    }
+    // calculate the total size
+    int totalSize = headerSerializedSize + headerVintSize
+        + (resultSerializedSize + resultVintSize)
+        + cellBlockSize;
+    int totalPBSize = headerSerializedSize + headerVintSize + resultSerializedSize
+        + resultVintSize + Bytes.SIZEOF_INT;
+    // Only if the last buffer has enough space for header use it. Else allocate
+    // a new buffer. Assume they are all flipped
+    if (possiblePBBuf != null
+        && possiblePBBuf.limit() + totalPBSize <= possiblePBBuf.capacity()) {
+      // duplicate the buffer. This is where the header is going to be written
+      ByteBuffer pbBuf = possiblePBBuf.duplicate();
+      // get the current limit
+      int limit = pbBuf.limit();
+      // Position such that we write the header to the end of the buffer
+      pbBuf.position(limit);
+      // limit to the header size
+      pbBuf.limit(totalPBSize + limit);
+      // mark the current position
+      pbBuf.mark();
+      writeToCOS(result, header, totalSize, pbBuf);
+      // reset the buffer back to old position
+      pbBuf.reset();
+      return pbBuf;
+    } else {
+      return createHeaderAndMessageBytes(result, header, totalSize, totalPBSize);
+    }
+  }
+
+  private void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf)
+      throws IOException {
+    ByteBufferUtils.putInt(pbBuf, totalSize);
+    // create COS that works on BB
+    CodedOutputStream cos = CodedOutputStream.newInstance(pbBuf);
+    if (header != null) {
+      cos.writeMessageNoTag(header);
+    }
+    if (result != null) {
+      cos.writeMessageNoTag(result);
+    }
+    cos.flush();
+    cos.checkNoSpaceLeft();
+  }
+
+  private ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
+      int totalSize, int totalPBSize) throws IOException {
+    ByteBuffer pbBuf = ByteBuffer.allocate(totalPBSize);
+    writeToCOS(result, header, totalSize, pbBuf);
+    pbBuf.flip();
+    return pbBuf;
+  }
+
+  protected BufferChain wrapWithSasl(BufferChain bc)
+      throws IOException {
+    if (!this.connection.useSasl) return bc;
+    // Looks like no way around this; saslserver wants a byte array.  I have to make it one.
+    // THIS IS A BIG UGLY COPY.
+    byte [] responseBytes = bc.getBytes();
+    byte [] token;
+    // synchronization may be needed since there can be multiple Handler
+    // threads using saslServer or Crypto AES to wrap responses.
+    if (connection.useCryptoAesWrap) {
+      // wrap with Crypto AES
+      synchronized (connection.cryptoAES) {
+        token = connection.cryptoAES.wrap(responseBytes, 0, responseBytes.length);
+      }
+    } else {
+      synchronized (connection.saslServer) {
+        token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
+      }
+    }
+    if (RpcServer.LOG.isTraceEnabled()) {
+      RpcServer.LOG.trace("Adding saslServer wrapped token of size " + token.length
+          + " as call response.");
+    }
+
+    ByteBuffer[] responseBufs = new ByteBuffer[2];
+    responseBufs[0] = ByteBuffer.wrap(Bytes.toBytes(token.length));
+    responseBufs[1] = ByteBuffer.wrap(token);
+    return new BufferChain(responseBufs);
+  }
+
+  @Override
+  public long disconnectSince() {
+    if (!this.connection.isConnectionOpen()) {
+      return System.currentTimeMillis() - receiveTime;
+    } else {
+      return -1L;
+    }
+  }
+
+  @Override
+  public boolean isClientCellBlockSupported() {
+    return this.connection != null && this.connection.codec != null;
+  }
+
+  @Override
+  public long getResponseCellSize() {
+    return responseCellSize;
+  }
+
+  @Override
+  public void incrementResponseCellSize(long cellSize) {
+    responseCellSize += cellSize;
+  }
+
+  @Override
+  public long getResponseBlockSize() {
+    return responseBlockSize;
+  }
+
+  @Override
+  public void incrementResponseBlockSize(long blockSize) {
+    responseBlockSize += blockSize;
+  }
+
+  @Override
+  public long getResponseExceptionSize() {
+    return exceptionSize;
+  }
+  @Override
+  public void incrementResponseExceptionSize(long exSize) {
+    exceptionSize += exSize;
+  }
+
+  @Override
+  public long getSize() {
+    return this.size;
+  }
+
+  @Override
+  public long getDeadline() {
+    return deadline;
+  }
+
+  @Override
+  public User getRequestUser() {
+    return user;
+  }
+
+  @Override
+  public String getRequestUserName() {
+    User user = getRequestUser();
+    return user == null? null: user.getShortName();
+  }
+
+  @Override
+  public InetAddress getRemoteAddress() {
+    return remoteAddress;
+  }
+
+  @Override
+  public VersionInfo getClientVersionInfo() {
+    return connection.getVersionInfo();
+  }
+
+  @Override
+  public synchronized void setCallBack(RpcCallback callback) {
+    this.rpcCallback = callback;
+  }
+
+  @Override
+  public boolean isRetryImmediatelySupported() {
+    return retryImmediatelySupported;
+  }
+
+  @Override
+  public BlockingService getService() {
+    return service;
+  }
+
+  @Override
+  public MethodDescriptor getMethod() {
+    return md;
+  }
+
+  @Override
+  public Message getParam() {
+    return param;
+  }
+
+  @Override
+  public CellScanner getCellScanner() {
+    return cellScanner;
+  }
+
+  @Override
+  public long getReceiveTime() {
+    return receiveTime;
+  }
+
+  @Override
+  public long getStartTime() {
+    return startTime;
+  }
+
+  @Override
+  public void setStartTime(long t) {
+    this.startTime = t;
+  }
+
+  @Override
+  public int getTimeout() {
+    return timeout;
+  }
+
+  @Override
+  public int getRemotePort() {
+    return connection.getRemotePort();
+  }
+
+  @Override
+  public TraceInfo getTraceInfo() {
+    return tinfo;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/51d4c68b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
index f771eec..59d1ff9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
@@ -127,61 +127,6 @@ public class SimpleRpcServer extends RpcServer {
   private Listener listener = null;
   protected Responder responder = null;
 
-  /**
-   * Datastructure that holds all necessary to a method invocation and then afterward, carries
-   * the result.
-   */
-  @InterfaceStability.Evolving
-  public class Call extends RpcServer.Call {
-
-    protected Responder responder;
-
-    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
-        justification="Can't figure why this complaint is happening... see below")
-    Call(int id, final BlockingService service, final MethodDescriptor md,
-        RequestHeader header, Message param, CellScanner cellScanner,
-        RpcServer.Connection connection, long size, TraceInfo tinfo,
-        final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup,
-        Responder responder) {
-      super(id, service, md, header, param, cellScanner, connection, size,
-          tinfo, remoteAddress, timeout, reqCleanup);
-      this.responder = responder;
-    }
-
-    /**
-     * Call is done. Execution happened and we returned results to client. It is now safe to
-     * cleanup.
-     */
-    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
-        justification="Presume the lock on processing request held by caller is protection enough")
-    @Override
-    void done() {
-      super.done();
-      this.getConnection().decRpcCount(); // Say that we're done with this call.
-    }
-
-    @Override
-    public long disconnectSince() {
-      if (!getConnection().isConnectionOpen()) {
-        return System.currentTimeMillis() - timestamp;
-      } else {
-        return -1L;
-      }
-    }
-
-    @Override
-    public synchronized void sendResponseIfReady() throws IOException {
-      // set param null to reduce memory pressure
-      this.param = null;
-      this.responder.doRespond(this);
-    }
-
-    Connection getConnection() {
-      return (Connection) this.connection;
-    }
-
-  }
-
   /** Listens on the socket. Creates jobs for the handler threads*/
   private class Listener extends Thread {
 
@@ -589,8 +534,8 @@ public class SimpleRpcServer extends RpcServer {
           if (connection == null) {
             throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
           }
-          Call call = connection.responseQueue.peekFirst();
-          if (call != null && now > call.timestamp + purgeTimeout) {
+          SimpleServerCall call = connection.responseQueue.peekFirst();
+          if (call != null && now > call.lastSentTime + purgeTimeout) {
             conWithOldCalls.add(call.getConnection());
           }
         }
@@ -637,7 +582,7 @@ public class SimpleRpcServer extends RpcServer {
      * @return true if we proceed the call fully, false otherwise.
      * @throws IOException
      */
-    private boolean processResponse(final Call call) throws IOException {
+    private boolean processResponse(final SimpleServerCall call) throws IOException {
       boolean error = true;
       try {
         // Send as much data as we can in the non-blocking fashion
@@ -680,7 +625,7 @@ public class SimpleRpcServer extends RpcServer {
       try {
         for (int i = 0; i < 20; i++) {
           // protection if some handlers manage to need all the responder
-          Call call = connection.responseQueue.pollFirst();
+          SimpleServerCall call = connection.responseQueue.pollFirst();
           if (call == null) {
             return true;
           }
@@ -699,7 +644,7 @@ public class SimpleRpcServer extends RpcServer {
     //
     // Enqueue a response from the application.
     //
-    void doRespond(Call call) throws IOException {
+    void doRespond(SimpleServerCall call) throws IOException {
       boolean added = false;
 
       // If there is already a write in progress, we don't wait. This allows to free the handlers
@@ -728,7 +673,7 @@ public class SimpleRpcServer extends RpcServer {
       call.responder.registerForWrite(call.getConnection());
 
       // set the serve time when the response has to be sent later
-      call.timestamp = System.currentTimeMillis();
+      call.lastSentTime = System.currentTimeMillis();
     }
   }
 
@@ -741,7 +686,7 @@ public class SimpleRpcServer extends RpcServer {
     protected SocketChannel channel;
     private ByteBuff data;
     private ByteBuffer dataLengthBuffer;
-    protected final ConcurrentLinkedDeque<Call> responseQueue = new ConcurrentLinkedDeque<>();
+    protected final ConcurrentLinkedDeque<SimpleServerCall> responseQueue = new ConcurrentLinkedDeque<>();
     private final Lock responseWriteLock = new ReentrantLock();
     private LongAdder rpcCount = new LongAdder(); // number of outstanding rpcs
     private long lastContact;
@@ -769,13 +714,14 @@ public class SimpleRpcServer extends RpcServer {
                    socketSendBufferSize);
         }
       }
-      this.saslCall = new Call(SASL_CALLID, null, null, null, null, null, this,
-          0, null, null, 0, null, responder);
-      this.setConnectionHeaderResponseCall = new Call(
-          CONNECTION_HEADER_RESPONSE_CALLID, null, null, null, null, null,
-          this, 0, null, null, 0, null, responder);
-      this.authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null,
-          null, null, null, this, 0, null, null, 0, null, responder);
+      this.saslCall = new SimpleServerCall(SASL_CALLID, null, null, null, null, null, this, 0, null,
+          null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null, responder);
+      this.setConnectionHeaderResponseCall = new SimpleServerCall(CONNECTION_HEADER_RESPONSE_CALLID,
+          null, null, null, null, null, this, 0, null, null, System.currentTimeMillis(), 0,
+          reservoir, cellBlockBuilder, null, responder);
+      this.authFailedCall = new SimpleServerCall(AUTHORIZATION_FAILED_CALLID, null, null, null,
+          null, null, this, 0, null, null, System.currentTimeMillis(), 0, reservoir,
+          cellBlockBuilder, null, responder);
     }
 
     public void setLastContact(long lastContact) {
@@ -941,8 +887,9 @@ public class SimpleRpcServer extends RpcServer {
             RequestHeader header = (RequestHeader) builder.build();
 
             // Notify the client about the offending request
-            Call reqTooBig = new Call(header.getCallId(), this.service, null,
-                null, null, null, this, 0, null, this.addr, 0, null, responder);
+            SimpleServerCall reqTooBig = new SimpleServerCall(header.getCallId(), this.service,
+                null, null, null, null, this, 0, null, this.addr, System.currentTimeMillis(), 0,
+                reservoir, cellBlockBuilder, null, responder);
             metrics.exception(REQUEST_TOO_BIG_EXCEPTION);
             // Make sure the client recognizes the underlying exception
             // Otherwise, throw a DoNotRetryIOException.
@@ -1043,8 +990,8 @@ public class SimpleRpcServer extends RpcServer {
 
     private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
       LOG.warn(msg);
-      Call fakeCall = new Call(-1, null, null, null, null, null, this, -1,
-          null, null, 0, null, responder);
+      SimpleServerCall fakeCall = new SimpleServerCall(-1, null, null, null, null, null, this, -1,
+          null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null, responder);
       setupResponse(null, fakeCall, e, msg);
       responder.doRespond(fakeCall);
       // Returning -1 closes out the connection.
@@ -1081,13 +1028,13 @@ public class SimpleRpcServer extends RpcServer {
     }
 
     @Override
-    public RpcServer.Call createCall(int id, final BlockingService service,
-        final MethodDescriptor md, RequestHeader header, Message param,
-        CellScanner cellScanner, RpcServer.Connection connection, long size,
-        TraceInfo tinfo, final InetAddress remoteAddress, int timeout,
-        CallCleanup reqCleanup) {
-      return new Call(id, service, md, header, param, cellScanner, connection,
-          size, tinfo, remoteAddress, timeout, reqCleanup, responder);
+    public ServerCall createCall(int id, final BlockingService service, final MethodDescriptor md,
+        RequestHeader header, Message param, CellScanner cellScanner,
+        RpcServer.Connection connection, long size, TraceInfo tinfo,
+        final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) {
+      return new SimpleServerCall(id, service, md, header, param, cellScanner, connection, size,
+          tinfo, remoteAddress, System.currentTimeMillis(), timeout, reservoir, cellBlockBuilder,
+          reqCleanup, responder);
     }
   }
 
@@ -1206,17 +1153,16 @@ public class SimpleRpcServer extends RpcServer {
   public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
       Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
       throws IOException {
-    return call(service, md, param, cellScanner, receiveTime, status, System.currentTimeMillis(),0);
+    return call(service, md, param, cellScanner, receiveTime, status, System.currentTimeMillis(),
+      0);
   }
 
   @Override
-  public Pair<Message, CellScanner> call(BlockingService service,
-      MethodDescriptor md, Message param, CellScanner cellScanner,
-      long receiveTime, MonitoredRPCHandler status, long startTime, int timeout)
-      throws IOException {
-    Call fakeCall = new Call(-1, service, md, null, param, cellScanner, null,
-        -1, null, null, timeout, null, null);
-    fakeCall.setReceiveTime(receiveTime);
+  public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
+      Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
+      long startTime, int timeout) throws IOException {
+    SimpleServerCall fakeCall = new SimpleServerCall(-1, service, md, null, param, cellScanner,
+        null, -1, null, null, receiveTime, timeout, reservoir, cellBlockBuilder, null, null);
     return call(fakeCall, status);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/51d4c68b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java
new file mode 100644
index 0000000..b82d348
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.net.InetAddress;
+
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
+import org.apache.hadoop.hbase.ipc.SimpleRpcServer.Connection;
+import org.apache.hadoop.hbase.ipc.SimpleRpcServer.Responder;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.htrace.TraceInfo;
+
+/**
+ * Data structure that holds everything necessary for a method invocation and, afterward, carries
+ * the result.
+ */
+@InterfaceAudience.Private
+class SimpleServerCall extends ServerCall {
+
+  long lastSentTime;
+
+  final Responder responder;
+
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
+      justification = "Can't figure why this complaint is happening... see below")
+  SimpleServerCall(int id, final BlockingService service, final MethodDescriptor md,
+      RequestHeader header, Message param, CellScanner cellScanner, RpcServer.Connection connection,
+      long size, TraceInfo tinfo, final InetAddress remoteAddress, long receiveTime, int timeout,
+      ByteBufferPool reservoir, CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup,
+      Responder responder) {
+    super(id, service, md, header, param, cellScanner, connection, size, tinfo, remoteAddress,
+        receiveTime, timeout, reservoir, cellBlockBuilder, reqCleanup);
+    this.responder = responder;
+  }
+
+  /**
+   * Call is done. Execution happened and we returned results to client. It is now safe to clean up.
+   */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
+      justification = "Presume the lock on processing request held by caller is protection enough")
+  @Override
+  void done() {
+    super.done();
+    this.getConnection().decRpcCount(); // Say that we're done with this call.
+  }
+
+  @Override
+  public synchronized void sendResponseIfReady() throws IOException {
+    // set param null to reduce memory pressure
+    this.param = null;
+    this.responder.doRespond(this);
+  }
+
+  Connection getConnection() {
+    return (Connection) this.connection;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/51d4c68b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
index 47c15ae..a4cc233 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
@@ -33,8 +33,7 @@ public class TestCallRunner {
   public void testSimpleCall() {
     RpcServerInterface mockRpcServer = Mockito.mock(RpcServerInterface.class);
     Mockito.when(mockRpcServer.isStarted()).thenReturn(true);
-    RpcServer.Call mockCall = Mockito.mock(RpcServer.Call.class);
-    mockCall.connection = Mockito.mock(RpcServer.Connection.class);
+    ServerCall mockCall = Mockito.mock(ServerCall.class);
     CallRunner cr = new CallRunner(mockRpcServer, mockCall);
     cr.setStatus(new MonitoredRPCHandlerImpl());
     cr.run();

http://git-wip-us.apache.org/repos/asf/hbase/blob/51d4c68b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index 66b77cd..1d7c12e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -30,6 +30,11 @@ import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -44,18 +49,16 @@ import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CategoryBasedTimeout;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.ipc.RpcServer.Call;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
 import org.apache.hadoop.hbase.testclassification.RPCTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -63,18 +66,11 @@ import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TestRule;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-
 @Category({RPCTests.class, SmallTests.class})
 public class TestSimpleRpcScheduler {/*
   @Rule
@@ -167,7 +163,7 @@ public class TestSimpleRpcScheduler {/*
   }
 
   private CallRunner createMockTask() {
-    Call call = mock(Call.class);
+    ServerCall call = mock(ServerCall.class);
     CallRunner task = mock(CallRunner.class);
     when(task.getRpcCall()).thenReturn(call);
     return task;
@@ -195,19 +191,19 @@ public class TestSimpleRpcScheduler {/*
       scheduler.start();
 
       CallRunner smallCallTask = mock(CallRunner.class);
-      RpcServer.Call smallCall = mock(RpcServer.Call.class);
+      ServerCall smallCall = mock(ServerCall.class);
       RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build();
       when(smallCallTask.getRpcCall()).thenReturn(smallCall);
       when(smallCall.getHeader()).thenReturn(smallHead);
 
       CallRunner largeCallTask = mock(CallRunner.class);
-      RpcServer.Call largeCall = mock(RpcServer.Call.class);
+      ServerCall largeCall = mock(ServerCall.class);
       RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build();
       when(largeCallTask.getRpcCall()).thenReturn(largeCall);
       when(largeCall.getHeader()).thenReturn(largeHead);
 
       CallRunner hugeCallTask = mock(CallRunner.class);
-      RpcServer.Call hugeCall = mock(RpcServer.Call.class);
+      ServerCall hugeCall = mock(ServerCall.class);
       RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build();
       when(hugeCallTask.getRpcCall()).thenReturn(hugeCall);
       when(hugeCall.getHeader()).thenReturn(hugeHead);
@@ -290,7 +286,7 @@ public class TestSimpleRpcScheduler {/*
       scheduler.start();
 
       CallRunner putCallTask = mock(CallRunner.class);
-      RpcServer.Call putCall = mock(RpcServer.Call.class);
+      ServerCall putCall = mock(ServerCall.class);
       putCall.param = RequestConverter.buildMutateRequest(
           Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
       RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
@@ -299,13 +295,13 @@ public class TestSimpleRpcScheduler {/*
       when(putCall.getParam()).thenReturn(putCall.param);
 
       CallRunner getCallTask = mock(CallRunner.class);
-      RpcServer.Call getCall = mock(RpcServer.Call.class);
+      ServerCall getCall = mock(ServerCall.class);
       RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
       when(getCallTask.getRpcCall()).thenReturn(getCall);
       when(getCall.getHeader()).thenReturn(getHead);
 
       CallRunner scanCallTask = mock(CallRunner.class);
-      RpcServer.Call scanCall = mock(RpcServer.Call.class);
+      ServerCall scanCall = mock(ServerCall.class);
       scanCall.param = ScanRequest.newBuilder().setScannerId(1).build();
       RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
       when(scanCallTask.getRpcCall()).thenReturn(scanCall);
@@ -382,7 +378,7 @@ public class TestSimpleRpcScheduler {/*
       scheduler.start();
 
       CallRunner putCallTask = mock(CallRunner.class);
-      RpcServer.Call putCall = mock(RpcServer.Call.class);
+      ServerCall putCall = mock(ServerCall.class);
       putCall.param = RequestConverter.buildMutateRequest(
         Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
       RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
@@ -506,19 +502,15 @@ public class TestSimpleRpcScheduler {/*
   // Get mocked call that has the CallRunner sleep for a while so that the fast
   // path isn't hit.
   private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException {
-    final RpcServer.Call putCall = mock(RpcServer.Call.class);
-
-    putCall.timestamp = timestamp;
-    putCall.param = RequestConverter.buildMutateRequest(
-        Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
+    ServerCall putCall = new ServerCall(1, null, null,
+        RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build(),
+        RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))),
+        null, null, 9, null, null, timestamp, 0, null, null, null) {
 
-    RPCProtos.RequestHeader putHead = RPCProtos.RequestHeader.newBuilder()
-                                                             .setMethodName("mutate")
-                                                             .build();
-    when(putCall.getSize()).thenReturn(9L);
-    when(putCall.getHeader()).thenReturn(putHead);
-    when(putCall.getReceiveTime()).thenReturn(putCall.timestamp);
-    when(putCall.getParam()).thenReturn(putCall.param);
+      @Override
+      public void sendResponseIfReady() throws IOException {
+      }
+    };
 
     CallRunner cr = new CallRunner(null, putCall) {
       public void run() {
@@ -530,11 +522,13 @@ public class TestSimpleRpcScheduler {/*
         } catch (InterruptedException e) {
         }
       }
+
       public RpcCall getRpcCall() {
         return putCall;
       }
 
-      public void drop() {}
+      public void drop() {
+      }
     };
 
     return cr;

http://git-wip-us.apache.org/repos/asf/hbase/blob/51d4c68b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
index 85a14f2..a31b7b6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
@@ -28,6 +28,8 @@ import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.fail;
 
+import com.google.common.collect.Lists;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -57,8 +59,8 @@ import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.SecurityTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.minikdc.MiniKdc;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@@ -75,10 +77,8 @@ import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
 import org.mockito.Mockito;
 
-import com.google.common.collect.Lists;
-
 @RunWith(Parameterized.class)
-@Category({ SecurityTests.class, SmallTests.class })
+@Category({ SecurityTests.class, MediumTests.class })
 public class TestSecureIPC {
 
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();