You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by je...@apache.org on 2016/12/05 18:26:36 UTC

hbase git commit: HBASE-17221 Abstract out an interface for RpcServer.Call

Repository: hbase
Updated Branches:
  refs/heads/master 1c8822ddf -> 39653862a


HBASE-17221 Abstract out an interface for RpcServer.Call


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/39653862
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/39653862
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/39653862

Branch: refs/heads/master
Commit: 39653862a4d24b5309e972ca38554a7f81bc94fd
Parents: 1c8822d
Author: Jerry He <je...@apache.org>
Authored: Mon Dec 5 10:21:55 2016 -0800
Committer: Jerry He <je...@apache.org>
Committed: Mon Dec 5 10:21:55 2016 -0800

----------------------------------------------------------------------
 .../hbase/ipc/AdaptiveLifoCoDelCallQueue.java   |   2 +-
 .../org/apache/hadoop/hbase/ipc/CallRunner.java |  46 +++---
 .../hadoop/hbase/ipc/RWQueueRpcExecutor.java    |   7 +-
 .../org/apache/hadoop/hbase/ipc/RpcCall.java    | 146 +++++++++++++++++++
 .../apache/hadoop/hbase/ipc/RpcCallContext.java |  14 +-
 .../apache/hadoop/hbase/ipc/RpcExecutor.java    |  12 +-
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  | 138 +++++++++++++-----
 .../hadoop/hbase/ipc/RpcServerInterface.java    |  11 +-
 .../hadoop/hbase/ipc/SimpleRpcScheduler.java    |   4 +-
 .../hadoop/hbase/TestMetaTableAccessor.java     |   2 +-
 .../hbase/ipc/TestSimpleRpcScheduler.java       |  27 ++--
 11 files changed, 323 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/39653862/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
index 42b500f..82b8f1b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
@@ -161,7 +161,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
    */
   private boolean needToDrop(CallRunner callRunner) {
     long now = EnvironmentEdgeManager.currentTime();
-    long callDelay = now - callRunner.getCall().timestamp;
+    long callDelay = now - callRunner.getRpcCall().getReceiveTime();
 
     long localMinDelay = this.minDelay;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/39653862/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index d570b17..5301a67 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -25,16 +25,14 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
-import org.apache.hadoop.hbase.ipc.RpcServer.Call;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
 
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
-
 /**
  * The request processing logic, which is usually executed in thread pools provided by an
  * {@link RpcScheduler}.  Call {@link #run()} to actually execute the contained
@@ -47,7 +45,7 @@ public class CallRunner {
   private static final CallDroppedException CALL_DROPPED_EXCEPTION
     = new CallDroppedException();
 
-  private Call call;
+  private RpcCall call;
   private RpcServerInterface rpcServer;
   private MonitoredRPCHandler status;
   private volatile boolean sucessful;
@@ -58,7 +56,7 @@ public class CallRunner {
    * time we occupy heap.
    */
   // The constructor is shutdown so only RpcServer in this class can make one of these.
-  CallRunner(final RpcServerInterface rpcServer, final Call call) {
+  CallRunner(final RpcServerInterface rpcServer, final RpcCall call) {
     this.call = call;
     this.rpcServer = rpcServer;
     // Add size of the call to queue size.
@@ -67,10 +65,19 @@ public class CallRunner {
     }
   }
 
-  public Call getCall() {
+  public RpcCall getRpcCall() {
     return call;
   }
 
+  /**
+   * Keep for backward compatibility.
+   * @deprecated As of release 2.0, this will be removed in HBase 3.0
+   */
+  @Deprecated
+  public RpcServer.Call getCall() {
+    return (RpcServer.Call) call;
+  }
+
   public void setStatus(MonitoredRPCHandler status) {
     this.status = status;
   }
@@ -85,23 +92,23 @@ public class CallRunner {
 
   public void run() {
     try {
-      if (!call.connection.channel.isOpen()) {
+      if (call.disconnectSince() >= 0) {
         if (RpcServer.LOG.isDebugEnabled()) {
           RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call);
         }
         return;
       }
-      call.startTime = System.currentTimeMillis();
-      if (call.startTime > call.deadline) {
+      call.setStartTime(System.currentTimeMillis());
+      if (call.getStartTime() > call.getDeadline()) {
         RpcServer.LOG.warn("Dropping timed out call: " + call);
         return;
       }
       this.status.setStatus("Setting up call");
-      this.status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort());
+      this.status.setConnection(call.getRemoteAddress().getHostAddress(), call.getRemotePort());
       if (RpcServer.LOG.isTraceEnabled()) {
-        UserGroupInformation remoteUser = call.connection.ugi;
+        User remoteUser = call.getRequestUser();
         RpcServer.LOG.trace(call.toShortString() + " executing as " +
-            ((remoteUser == null) ? "NULL principal" : remoteUser.getUserName()));
+            ((remoteUser == null) ? "NULL principal" : remoteUser.getName()));
       }
       Throwable errorThrowable = null;
       String error = null;
@@ -114,12 +121,15 @@ public class CallRunner {
           throw new ServerNotRunningYetException("Server " +
               (address != null ? address : "(channel closed)") + " is not running yet");
         }
-        if (call.tinfo != null) {
-          traceScope = Trace.startSpan(call.toTraceString(), call.tinfo);
+        if (call.getTraceInfo() != null) {
+          String serviceName =
+              call.getService() != null ? call.getService().getDescriptorForType().getName() : "";
+          String methodName = (call.getMethod() != null) ? call.getMethod().getName() : "";
+          String traceString = serviceName + "." + methodName;
+          traceScope = Trace.startSpan(traceString, call.getTraceInfo());
         }
         // make the call
-        resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
-            call.timestamp, this.status, call.startTime, call.timeout);
+        resultPair = this.rpcServer.call(call, this.status);
       } catch (TimeoutIOException e){
         RpcServer.LOG.warn("Can not complete this request in time, drop it: " + call);
         return;
@@ -181,7 +191,7 @@ public class CallRunner {
    */
   public void drop() {
     try {
-      if (!call.connection.channel.isOpen()) {
+      if (call.disconnectSince() >= 0) {
         if (RpcServer.LOG.isDebugEnabled()) {
           RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call);
         }

http://git-wip-us.apache.org/repos/asf/hbase/blob/39653862/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index 6005900..8637f79 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
@@ -129,11 +130,11 @@ public class RWQueueRpcExecutor extends RpcExecutor {
 
   @Override
   public boolean dispatch(final CallRunner callTask) throws InterruptedException {
-    RpcServer.Call call = callTask.getCall();
+    RpcCall call = callTask.getRpcCall();
     int queueIndex;
-    if (isWriteRequest(call.getHeader(), call.param)) {
+    if (isWriteRequest(call.getHeader(), call.getParam())) {
       queueIndex = writeBalancer.getNextQueue();
-    } else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.param)) {
+    } else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.getParam())) {
       queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue();
     } else {
       queueIndex = numWriteQueues + readBalancer.getNextQueue();

http://git-wip-us.apache.org/repos/asf/hbase/blob/39653862/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java
new file mode 100644
index 0000000..239ea9e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java
@@ -0,0 +1,146 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.htrace.TraceInfo;
+
+/**
+ * Interface for everything necessary to carry out an RPC method invocation on the server.
+ */
+@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
+@InterfaceStability.Evolving
+public interface RpcCall extends RpcCallContext {
+
+  /**
+   * @return The service of this call.
+   */
+  BlockingService getService();
+
+  /**
+   * @return The service method.
+   */
+  MethodDescriptor getMethod();
+
+  /**
+   * @return The call parameter message.
+   */
+  Message getParam();
+
+  /**
+   * @return The CellScanner that can carry input and result payload.
+   */
+  CellScanner getCellScanner();
+
+  /**
+   * @return The timestamp when the call is constructed.
+   */
+  long getReceiveTime();
+
+  /**
+   * Set the timestamp when the call is constructed.
+   */
+  void setReceiveTime(long receiveTime);
+
+  /**
+   * @return The time when the call starts to be executed.
+   */
+  long getStartTime();
+
+  /**
+   * Set the time when the call starts to be executed.
+   */
+  void setStartTime(long startTime);
+
+  /**
+   * @return The timeout of this call.
+   */
+  int getTimeout();
+
+  /**
+   * @return The Priority of this call.
+   */
+  int getPriority();
+
+  /**
+   * Return the deadline of this call. If we can not complete this call in time,
+   * we can throw a TimeoutIOException and RPCServer will drop it.
+   * @return The system timestamp of deadline.
+   */
+  long getDeadline();
+
+  /**
+   * Used to calculate the request call queue size.
+   * If the total request call size exceeds a limit, the call will be rejected.
+   * @return The raw size of this call.
+   */
+  long getSize();
+
+  /**
+   * @return The request header of this call.
+   */
+  RequestHeader getHeader();
+
+  /**
+   * @return Port of remote address in this call
+   */
+  int getRemotePort();
+
+  /**
+   * Set the response resulting from this RPC call.
+   * @param param The result message as response.
+   * @param cells The CellScanner that possibly carries the payload.
+   * @param errorThrowable The error Throwable resulting from the call.
+   * @param error Extra error message.
+   */
+  void setResponse(Message param, CellScanner cells, Throwable errorThrowable, String error);
+
+  /**
+   * Send the response of this RPC call.
+   * Implementation provides the underlying facility (connection, etc) to send.
+   * @throws IOException
+   */
+  void sendResponseIfReady() throws IOException;
+
+  /**
+   * Do the necessary cleanup after the call if needed.
+   */
+  void cleanup();
+
+  /**
+   * @return A short string format of this call without possibly lengthy params
+   */
+  String toShortString();
+
+  /**
+   * @return TraceInfo attached to this call.
+   */
+  TraceInfo getTraceInfo();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/39653862/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
index 9bc8ee7..d2fd557 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
@@ -23,6 +23,10 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
 import org.apache.hadoop.hbase.security.User;
 
+/**
+ * Interface for everything needed to carry out an RPC service invocation on the server. This
+ * interface focuses on the information needed or obtained while executing the service method.
+ */
 @InterfaceAudience.Private
 public interface RpcCallContext {
   /**
@@ -56,7 +60,7 @@ public interface RpcCallContext {
   String getRequestUserName();
 
   /**
-   * @return Address of remote client if a request is ongoing, else null
+   * @return Address of remote client in this call
    */
   InetAddress getRemoteAddress();
 
@@ -92,12 +96,6 @@ public interface RpcCallContext {
   void incrementResponseCellSize(long cellSize);
 
   long getResponseBlockSize();
-  void incrementResponseBlockSize(long blockSize);
 
-  /**
-   * Return the deadline of this call. If we can not complete this call in time, we can throw a
-   * TimeoutIOException and RPCServer will drop it.
-   * @return The system timestamp of deadline.
-   */
-  long getDeadline();
+  void incrementResponseBlockSize(long blockSize);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/39653862/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index e41f4c7..3cb6011 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -361,12 +361,12 @@ public abstract class RpcExecutor {
 
     @Override
     public int compare(CallRunner a, CallRunner b) {
-      RpcServer.Call callA = a.getCall();
-      RpcServer.Call callB = b.getCall();
-      long deadlineA = priority.getDeadline(callA.getHeader(), callA.param);
-      long deadlineB = priority.getDeadline(callB.getHeader(), callB.param);
-      deadlineA = callA.timestamp + Math.min(deadlineA, maxDelay);
-      deadlineB = callB.timestamp + Math.min(deadlineB, maxDelay);
+      RpcCall callA = a.getRpcCall();
+      RpcCall callB = b.getRpcCall();
+      long deadlineA = priority.getDeadline(callA.getHeader(), callA.getParam());
+      long deadlineB = priority.getDeadline(callB.getHeader(), callB.getParam());
+      deadlineA = callA.getReceiveTime() + Math.min(deadlineA, maxDelay);
+      deadlineB = callB.getReceiveTime() + Math.min(deadlineB, maxDelay);
       return Long.compare(deadlineA, deadlineB);
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/39653862/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index ee44c68..8b6379b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -214,7 +214,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
   /** This is set to Call object before Handler invokes an RPC and ybdie
    * after the call returns.
    */
-  protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
+  protected static final ThreadLocal<RpcCall> CurCall =
+      new ThreadLocal<RpcCall>();
 
   /** Keeps MonitoredRPCHandler per handler thread. */
   static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC
@@ -326,9 +327,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
    * Datastructure that holds all necessary to a method invocation and then afterward, carries
    * the result.
    */
-  @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
-  @InterfaceStability.Evolving
-  public class Call implements RpcCallContext {
+  @InterfaceAudience.Private
+  public class Call implements RpcCall {
     protected int id;                             // the client's call id
     protected BlockingService service;
     protected MethodDescriptor md;
@@ -408,7 +408,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       this.connection.decRpcCount();  // Say that we're done with this call.
     }
 
-    protected void cleanup() {
+    @Override
+    public void cleanup() {
       if (this.reqCleanup != null) {
         this.reqCleanup.run();
         this.reqCleanup = null;
@@ -422,7 +423,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         " connection: " + connection.toString();
     }
 
-    protected RequestHeader getHeader() {
+    @Override
+    public RequestHeader getHeader() {
       return this.header;
     }
 
@@ -430,6 +432,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       return this.header.hasPriority();
     }
 
+    @Override
     public int getPriority() {
       return this.header.getPriority();
     }
@@ -438,7 +441,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
      * Short string representation without param info because param itself could be huge depends on
      * the payload of a command
      */
-    String toShortString() {
+    @Override
+    public String toShortString() {
       String serviceName = this.connection.service != null ?
           this.connection.service.getDescriptorForType().getName() : "null";
       return "callId: " + this.id + " service: " + serviceName +
@@ -448,13 +452,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
           " deadline: " + deadline;
     }
 
-    String toTraceString() {
-      String serviceName = this.connection.service != null ?
-                           this.connection.service.getDescriptorForType().getName() : "";
-      String methodName = (this.md != null) ? this.md.getName() : "";
-      return serviceName + "." + methodName;
-    }
-
     protected synchronized void setSaslTokenResponse(ByteBuffer response) {
       ByteBuffer[] responseBufs = new ByteBuffer[1];
       responseBufs[0] = response;
@@ -467,15 +464,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       this.response = new BufferChain(responseBufs);
     }
 
-    protected synchronized void setResponse(Object m, final CellScanner cells,
+    @Override
+    public synchronized void setResponse(Message m, final CellScanner cells,
         Throwable t, String errorMsg) {
       if (this.isError) return;
       if (t != null) this.isError = true;
       BufferChain bc = null;
       try {
         ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
-        // Presume it a pb Message.  Could be null.
-        Message result = (Message)m;
         // Call id.
         headerBuilder.setCallId(this.id);
         if (t != null) {
@@ -511,7 +507,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         }
         Message header = headerBuilder.build();
         ByteBuffer headerBuf =
-            createHeaderAndMessageBytes(result, header, cellBlockSize, cellBlock);
+            createHeaderAndMessageBytes(m, header, cellBlockSize, cellBlock);
         ByteBuffer[] responseBufs = null;
         int cellBlockBufferSize = 0;
         if (cellBlock != null) {
@@ -681,10 +677,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       }
     }
 
-    public long getSize() {
-      return this.size;
-    }
-
     @Override
     public long getResponseCellSize() {
       return responseCellSize;
@@ -706,20 +698,22 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     }
 
     @Override
+    public long getSize() {
+      return this.size;
+    }
+
+    @Override
     public long getDeadline() {
       return deadline;
     }
 
+    @Override
     public synchronized void sendResponseIfReady() throws IOException {
       // set param null to reduce memory pressure
       this.param = null;
       this.responder.doRespond(this);
     }
 
-    public UserGroupInformation getRemoteUser() {
-      return connection.ugi;
-    }
-
     @Override
     public User getRequestUser() {
       return user;
@@ -750,6 +744,64 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     public boolean isRetryImmediatelySupported() {
       return retryImmediatelySupported;
     }
+
+    @Override
+    public BlockingService getService() {
+      return service;
+    }
+
+    @Override
+    public MethodDescriptor getMethod() {
+      return md;
+    }
+
+    @Override
+    public Message getParam() {
+      return param;
+    }
+
+    @Override
+    public CellScanner getCellScanner() {
+      return cellScanner;
+    }
+
+    @Override
+    public long getReceiveTime() {
+      return timestamp;
+    }
+
+    @Override
+    public void setReceiveTime(long t) {
+      this.timestamp = t;
+
+    }
+
+    @Override
+    public long getStartTime() {
+      return startTime;
+    }
+
+    @Override
+    public void setStartTime(long t) {
+      this.startTime = t;
+
+    }
+
+    @Override
+    public int getTimeout() {
+      return timeout;
+    }
+
+    @Override
+    public int getRemotePort() {
+      return connection.getRemotePort();
+    }
+
+    @Override
+    public TraceInfo getTraceInfo() {
+      return tinfo;
+    }
+
   }
 
   @FunctionalInterface
@@ -2565,24 +2617,37 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     return call(service, md, param, cellScanner, receiveTime, status, System.currentTimeMillis(),0);
   }
 
+  public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, Message param,
+      CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, long startTime,
+      int timeout)
+      throws IOException {
+    Call fakeCall = new Call(-1, service, md, null, param, cellScanner, null, null, -1, null, null, timeout,
+      null);
+    fakeCall.setReceiveTime(receiveTime);
+    return call(fakeCall, status);
+  }
+
   /**
    * This is a server side method, which is invoked over RPC. On success
    * the return response has protobuf response payload. On failure, the
    * exception name and the stack trace are returned in the protobuf response.
    */
-  public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
-      Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
-      long startTime, int timeout)
+  public Pair<Message, CellScanner> call(RpcCall call, MonitoredRPCHandler status)
   throws IOException {
     try {
-      status.setRPC(md.getName(), new Object[]{param}, receiveTime);
+      MethodDescriptor md = call.getMethod();
+      Message param = call.getParam();
+      status.setRPC(md.getName(), new Object[]{param},
+        call.getReceiveTime());
       // TODO: Review after we add in encoded data blocks.
       status.setRPCPacket(param);
       status.resume("Servicing call");
       //get an instance of the method arg type
-      HBaseRpcController controller = new HBaseRpcControllerImpl(cellScanner);
-      controller.setCallTimeout(timeout);
-      Message result = service.callBlockingMethod(md, controller, param);
+      HBaseRpcController controller = new HBaseRpcControllerImpl(call.getCellScanner());
+      controller.setCallTimeout(call.getTimeout());
+      Message result = call.getService().callBlockingMethod(md, controller, param);
+      long receiveTime = call.getReceiveTime();
+      long startTime = call.getStartTime();
       long endTime = System.currentTimeMillis();
       int processingTime = (int) (endTime - startTime);
       int qTime = (int) (startTime - receiveTime);
@@ -2596,6 +2661,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       }
       long requestSize = param.getSerializedSize();
       long responseSize = result.getSerializedSize();
+
       metrics.dequeuedCall(qTime);
       metrics.processedCall(processingTime);
       metrics.totalCall(totalTime);
@@ -3014,9 +3080,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
    *  @return InetAddress
    */
   public static InetAddress getRemoteIp() {
-    Call call = CurCall.get();
-    if (call != null && call.connection != null && call.connection.socket != null) {
-      return call.connection.socket.getInetAddress();
+    RpcCall call = CurCall.get();
+    if (call != null) {
+      return call.getRemoteAddress();
     }
     return null;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/39653862/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
index 5401e3f..a847931 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
@@ -55,11 +55,18 @@ public interface RpcServerInterface {
   @Deprecated
   Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
     Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
-  throws IOException, ServiceException;
+  throws IOException;
 
+  /**
+   * @deprecated As of release 2.0, this will be removed in HBase 3.0
+   */
+  @Deprecated
   Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, Message param,
       CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, long startTime,
-      int timeout) throws IOException, ServiceException;
+      int timeout) throws IOException;
+
+  Pair<Message, CellScanner> call(RpcCall call, MonitoredRPCHandler status)
+      throws IOException;
 
   void setErrorHandler(HBaseRPCErrorHandler handler);
   HBaseRPCErrorHandler getErrorHandler();

http://git-wip-us.apache.org/repos/asf/hbase/blob/39653862/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 3aa486e..1f7e8ba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -157,8 +157,8 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
 
   @Override
   public boolean dispatch(CallRunner callTask) throws InterruptedException {
-    RpcServer.Call call = callTask.getCall();
-    int level = priority.getPriority(call.getHeader(), call.param, call.getRequestUser());
+    RpcCall call = callTask.getRpcCall();
+    int level = priority.getPriority(call.getHeader(), call.getParam(), call.getRequestUser());
     if (priorityExecutor != null && level > highPriorityLevel) {
       return priorityExecutor.dispatch(callTask);
     } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/39653862/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
index d750faf..e1c19e2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
@@ -643,7 +643,7 @@ public class TestMetaTableAccessor {
 
     @Override
     public boolean dispatch(CallRunner task) throws IOException, InterruptedException {
-      int priority = task.getCall().getPriority();
+      int priority = task.getRpcCall().getPriority();
 
       if (priority > HConstants.QOS_THRESHOLD) {
         numPriorityCalls++;

http://git-wip-us.apache.org/repos/asf/hbase/blob/39653862/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index c9c5684..3535d23 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -99,6 +99,7 @@ public class TestSimpleRpcScheduler {/*
 
   @Test
   public void testBasic() throws IOException, InterruptedException {
+
     PriorityFunction qosFunction = mock(PriorityFunction.class);
     RpcScheduler scheduler = new SimpleRpcScheduler(
         conf, 10, 0, 0, qosFunction, 0);
@@ -113,6 +114,7 @@ public class TestSimpleRpcScheduler {/*
 
   @Test
   public void testHandlerIsolation() throws IOException, InterruptedException {
+
     CallRunner generalTask = createMockTask();
     CallRunner priorityTask = createMockTask();
     CallRunner replicationTask = createMockTask();
@@ -167,12 +169,13 @@ public class TestSimpleRpcScheduler {/*
   private CallRunner createMockTask() {
     Call call = mock(Call.class);
     CallRunner task = mock(CallRunner.class);
-    when(task.getCall()).thenReturn(call);
+    when(task.getRpcCall()).thenReturn(call);
     return task;
   }
 
   @Test
   public void testRpcScheduler() throws Exception {
+
     testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
     testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
   }
@@ -194,19 +197,19 @@ public class TestSimpleRpcScheduler {/*
       CallRunner smallCallTask = mock(CallRunner.class);
       RpcServer.Call smallCall = mock(RpcServer.Call.class);
       RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build();
-      when(smallCallTask.getCall()).thenReturn(smallCall);
+      when(smallCallTask.getRpcCall()).thenReturn(smallCall);
       when(smallCall.getHeader()).thenReturn(smallHead);
 
       CallRunner largeCallTask = mock(CallRunner.class);
       RpcServer.Call largeCall = mock(RpcServer.Call.class);
       RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build();
-      when(largeCallTask.getCall()).thenReturn(largeCall);
+      when(largeCallTask.getRpcCall()).thenReturn(largeCall);
       when(largeCall.getHeader()).thenReturn(largeHead);
 
       CallRunner hugeCallTask = mock(CallRunner.class);
       RpcServer.Call hugeCall = mock(RpcServer.Call.class);
       RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build();
-      when(hugeCallTask.getCall()).thenReturn(hugeCall);
+      when(hugeCallTask.getRpcCall()).thenReturn(hugeCall);
       when(hugeCall.getHeader()).thenReturn(hugeHead);
 
       when(priority.getDeadline(eq(smallHead), any(Message.class))).thenReturn(0L);
@@ -255,6 +258,7 @@ public class TestSimpleRpcScheduler {/*
 
   @Test
   public void testScanQueueWithZeroScanRatio() throws Exception {
+
     Configuration schedConf = HBaseConfiguration.create();
     schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
     schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f);
@@ -290,21 +294,23 @@ public class TestSimpleRpcScheduler {/*
       putCall.param = RequestConverter.buildMutateRequest(
           Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
       RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
-      when(putCallTask.getCall()).thenReturn(putCall);
+      when(putCallTask.getRpcCall()).thenReturn(putCall);
       when(putCall.getHeader()).thenReturn(putHead);
+      when(putCall.getParam()).thenReturn(putCall.param);
 
       CallRunner getCallTask = mock(CallRunner.class);
       RpcServer.Call getCall = mock(RpcServer.Call.class);
       RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
-      when(getCallTask.getCall()).thenReturn(getCall);
+      when(getCallTask.getRpcCall()).thenReturn(getCall);
       when(getCall.getHeader()).thenReturn(getHead);
 
       CallRunner scanCallTask = mock(CallRunner.class);
       RpcServer.Call scanCall = mock(RpcServer.Call.class);
       scanCall.param = ScanRequest.newBuilder().setScannerId(1).build();
       RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
-      when(scanCallTask.getCall()).thenReturn(scanCall);
+      when(scanCallTask.getRpcCall()).thenReturn(scanCall);
       when(scanCall.getHeader()).thenReturn(scanHead);
+      when(scanCall.getParam()).thenReturn(scanCall.param);
 
       ArrayList<Integer> work = new ArrayList<Integer>();
       doAnswerTaskExecution(putCallTask, work, 1, 1000);
@@ -361,6 +367,7 @@ public class TestSimpleRpcScheduler {/*
 
   @Test
   public void testSoftAndHardQueueLimits() throws Exception {
+
     Configuration schedConf = HBaseConfiguration.create();
 
     schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 0);
@@ -379,7 +386,7 @@ public class TestSimpleRpcScheduler {/*
       putCall.param = RequestConverter.buildMutateRequest(
         Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
       RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
-      when(putCallTask.getCall()).thenReturn(putCall);
+      when(putCallTask.getRpcCall()).thenReturn(putCall);
       when(putCall.getHeader()).thenReturn(putHead);
 
       assertTrue(scheduler.dispatch(putCallTask));
@@ -512,6 +519,8 @@ public class TestSimpleRpcScheduler {/*
                                                              .build();
     when(putCall.getSize()).thenReturn(9L);
     when(putCall.getHeader()).thenReturn(putHead);
+    when(putCall.getReceiveTime()).thenReturn(putCall.timestamp);
+    when(putCall.getParam()).thenReturn(putCall.param);
 
     CallRunner cr = new CallRunner(null, putCall) {
       public void run() {
@@ -523,7 +532,7 @@ public class TestSimpleRpcScheduler {/*
         } catch (InterruptedException e) {
         }
       }
-      public Call getCall() {
+      public RpcCall getRpcCall() {
         return putCall;
       }