Posted to commits@hbase.apache.org by zh...@apache.org on 2017/03/16 15:01:01 UTC

[08/10] hbase git commit: HBASE-16584 Backport the new ipc implementation in HBASE-16432 to branch-1

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
deleted file mode 100644
index e0c7586..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
-import org.apache.hadoop.ipc.RemoteException;
-
-import com.google.protobuf.Message;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufInputStream;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-
-/**
- * Handles HBase responses
- */
-@InterfaceAudience.Private
-public class AsyncServerResponseHandler extends SimpleChannelInboundHandler<ByteBuf> {
-  private final AsyncRpcChannel channel;
-
-  /**
-   * Constructor
-   * @param channel on which this response handler operates
-   */
-  public AsyncServerResponseHandler(AsyncRpcChannel channel) {
-    this.channel = channel;
-  }
-
-  @Override
-  protected void channelRead0(ChannelHandlerContext ctx, ByteBuf inBuffer) throws Exception {
-    ByteBufInputStream in = new ByteBufInputStream(inBuffer);
-    int totalSize = inBuffer.readableBytes();
-    // Read the header
-    RPCProtos.ResponseHeader responseHeader = RPCProtos.ResponseHeader.parseDelimitedFrom(in);
-    int id = responseHeader.getCallId();
-    AsyncCall call = channel.removePendingCall(id);
-    if (call == null) {
-      // So we got a response for which we have no corresponding 'call' here on the client-side.
-      // We probably timed out waiting, cleaned up all references, and now the server decides
-      // to return a response. There is nothing we can do w/ the response at this stage. Clean
-      // out the wire of the response so it's out of the way and we can get other responses on
-      // this connection.
-      int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
-      int whatIsLeftToRead = totalSize - readSoFar;
-
-      // This is done through a Netty ByteBuf, which behaves differently from an InputStream.
-      // It does not return the number of bytes read; it advances the reader index internally and
-      // throws an exception when more bytes are to be skipped than are available.
-      inBuffer.skipBytes(whatIsLeftToRead);
-      return;
-    }
-
-    if (responseHeader.hasException()) {
-      RPCProtos.ExceptionResponse exceptionResponse = responseHeader.getException();
-      RemoteException re = createRemoteException(exceptionResponse);
-      if (exceptionResponse.getExceptionClassName()
-          .equals(FatalConnectionException.class.getName())) {
-        channel.close(re);
-      } else {
-        call.setFailed(re);
-      }
-    } else {
-      Message value = null;
-      // Call may be null because it may have timed out and been cleaned up on this side already
-      if (call.responseDefaultType != null) {
-        Message.Builder builder = call.responseDefaultType.newBuilderForType();
-        ProtobufUtil.mergeDelimitedFrom(builder, in);
-        value = builder.build();
-      }
-      CellScanner cellBlockScanner = null;
-      if (responseHeader.hasCellBlockMeta()) {
-        int size = responseHeader.getCellBlockMeta().getLength();
-        byte[] cellBlock = new byte[size];
-        inBuffer.readBytes(cellBlock, 0, cellBlock.length);
-        cellBlockScanner = channel.client.createCellScanner(cellBlock);
-      }
-      call.setSuccess(value, cellBlockScanner);
-      call.callStats.setResponseSizeBytes(totalSize);
-    }
-  }
-
-  @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-    channel.close(cause);
-  }
-
-  @Override
-  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-    channel.close(new IOException("connection closed"));
-  }
-
-  /**
-   * @param e Proto exception
-   * @return RemoteException made from passed <code>e</code>
-   */
-  private RemoteException createRemoteException(final RPCProtos.ExceptionResponse e) {
-    String innerExceptionClassName = e.getExceptionClassName();
-    boolean doNotRetry = e.getDoNotRetry();
-    return e.hasHostname() ?
-        // If a hostname then add it to the RemoteWithExtrasException
-        new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(),
-            e.getPort(), doNotRetry)
-        : new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry);
-  }
-}
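
For reference, the size accounting behind IPCUtil.getTotalSizeWhenWrittenDelimited(...)
used above: a message written with writeDelimitedTo() occupies its serialized size plus a
varint length prefix. A minimal sketch, assuming the protobuf 2.x CodedOutputStream API
that this code already depends on:

    import com.google.protobuf.CodedOutputStream;
    import com.google.protobuf.Message;

    // Sketch: bytes occupied by a message written with writeDelimitedTo(), i.e.
    // the varint-encoded length prefix followed by the message body itself.
    static int totalSizeWhenWrittenDelimited(Message m) {
      int serialized = m.getSerializedSize();
      return serialized + CodedOutputStream.computeRawVarint32Size(serialized);
    }

Subtracting this from the declared total size gives the number of body bytes still on the
wire, which is what the skipBytes(...) call above cleans out.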

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java
new file mode 100644
index 0000000..d27602e
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+
+import javax.net.SocketFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MetricsConnection;
+import org.apache.hadoop.net.NetUtils;
+
+/**
+ * Does RPC against a cluster. Manages connections per regionserver in the cluster.
+ * <p>
+ * See HBaseServer
+ */
+@InterfaceAudience.Private
+public class BlockingRpcClient extends AbstractRpcClient<BlockingRpcConnection> {
+
+  protected final SocketFactory socketFactory; // how to create sockets
+
+  /**
+   * Used in tests only. Constructs an IPC client for the cluster {@code clusterId} with the
+   * default SocketFactory.
+   */
+  @VisibleForTesting
+  BlockingRpcClient(Configuration conf) {
+    this(conf, HConstants.CLUSTER_ID_DEFAULT, null, null);
+  }
+
+  /**
+   * Construct an IPC client for the cluster {@code clusterId} with the default SocketFactory. This
+   * method is invoked via reflection by the RpcClientFactory to create an instance.
+   * @param conf configuration
+   * @param clusterId the cluster id
+   * @param localAddr client socket bind address.
+   * @param metrics the connection metrics
+   */
+  public BlockingRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
+      MetricsConnection metrics) {
+    super(conf, clusterId, localAddr, metrics);
+    this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
+  }
+
+  /**
+   * Creates a connection. Can be overridden by a subclass for testing.
+   * @param remoteId - the ConnectionId to use for the connection creation.
+   */
+  protected BlockingRpcConnection createConnection(ConnectionId remoteId) throws IOException {
+    return new BlockingRpcConnection(this, remoteId);
+  }
+
+  @Override
+  protected void closeInternal() {
+  }
+}
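
As the constructor javadoc notes, RpcClientFactory creates this class via reflection. A
hedged sketch of that pattern (the factory itself is not part of this diff; only the
four-argument constructor signature above is taken from it):

    import java.lang.reflect.Constructor;
    import java.net.SocketAddress;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.MetricsConnection;

    // Sketch: reflective construction, roughly as a factory would perform it.
    static BlockingRpcClient newClient(Configuration conf, String clusterId,
        SocketAddress localAddr, MetricsConnection metrics) throws Exception {
      Constructor<BlockingRpcClient> ctor = BlockingRpcClient.class.getConstructor(
          Configuration.class, String.class, SocketAddress.class, MetricsConnection.class);
      return ctor.newInstance(conf, clusterId, localAddr, metrics);
    }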

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
new file mode 100644
index 0000000..4dc121c
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
@@ -0,0 +1,730 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.apache.hadoop.hbase.ipc.IPCUtil.buildRequestHeader;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.createRemoteException;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.getTotalSizeWhenWrittenDelimited;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.isFatalConnectionException;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.write;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
+import com.google.protobuf.RpcCallback;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayDeque;
+import java.util.Locale;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
+
+import javax.security.sasl.SaslException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
+import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
+import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
+import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ExceptionUtil;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+
+/**
+ * Thread that reads responses and notifies callers. Each connection owns a socket connected to a
+ * remote address. Calls are multiplexed through this socket: responses may be delivered out of
+ * order.
+ */
+@InterfaceAudience.Private
+class BlockingRpcConnection extends RpcConnection implements Runnable {
+
+  private static final Log LOG = LogFactory.getLog(BlockingRpcConnection.class);
+
+  private final BlockingRpcClient rpcClient;
+
+  private final String threadName;
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
+      justification = "We are always under lock actually")
+  private Thread thread;
+
+  // Connected socket. Protected for overriding in unit tests.
+  protected Socket socket = null;
+  private DataInputStream in;
+  private DataOutputStream out;
+
+  private HBaseSaslRpcClient saslRpcClient;
+
+  // currently active calls
+  private final ConcurrentMap<Integer, Call> calls = new ConcurrentHashMap<>();
+
+  private final CallSender callSender;
+
+  private boolean closed = false;
+
+  private byte[] connectionHeaderPreamble;
+
+  private byte[] connectionHeaderWithLength;
+
+  /**
+   * If the client wants to interrupt its calls easily (i.e. call Thread#interrupt), it gets into a
+   * java issue: an interruption during a write closes the socket/channel. A way to avoid this is to
+   * use a different thread for writing. This way, on interruptions, we either cancel the writes or
+   * ignore the answer if the write is already done, but we don't stop the write in the middle. This
+   * adds a thread per region server in the client, so it's kept as an option.
+   * <p>
+   * The implementation is simple: the client threads add their calls to the queue, and then wait
+   * for an answer. The CallSender blocks on the queue and writes the calls one after the other. On
+   * interruption, the client cancels its call. The CallSender checks that the call has not been
+   * canceled before writing it.
+   * </p>
+   * <p>
+   * When the connection closes, all the calls not yet sent are dismissed. The client thread is
+   * notified with an appropriate exception, as if the call was already sent but the answer not yet
+   * received.
+   * </p>
+   */
+  private class CallSender extends Thread {
+
+    private final Queue<Call> callsToWrite;
+
+    private final int maxQueueSize;
+
+    public CallSender(String name, Configuration conf) {
+      int queueSize = conf.getInt("hbase.ipc.client.write.queueSize", 1000);
+      callsToWrite = new ArrayDeque<>(queueSize);
+      this.maxQueueSize = queueSize;
+      setDaemon(true);
+      setName(name + " - writer");
+    }
+
+    public void sendCall(final Call call) throws IOException {
+      if (callsToWrite.size() >= maxQueueSize) {
+        throw new IOException("Can't add the call " + call.id +
+            " to the write queue. callsToWrite.size()=" + callsToWrite.size());
+      }
+      callsToWrite.offer(call);
+      BlockingRpcConnection.this.notifyAll();
+    }
+
+    public void remove(Call call) {
+      callsToWrite.remove(call);
+      // By removing the call from the expected call list, we make the list smaller, but
+      // it means as well that we don't know how many calls we cancelled.
+      calls.remove(call.id);
+      call.setException(new CallCancelledException("Call id=" + call.id + ", waitTime=" +
+          (EnvironmentEdgeManager.currentTime() - call.getStartTime()) + ", rpcTimeout=" +
+          call.timeout));
+    }
+
+    /**
+     * Reads the calls from the queue and writes them to the socket.
+     */
+    @Override
+    public void run() {
+      synchronized (BlockingRpcConnection.this) {
+        while (!closed) {
+          if (callsToWrite.isEmpty()) {
+            // We should use another monitor object here for better performance since the read
+            // thread also uses BlockingRpcConnection.this. But this makes the locking schema more
+            // complicated, so we can do it later as an optimization.
+            try {
+              BlockingRpcConnection.this.wait();
+            } catch (InterruptedException e) {
+            }
+            // Check if we need to quit, so continue the main loop instead of falling through.
+            continue;
+          }
+          Call call = callsToWrite.poll();
+          if (call.isDone()) {
+            continue;
+          }
+          try {
+            tracedWriteRequest(call);
+          } catch (IOException e) {
+            // An exception here means the call has not been added to the pending calls yet, so we
+            // need to fail it ourselves.
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("call write error for call #" + call.id, e);
+            }
+            call.setException(e);
+            closeConn(e);
+          }
+        }
+      }
+    }
+
+    /**
+     * Cleans up the calls not yet sent when we finish.
+     */
+    public void cleanup(IOException e) {
+      IOException ie =
+          new ConnectionClosingException("Connection to " + remoteId.address + " is closing.");
+      for (Call call : callsToWrite) {
+        call.setException(ie);
+      }
+      callsToWrite.clear();
+    }
+  }
+
+  BlockingRpcConnection(BlockingRpcClient rpcClient, ConnectionId remoteId) throws IOException {
+    super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,
+        rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);
+    this.rpcClient = rpcClient;
+    if (remoteId.getAddress().isUnresolved()) {
+      throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName());
+    }
+
+    this.connectionHeaderPreamble = getConnectionHeaderPreamble();
+    ConnectionHeader header = getConnectionHeader();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream(4 + header.getSerializedSize());
+    DataOutputStream dos = new DataOutputStream(baos);
+    dos.writeInt(header.getSerializedSize());
+    header.writeTo(dos);
+    assert baos.size() == 4 + header.getSerializedSize();
+    this.connectionHeaderWithLength = baos.getBuffer();
+
+    UserGroupInformation ticket = remoteId.ticket.getUGI();
+    this.threadName = "IPC Client (" + this.rpcClient.socketFactory.hashCode() +
+        ") connection to " + remoteId.getAddress().toString() +
+        ((ticket == null) ? " from an unknown user" : (" from " + ticket.getUserName()));
+
+    if (this.rpcClient.conf.getBoolean(BlockingRpcClient.SPECIFIC_WRITE_THREAD, false)) {
+      callSender = new CallSender(threadName, this.rpcClient.conf);
+      callSender.start();
+    } else {
+      callSender = null;
+    }
+  }
+
+  // Protected for overriding in unit tests.
+  protected void setupConnection() throws IOException {
+    short ioFailures = 0;
+    short timeoutFailures = 0;
+    while (true) {
+      try {
+        this.socket = this.rpcClient.socketFactory.createSocket();
+        this.socket.setTcpNoDelay(this.rpcClient.isTcpNoDelay());
+        this.socket.setKeepAlive(this.rpcClient.tcpKeepAlive);
+        if (this.rpcClient.localAddr != null) {
+          this.socket.bind(this.rpcClient.localAddr);
+        }
+        NetUtils.connect(this.socket, remoteId.getAddress(), this.rpcClient.connectTO);
+        this.socket.setSoTimeout(this.rpcClient.readTO);
+        return;
+      } catch (SocketTimeoutException toe) {
+        /*
+         * The max number of retries is 45, which at 20s each amounts to 15 minutes of retries.
+         */
+        handleConnectionFailure(timeoutFailures++, this.rpcClient.maxRetries, toe);
+      } catch (IOException ie) {
+        handleConnectionFailure(ioFailures++, this.rpcClient.maxRetries, ie);
+      }
+    }
+  }
+
+  /**
+   * Handle connection failures. If the current number of retries is equal to the max number of
+   * retries, stop retrying and throw the exception; otherwise back off N seconds and try
+   * connecting again. This method is only called from inside setupIOstreams(), which is
+   * synchronized. Hence the sleep is synchronized; the locks will be retained.
+   * @param curRetries current number of retries
+   * @param maxRetries max number of retries allowed
+   * @param ioe failure reason
+   * @throws IOException if max number of retries is reached
+   */
+  private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe)
+      throws IOException {
+    closeSocket();
+
+    // throw the exception if the maximum number of retries is reached
+    if (curRetries >= maxRetries || ExceptionUtil.isInterrupt(ioe)) {
+      throw ioe;
+    }
+
+    // otherwise back off and retry
+    try {
+      Thread.sleep(this.rpcClient.failureSleep);
+    } catch (InterruptedException ie) {
+      ExceptionUtil.rethrowIfInterrupt(ie);
+    }
+
+    LOG.info("Retrying connect to server: " + remoteId.getAddress() + " after sleeping " +
+        this.rpcClient.failureSleep + "ms. Already tried " + curRetries + " time(s).");
+  }
+
+  /*
+   * Wait until someone signals us to start reading the RPC response, or until the connection has
+   * been idle too long, is marked to be closed, or the client is marked as not running.
+   * @return true if it is time to read a response; false otherwise.
+   */
+  private synchronized boolean waitForWork() {
+    // beware of the concurrent access to the calls list: we can add calls, but as well
+    // remove them.
+    long waitUntil = EnvironmentEdgeManager.currentTime() + this.rpcClient.minIdleTimeBeforeClose;
+    for (;;) {
+      if (thread == null) {
+        return false;
+      }
+      if (!calls.isEmpty()) {
+        return true;
+      }
+      if (EnvironmentEdgeManager.currentTime() >= waitUntil) {
+        closeConn(
+          new IOException("idle connection closed with " + calls.size() + " pending request(s)"));
+        return false;
+      }
+      try {
+        wait(Math.min(this.rpcClient.minIdleTimeBeforeClose, 1000));
+      } catch (InterruptedException e) {
+      }
+    }
+  }
+
+  @Override
+  public void run() {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(threadName + ": starting, connections " + this.rpcClient.connections.size());
+    }
+    while (waitForWork()) {
+      readResponse();
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(threadName + ": stopped, connections " + this.rpcClient.connections.size());
+    }
+  }
+
+  private void disposeSasl() {
+    if (saslRpcClient != null) {
+      saslRpcClient.dispose();
+      saslRpcClient = null;
+    }
+  }
+
+  private boolean setupSaslConnection(final InputStream in2, final OutputStream out2)
+      throws IOException {
+    saslRpcClient = new HBaseSaslRpcClient(authMethod, token, serverPrincipal,
+        this.rpcClient.fallbackAllowed, this.rpcClient.conf.get("hbase.rpc.protection",
+          QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)));
+    return saslRpcClient.saslConnect(in2, out2);
+  }
+
+  /**
+   * If multiple clients with the same principal try to connect to the same server at the same time,
+   * the server assumes a replay attack is in progress. This is a feature of Kerberos. To work
+   * around this, the client backs off randomly and tries to initiate the connection again. The
+   * other problem has to do with ticket expiry. To handle that, a relogin is attempted.
+   * <p>
+   * The retry logic is governed by the {@link #shouldAuthenticateOverKrb} method. When the user
+   * doesn't have valid credentials (from cache or ticket), we don't need to retry. In such cases,
+   * it is prudent to throw a runtime exception when we receive a SaslException from the
+   * underlying authentication implementation, so there is no retry from other higher levels
+   * (e.g., HCM or HBaseAdmin).
+   * </p>
+   */
+  private void handleSaslConnectionFailure(final int currRetries, final int maxRetries,
+      final Exception ex, final UserGroupInformation user)
+      throws IOException, InterruptedException {
+    closeSocket();
+    user.doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws IOException, InterruptedException {
+        if (shouldAuthenticateOverKrb()) {
+          if (currRetries < maxRetries) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Exception encountered while connecting to " + "the server : " + ex);
+            }
+            // try re-login
+            relogin();
+            disposeSasl();
+            // have granularity of milliseconds
+            // we are sleeping with the Connection lock held but since this
+            // connection instance is being used for connecting to the server
+            // in question, it is okay
+            Thread.sleep(ThreadLocalRandom.current().nextInt(reloginMaxBackoff) + 1);
+            return null;
+          } else {
+            String msg = "Couldn't setup connection for " +
+                UserGroupInformation.getLoginUser().getUserName() + " to " + serverPrincipal;
+            LOG.warn(msg, ex);
+            throw (IOException) new IOException(msg).initCause(ex);
+          }
+        } else {
+          LOG.warn("Exception encountered while connecting to " + "the server : " + ex);
+        }
+        if (ex instanceof RemoteException) {
+          throw (RemoteException) ex;
+        }
+        if (ex instanceof SaslException) {
+          String msg = "SASL authentication failed." +
+              " The most likely cause is missing or invalid credentials." + " Consider 'kinit'.";
+          LOG.fatal(msg, ex);
+          throw new RuntimeException(msg, ex);
+        }
+        throw new IOException(ex);
+      }
+    });
+  }
+
+  private void setupIOstreams() throws IOException {
+    if (socket != null) {
+      // The connection is already available. Perfect.
+      return;
+    }
+
+    if (this.rpcClient.failedServers.isFailedServer(remoteId.getAddress())) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not trying to connect to " + remoteId.address +
+            " this server is in the failed servers list");
+      }
+      throw new FailedServerException(
+          "This server is in the failed servers list: " + remoteId.address);
+    }
+
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Connecting to " + remoteId.address);
+      }
+
+      short numRetries = 0;
+      final short MAX_RETRIES = 5;
+      while (true) {
+        setupConnection();
+        InputStream inStream = NetUtils.getInputStream(socket);
+        // This creates a socket with a write timeout. This timeout cannot be changed.
+        OutputStream outStream = NetUtils.getOutputStream(socket, this.rpcClient.writeTO);
+        // Write out the preamble -- MAGIC, version, and auth to use.
+        writeConnectionHeaderPreamble(outStream);
+        if (useSasl) {
+          final InputStream in2 = inStream;
+          final OutputStream out2 = outStream;
+          UserGroupInformation ticket = getUGI();
+          boolean continueSasl;
+          if (ticket == null) {
+            throw new FatalConnectionException("ticket/user is null");
+          }
+          try {
+            continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
+              @Override
+              public Boolean run() throws IOException {
+                return setupSaslConnection(in2, out2);
+              }
+            });
+          } catch (Exception ex) {
+            ExceptionUtil.rethrowIfInterrupt(ex);
+            handleSaslConnectionFailure(numRetries++, MAX_RETRIES, ex, ticket);
+            continue;
+          }
+          if (continueSasl) {
+            // Sasl connect is successful. Let's set up Sasl i/o streams.
+            inStream = saslRpcClient.getInputStream(inStream);
+            outStream = saslRpcClient.getOutputStream(outStream);
+          } else {
+            // fall back to simple auth because server told us so.
+            // do not change authMethod and useSasl here, we should start from secure when
+            // reconnecting because regionserver may change its sasl config after restart.
+          }
+        }
+        this.in = new DataInputStream(new BufferedInputStream(inStream));
+        this.out = new DataOutputStream(new BufferedOutputStream(outStream));
+        // Now write out the connection header
+        writeConnectionHeader();
+        break;
+      }
+    } catch (Throwable t) {
+      closeSocket();
+      IOException e = ExceptionUtil.asInterrupt(t);
+      if (e == null) {
+        this.rpcClient.failedServers.addToFailedServers(remoteId.address);
+        if (t instanceof LinkageError) {
+          // probably the hbase hadoop version does not match the running hadoop version
+          e = new DoNotRetryIOException(t);
+        } else if (t instanceof IOException) {
+          e = (IOException) t;
+        } else {
+          e = new IOException("Could not set up IO Streams to " + remoteId.address, t);
+        }
+      }
+      throw e;
+    }
+
+    // start the receiver thread after the socket connection has been set up
+    thread = new Thread(this, threadName);
+    thread.setDaemon(true);
+    thread.start();
+  }
+
+  /**
+   * Write the RPC header: {@code <MAGIC WORD -- 'HBas'> <ONEBYTE_VERSION> <ONEBYTE_AUTH_TYPE>}
+   */
+  private void writeConnectionHeaderPreamble(OutputStream out) throws IOException {
+    out.write(connectionHeaderPreamble);
+    out.flush();
+  }
+
+  /**
+   * Write the connection header.
+   */
+  private void writeConnectionHeader() throws IOException {
+    this.out.write(connectionHeaderWithLength);
+    this.out.flush();
+  }
+
+  private void tracedWriteRequest(Call call) throws IOException {
+    try (TraceScope ignored = Trace.startSpan("RpcClientImpl.tracedWriteRequest", call.span)) {
+      writeRequest(call);
+    }
+  }
+
+  /**
+   * Initiates a call by sending the parameter to the remote server. Note: this is not called from
+   * the Connection thread, but by other threads.
+   * @see #readResponse()
+   */
+  private void writeRequest(Call call) throws IOException {
+    ByteBuffer cellBlock =
+        this.rpcClient.cellBlockBuilder.buildCellBlock(this.codec, this.compressor, call.cells);
+    CellBlockMeta cellBlockMeta;
+    if (cellBlock != null) {
+      cellBlockMeta = CellBlockMeta.newBuilder().setLength(cellBlock.limit()).build();
+    } else {
+      cellBlockMeta = null;
+    }
+    RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta);
+
+    setupIOstreams();
+
+    // Now we're going to write the call. We take the lock, then check that the connection
+    // is still valid, and, if so, we do the write to the socket. If the write fails, we don't
+    // know where we stand, so we have to close the connection.
+    if (Thread.interrupted()) {
+      throw new InterruptedIOException();
+    }
+
+    calls.put(call.id, call); // We put first as we don't want the connection to become idle.
+    // From here on, we do not throw any exception to the upper layer, as the call has been
+    // tracked in the pending calls map.
+    try {
+      call.callStats.setRequestSizeBytes(write(this.out, requestHeader, call.param, cellBlock));
+    } catch (IOException e) {
+      closeConn(e);
+      return;
+    }
+    notifyAll();
+  }
+
+  /*
+   * Receive a response. Because there is only one receiver thread, no synchronization on 'in' is
+   * needed.
+   */
+  private void readResponse() {
+    Call call = null;
+    boolean expectedCall = false;
+    try {
+      // See HBaseServer.Call.setResponse for where we write out the response.
+      // Total size of the response. Unused. But we have to read it in anyway.
+      int totalSize = in.readInt();
+
+      // Read the header
+      ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
+      int id = responseHeader.getCallId();
+      call = calls.remove(id); // call.done has to be set before leaving this method
+      expectedCall = (call != null && !call.isDone());
+      if (!expectedCall) {
+        // So we got a response for which we have no corresponding 'call' here on the client-side.
+        // We probably timed out waiting, cleaned up all references, and now the server decides
+        // to return a response. There is nothing we can do w/ the response at this stage. Clean
+        // out the wire of the response so it's out of the way and we can get other responses on
+        // this connection.
+        int readSoFar = getTotalSizeWhenWrittenDelimited(responseHeader);
+        int whatIsLeftToRead = totalSize - readSoFar;
+        IOUtils.skipFully(in, whatIsLeftToRead);
+        if (call != null) {
+          call.callStats.setResponseSizeBytes(totalSize);
+          call.callStats
+              .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
+        }
+        return;
+      }
+      if (responseHeader.hasException()) {
+        ExceptionResponse exceptionResponse = responseHeader.getException();
+        RemoteException re = createRemoteException(exceptionResponse);
+        call.setException(re);
+        call.callStats.setResponseSizeBytes(totalSize);
+        call.callStats
+            .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
+        if (isFatalConnectionException(exceptionResponse)) {
+          synchronized (this) {
+            closeConn(re);
+          }
+        }
+      } else {
+        Message value = null;
+        if (call.responseDefaultType != null) {
+          Builder builder = call.responseDefaultType.newBuilderForType();
+          ProtobufUtil.mergeDelimitedFrom(builder, in);
+          value = builder.build();
+        }
+        CellScanner cellBlockScanner = null;
+        if (responseHeader.hasCellBlockMeta()) {
+          int size = responseHeader.getCellBlockMeta().getLength();
+          byte[] cellBlock = new byte[size];
+          IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
+          cellBlockScanner = this.rpcClient.cellBlockBuilder.createCellScanner(this.codec,
+            this.compressor, cellBlock);
+        }
+        call.setResponse(value, cellBlockScanner);
+        call.callStats.setResponseSizeBytes(totalSize);
+        call.callStats
+            .setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
+      }
+    } catch (IOException e) {
+      if (expectedCall) {
+        call.setException(e);
+      }
+      if (e instanceof SocketTimeoutException) {
+        // Clean up open calls but don't treat this as a fatal condition,
+        // since we expect certain responses to not make it by the specified
+        // {@link ConnectionId#rpcTimeout}.
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("ignored", e);
+        }
+      } else {
+        synchronized (this) {
+          closeConn(e);
+        }
+      }
+    }
+  }
+
+  @Override
+  protected synchronized void callTimeout(Call call) {
+    // call sender
+    calls.remove(call.id);
+  }
+
+  // just close socket input and output.
+  private void closeSocket() {
+    IOUtils.closeStream(out);
+    IOUtils.closeStream(in);
+    IOUtils.closeSocket(socket);
+    out = null;
+    in = null;
+    socket = null;
+  }
+
+  // close socket, reader, and clean up all pending calls.
+  private void closeConn(IOException e) {
+    if (thread == null) {
+      return;
+    }
+    thread.interrupt();
+    thread = null;
+    closeSocket();
+    if (callSender != null) {
+      callSender.cleanup(e);
+    }
+    for (Call call : calls.values()) {
+      call.setException(e);
+    }
+    calls.clear();
+  }
+
+  // release all resources, the connection will not be used any more.
+  @Override
+  public synchronized void shutdown() {
+    closed = true;
+    if (callSender != null) {
+      callSender.interrupt();
+    }
+    closeConn(new IOException("connection to " + remoteId.address + " closed"));
+  }
+
+  @Override
+  public void cleanupConnection() {
+    // do nothing
+  }
+
+  @Override
+  public synchronized void sendRequest(final Call call, HBaseRpcController pcrc)
+      throws IOException {
+    pcrc.notifyOnCancel(new RpcCallback<Object>() {
+
+      @Override
+      public void run(Object parameter) {
+        setCancelled(call);
+        synchronized (BlockingRpcConnection.this) {
+          if (callSender != null) {
+            callSender.remove(call);
+          } else {
+            calls.remove(call.id);
+          }
+        }
+      }
+    }, new CancellationCallback() {
+
+      @Override
+      public void run(boolean cancelled) throws IOException {
+        if (cancelled) {
+          setCancelled(call);
+          return;
+        }
+        scheduleTimeoutTask(call);
+        if (callSender != null) {
+          callSender.sendCall(call);
+        } else {
+          tracedWriteRequest(call);
+        }
+      }
+    });
+  }
+
+  @Override
+  public synchronized boolean isActive() {
+    return thread != null;
+  }
+}
\ No newline at end of file
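
The CallSender javadoc in this file explains the underlying Java issue: interrupting a
thread that is blocked in a socket write closes the socket. The essence of the
workaround, a queue drained by a single dedicated writer thread, can be sketched
independently of the patch (illustrative only; all names here are hypothetical):

    import java.util.ArrayDeque;
    import java.util.Queue;

    // Sketch: callers enqueue writes and wait; only this thread touches the
    // socket, so a caller's Thread.interrupt() can never abort a write midway.
    class DedicatedWriter extends Thread {
      private final Queue<Runnable> pending = new ArrayDeque<>();
      private boolean closed;

      synchronized void submit(Runnable write) {
        pending.offer(write);
        notifyAll(); // wake the writer loop
      }

      synchronized void shutdown() {
        closed = true;
        notifyAll();
      }

      @Override
      public void run() {
        for (;;) {
          Runnable write;
          synchronized (this) {
            while (pending.isEmpty() && !closed) {
              try {
                wait();
              } catch (InterruptedException e) {
                // loop around and re-check; the socket write itself is never interrupted
              }
            }
            if (closed) {
              return;
            }
            write = pending.poll();
          }
          write.run(); // the actual socket write, outside the callers' threads
        }
      }
    }

The real CallSender layers cancellation checks and failure propagation on top of this
skeleton.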

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java
new file mode 100644
index 0000000..c628c31
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * We will expose the connection to the upper layer before it is initialized, so we need to buffer
+ * the calls passed in and write them out once the connection is established.
+ */
+@InterfaceAudience.Private
+class BufferCallBeforeInitHandler extends ChannelDuplexHandler {
+
+  private enum BufferCallAction {
+    FLUSH, FAIL
+  }
+
+  public static final class BufferCallEvent {
+
+    public final BufferCallAction action;
+
+    public final IOException error;
+
+    private BufferCallEvent(BufferCallBeforeInitHandler.BufferCallAction action,
+        IOException error) {
+      this.action = action;
+      this.error = error;
+    }
+
+    public static BufferCallBeforeInitHandler.BufferCallEvent success() {
+      return SUCCESS_EVENT;
+    }
+
+    public static BufferCallBeforeInitHandler.BufferCallEvent fail(IOException error) {
+      return new BufferCallEvent(BufferCallAction.FAIL, error);
+    }
+  }
+
+  private static final BufferCallEvent SUCCESS_EVENT = new BufferCallEvent(BufferCallAction.FLUSH,
+      null);
+
+  private final Map<Integer, Call> id2Call = new HashMap<>();
+
+  @Override
+  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
+    if (msg instanceof Call) {
+      Call call = (Call) msg;
+      id2Call.put(call.id, call);
+      // The call is already tracked, so here we mark the write operation as a success.
+      // We will fail the call directly if we cannot write it out.
+      promise.trySuccess();
+    } else {
+      ctx.write(msg, promise);
+    }
+  }
+
+  @Override
+  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+    if (evt instanceof BufferCallEvent) {
+      BufferCallEvent bcEvt = (BufferCallBeforeInitHandler.BufferCallEvent) evt;
+      switch (bcEvt.action) {
+        case FLUSH:
+          for (Call call : id2Call.values()) {
+            ctx.write(call);
+          }
+          break;
+        case FAIL:
+          for (Call call : id2Call.values()) {
+            call.setException(bcEvt.error);
+          }
+          break;
+      }
+      ctx.flush();
+      ctx.pipeline().remove(this);
+    } else if (evt instanceof CallEvent) {
+      // Just remove the call for now, until we add call events other than timeout and cancel.
+      id2Call.remove(((CallEvent) evt).call.id);
+    } else {
+      ctx.fireUserEventTriggered(evt);
+    }
+  }
+}
\ No newline at end of file
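
A connection-setup handler elsewhere in this patch is expected to fire one of these
events once the handshake completes. A hedged sketch of that trigger from the pipeline
side (the surrounding setup code is an assumption, not part of this file):

    import io.netty.channel.ChannelHandlerContext;

    import java.io.IOException;

    // Sketch: on connection setup success or failure, notify the buffering
    // handler so it flushes, or fails, the calls queued before initialization.
    void onConnectionSetupDone(ChannelHandlerContext ctx, IOException error) {
      if (error == null) {
        ctx.pipeline().fireUserEventTriggered(
            BufferCallBeforeInitHandler.BufferCallEvent.success());
      } else {
        ctx.pipeline().fireUserEventTriggered(
            BufferCallBeforeInitHandler.BufferCallEvent.fail(error));
      }
    }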

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
index 5f90837..a6203d5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
@@ -19,36 +19,50 @@ package org.apache.hadoop.hbase.ipc;
 
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+
+import io.netty.util.Timeout;
+
+import java.io.IOException;
+
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-
-import java.io.IOException;
+import org.apache.htrace.Span;
+import org.apache.htrace.Trace;
 
 /** A call waiting for a value. */
 @InterfaceAudience.Private
-public class Call {
-  final int id;                                 // call id
-  final Message param;                          // rpc request method param object
+class Call {
+  final int id; // call id
+  final Message param; // rpc request method param object
   /**
-   * Optionally has cells when making call.  Optionally has cells set on response.  Used
-   * passing cells to the rpc and receiving the response.
+   * Optionally has cells when making the call. Optionally has cells set on the response. Used for
+   * passing cells to the rpc and receiving the response.
    */
   CellScanner cells;
-  Message response;                             // value, null if error
-  // The return type.  Used to create shell into which we deserialize the response if any.
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
+      justification = "Direct access is only allowed after done")
+  Message response; // value, null if error
+  // The return type. Used to create shell into which we deserialize the response if any.
   Message responseDefaultType;
-  IOException error;                            // exception, null if value
-  volatile boolean done;                                 // true when call is done
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
+    justification = "Direct access is only allowed after done")
+  IOException error; // exception, null if value
+  private boolean done; // true when call is done
   final Descriptors.MethodDescriptor md;
   final int timeout; // timeout in millisecond for this call; 0 means infinite.
+  final int priority;
   final MetricsConnection.CallStats callStats;
+  final RpcCallback<Call> callback;
+  final Span span;
+  Timeout timeoutTask;
 
   protected Call(int id, final Descriptors.MethodDescriptor md, Message param,
-      final CellScanner cells, final Message responseDefaultType, int timeout,
-      MetricsConnection.CallStats callStats) {
+      final CellScanner cells, final Message responseDefaultType, int timeout, int priority,
+      RpcCallback<Call> callback, MetricsConnection.CallStats callStats) {
     this.param = param;
     this.md = md;
     this.cells = cells;
@@ -57,73 +71,74 @@ public class Call {
     this.responseDefaultType = responseDefaultType;
     this.id = id;
     this.timeout = timeout;
+    this.priority = priority;
+    this.callback = callback;
+    this.span = Trace.currentSpan();
+  }
+
+  @Override
+  public String toString() {
+    return "callId: " + this.id + " methodName: " + this.md.getName() + " param {"
+        + (this.param != null ? ProtobufUtil.getShortTextFormat(this.param) : "") + "}";
   }
 
   /**
-   * Check if the call did timeout. Set an exception (includes a notify) if it's the case.
-   * @return true if the call is on timeout, false otherwise.
+   * Called from timeoutTask; prevents self-cancel.
    */
-  public boolean checkAndSetTimeout() {
-    if (timeout == 0){
-      return false;
-    }
-
-    long waitTime = EnvironmentEdgeManager.currentTime() - getStartTime();
-    if (waitTime >= timeout) {
-      IOException ie = new CallTimeoutException("Call id=" + id +
-          ", waitTime=" + waitTime + ", operationTimeout=" + timeout + " expired.");
-      setException(ie); // includes a notify
-      return true;
-    } else {
-      return false;
+  public void setTimeout(IOException error) {
+    synchronized (this) {
+      if (done) {
+        return;
+      }
+      this.done = true;
+      this.error = error;
     }
+    callback.run(this);
   }
 
-  public int remainingTime() {
-    if (timeout == 0) {
-      return Integer.MAX_VALUE;
+  private void callComplete() {
+    if (timeoutTask != null) {
+      timeoutTask.cancel();
     }
-
-    int remaining = timeout - (int) (EnvironmentEdgeManager.currentTime() - getStartTime());
-    return remaining > 0 ? remaining : 0;
+    callback.run(this);
   }
 
-  @Override
-  public String toString() {
-    return "callId: " + this.id + " methodName: " + this.md.getName() + " param {" +
-      (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") + "}";
-  }
-
-  /** Indicate when the call is complete and the
-   * value or error are available.  Notifies by default.  */
-  protected synchronized void callComplete() {
-    this.done = true;
-    notify();                                 // notify caller
-  }
-
-  /** Set the exception when there is an error.
-   * Notify the caller the call is done.
-   *
+  /**
+   * Set the exception when there is an error. Notify the caller the call is done.
    * @param error exception thrown by the call; either local or remote
    */
   public void setException(IOException error) {
-    this.error = error;
+    synchronized (this) {
+      if (done) {
+        return;
+      }
+      this.done = true;
+      this.error = error;
+    }
     callComplete();
   }
 
   /**
-   * Set the return value when there is no error.
-   * Notify the caller the call is done.
-   *
+   * Set the return value when there is no error. Notify the caller the call is done.
    * @param response return value of the call.
    * @param cells Can be null
    */
   public void setResponse(Message response, final CellScanner cells) {
-    this.response = response;
-    this.cells = cells;
+    synchronized (this) {
+      if (done) {
+        return;
+      }
+      this.done = true;
+      this.response = response;
+      this.cells = cells;
+    }
     callComplete();
   }
 
+  public synchronized boolean isDone() {
+    return done;
+  }
+
   public long getStartTime() {
     return this.callStats.getStartTime();
   }
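
Every completion path in the rewritten Call above follows the same idiom: flip the done
flag under the monitor, then run the callback outside it so the lock is never held during
user code. Reduced to a standalone sketch (illustrative only, not the patch code):

    import java.io.IOException;

    // Sketch: complete-once semantics, mirroring setResponse/setException above.
    class OneShotResult {
      interface Listener {
        void onDone(OneShotResult r);
      }

      private final Listener listener;
      private boolean done;
      private Object value;
      private IOException error;

      OneShotResult(Listener listener) {
        this.listener = listener;
      }

      void succeed(Object value) {
        synchronized (this) {
          if (done) {
            return; // a timeout, cancel, or error already won the race
          }
          done = true;
          this.value = value;
        }
        listener.onDone(this); // outside the lock, like callComplete() above
      }

      void fail(IOException e) {
        synchronized (this) {
          if (done) {
            return;
          }
          done = true;
          this.error = e;
        }
        listener.onDone(this);
      }

      synchronized boolean isDone() {
        return done;
      }

      synchronized Object value() {
        return value;
      }

      synchronized IOException error() {
        return error;
      }
    }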

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallCancelledException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallCancelledException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallCancelledException.java
new file mode 100644
index 0000000..a6777c0
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallCancelledException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Client side call cancelled.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class CallCancelledException extends HBaseIOException {
+
+  private static final long serialVersionUID = 309775809470318208L;
+
+  public CallCancelledException(String message) {
+    super(message);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallEvent.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallEvent.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallEvent.java
new file mode 100644
index 0000000..1c2ea32
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CallEvent.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Used to tell the netty handler that the call is cancelled or timed out.
+ */
+@InterfaceAudience.Private
+class CallEvent {
+
+  public enum Type {
+    TIMEOUT, CANCELLED
+  }
+
+  final Type type;
+
+  final Call call;
+
+  CallEvent(Type type, Call call) {
+    this.type = type;
+    this.call = call;
+  }
+}
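
A hedged sketch of the intended use: when a call times out or is cancelled on the client
side, the event is fired through the pipeline so handlers that still hold the call (such
as BufferCallBeforeInitHandler above) can drop it:

    import io.netty.channel.Channel;

    // Sketch: tell the pipeline handlers this call will never complete normally.
    static void fireTimeout(Channel ch, Call call) {
      ch.pipeline().fireUserEventTriggered(new CallEvent(CallEvent.Type.TIMEOUT, call));
    }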

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
new file mode 100644
index 0000000..0dac2d1
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufOutputStream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
+import org.apache.hadoop.hbase.io.ByteBufferInputStream;
+import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+
+/**
+ * Helper class for building cell blocks.
+ */
+@InterfaceAudience.Private
+class CellBlockBuilder {
+
+  // LOG is being used in TestCellBlockBuilder
+  static final Log LOG = LogFactory.getLog(CellBlockBuilder.class);
+
+  private final Configuration conf;
+
+  /**
+   * How much we think the decompressor will expand the original compressed content.
+   */
+  private final int cellBlockDecompressionMultiplier;
+
+  private final int cellBlockBuildingInitialBufferSize;
+
+  public CellBlockBuilder(Configuration conf) {
+    this.conf = conf;
+    this.cellBlockDecompressionMultiplier =
+        conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
+
+    // Guess that 16k is a good size for rpc buffer. Could go bigger. See the TODO below in
+    // #buildCellBlock.
+    this.cellBlockBuildingInitialBufferSize =
+        ClassSize.align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024));
+  }
+
+  private interface OutputStreamSupplier {
+
+    OutputStream get(int expectedSize);
+
+    int size();
+  }
+
+  private static final class ByteBufferOutputStreamSupplier implements OutputStreamSupplier {
+
+    private ByteBufferOutputStream baos;
+
+    @Override
+    public OutputStream get(int expectedSize) {
+      baos = new ByteBufferOutputStream(expectedSize);
+      return baos;
+    }
+
+    @Override
+    public int size() {
+      return baos.size();
+    }
+  }
+
+  /**
+   * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
+   * <code>compressor</code>.
+   * @param codec codec to encode the cells with
+   * @param compressor compressor to compress the encoded cell block with; may be null
+   * @param cellScanner the cells to write into the block
+   * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
+   *         passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has
+   *         been flipped and is ready for reading. Use limit to find total size.
+   * @throws IOException if encoding the cells fails
+   */
+  public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
+      final CellScanner cellScanner) throws IOException {
+    ByteBufferOutputStreamSupplier supplier = new ByteBufferOutputStreamSupplier();
+    if (buildCellBlock(codec, compressor, cellScanner, supplier)) {
+      ByteBuffer bb = supplier.baos.getByteBuffer();
+      // If no cells, don't mess around. Just return null (could be a bunch of existence checking
+      // gets or something -- stuff that does not return a cell).
+      return bb.hasRemaining() ? bb : null;
+    } else {
+      return null;
+    }
+  }
+
+  private static final class ByteBufOutputStreamSupplier implements OutputStreamSupplier {
+
+    private final ByteBufAllocator alloc;
+
+    private ByteBuf buf;
+
+    public ByteBufOutputStreamSupplier(ByteBufAllocator alloc) {
+      this.alloc = alloc;
+    }
+
+    @Override
+    public OutputStream get(int expectedSize) {
+      buf = alloc.buffer(expectedSize);
+      return new ByteBufOutputStream(buf);
+    }
+
+    @Override
+    public int size() {
+      return buf.writerIndex();
+    }
+  }
+
+  public ByteBuf buildCellBlock(Codec codec, CompressionCodec compressor, CellScanner cellScanner,
+      ByteBufAllocator alloc) throws IOException {
+    ByteBufOutputStreamSupplier supplier = new ByteBufOutputStreamSupplier(alloc);
+    if (buildCellBlock(codec, compressor, cellScanner, supplier)) {
+      return supplier.buf;
+    } else {
+      return null;
+    }
+  }
+
+  private boolean buildCellBlock(final Codec codec, final CompressionCodec compressor,
+      final CellScanner cellScanner, OutputStreamSupplier supplier) throws IOException {
+    if (cellScanner == null) {
+      return false;
+    }
+    if (codec == null) {
+      throw new CellScannerButNoCodecException();
+    }
+    int bufferSize = cellBlockBuildingInitialBufferSize;
+    encodeCellsTo(supplier.get(bufferSize), cellScanner, codec, compressor);
+    if (LOG.isTraceEnabled() && bufferSize < supplier.size()) {
+      LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + supplier.size() +
+          "; up hbase.ipc.cellblock.building.initial.buffersize?");
+    }
+    return true;
+  }
+
+  private void encodeCellsTo(OutputStream os, CellScanner cellScanner, Codec codec,
+      CompressionCodec compressor) throws IOException {
+    Compressor poolCompressor = null;
+    try {
+      if (compressor != null) {
+        if (compressor instanceof Configurable) {
+          ((Configurable) compressor).setConf(this.conf);
+        }
+        poolCompressor = CodecPool.getCompressor(compressor);
+        os = compressor.createOutputStream(os, poolCompressor);
+      }
+      Codec.Encoder encoder = codec.getEncoder(os);
+      while (cellScanner.advance()) {
+        encoder.write(cellScanner.current());
+      }
+      encoder.flush();
+    } catch (BufferOverflowException | IndexOutOfBoundsException e) {
+      throw new DoNotRetryIOException(e);
+    } finally {
+      os.close();
+      if (poolCompressor != null) {
+        CodecPool.returnCompressor(poolCompressor);
+      }
+    }
+  }
+
+  /**
+   * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
+   * <code>compressor</code>.
+   * @param codec codec to encode the cells with
+   * @param compressor compressor to compress the encoded cell block with; may be null
+   * @param cellScanner the cells to write into the block
+   * @param pool Pool of ByteBuffers to make use of. Can be null and then we'll allocate our own
+   *          ByteBuffer.
+   * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
+   *         passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has
+   *         been flipped and is ready for reading. Use limit to find total size. If
+   *         <code>pool</code> was not null, then this returned ByteBuffer came from there and
+   *         should be returned to the pool when done.
+   * @throws IOException if encoding the cells fails
+   */
+  public ByteBuffer buildCellBlock(Codec codec, CompressionCodec compressor,
+      CellScanner cellScanner, BoundedByteBufferPool pool) throws IOException {
+    if (cellScanner == null) {
+      return null;
+    }
+    if (codec == null) {
+      throw new CellScannerButNoCodecException();
+    }
+    ByteBufferOutputStream bbos;
+    ByteBuffer bb = null;
+    if (pool != null) {
+      bb = pool.getBuffer();
+      bbos = new ByteBufferOutputStream(bb);
+    } else {
+      bbos = new ByteBufferOutputStream(cellBlockBuildingInitialBufferSize);
+    }
+    encodeCellsTo(bbos, cellScanner, codec, compressor);
+    if (bbos.size() == 0) {
+      if (pool != null) {
+        pool.putBuffer(bb);
+      }
+      return null;
+    }
+    return bbos.getByteBuffer();
+  }
+
+  /**
+   * @param codec codec the <code>cellBlock</code> was encoded with
+   * @param cellBlock the block of encoded cells to decode
+   * @return CellScanner to work against the content of <code>cellBlock</code>
+   * @throws IOException if decoding fails
+   */
+  public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
+      final byte[] cellBlock) throws IOException {
+    return createCellScanner(codec, compressor, ByteBuffer.wrap(cellBlock));
+  }
+
+  /**
+   * @param codec codec the cells in <code>cellBlock</code> were encoded with
+   * @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be
+   *          position()'ed at the start of the cell block and limit()'ed at the end.
+   * @return CellScanner to work against the content of <code>cellBlock</code>
+   * @throws IOException if decoding fails
+   */
+  public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
+      ByteBuffer cellBlock) throws IOException {
+    if (compressor != null) {
+      cellBlock = decompress(compressor, cellBlock);
+    }
+    // Not making the Decoder over the ByteBuffer purposefully. The Decoder over the BB will
+    // make Cells directly over the passed BB. This method is called at client side and we don't
+    // want the Cells to share the same byte[] where the RPC response is being read. Caching of any
+    // of the Cells at user's app level will make it not possible to GC the response byte[]
+    return codec.getDecoder(new ByteBufferInputStream(cellBlock));
+  }
+
+  private ByteBuffer decompress(CompressionCodec compressor, ByteBuffer cellBlock)
+      throws IOException {
+    // GZIPCodec fails w/ NPE if no configuration.
+    if (compressor instanceof Configurable) {
+      ((Configurable) compressor).setConf(this.conf);
+    }
+    Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
+    CompressionInputStream cis =
+        compressor.createInputStream(new ByteBufferInputStream(cellBlock), poolDecompressor);
+    ByteBufferOutputStream bbos;
+    try {
+      // TODO: This is ugly. The buffer will be resized on us if we guess wrong.
+      // TODO: Reuse buffers.
+      bbos =
+          new ByteBufferOutputStream(cellBlock.remaining() * this.cellBlockDecompressionMultiplier);
+      IOUtils.copy(cis, bbos);
+      bbos.close();
+      cellBlock = bbos.getByteBuffer();
+    } finally {
+      CodecPool.returnDecompressor(poolDecompressor);
+    }
+    return cellBlock;
+  }
+}
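
For orientation, a minimal round trip through the new class might look like the
standalone sketch below. This is not part of the patch: it has to live in the
org.apache.hadoop.hbase.ipc package because CellBlockBuilder is package-private,
and KeyValueCodec stands in for whatever codec a real connection negotiates.

    package org.apache.hadoop.hbase.ipc;

    import java.nio.ByteBuffer;
    import java.util.Arrays;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellScanner;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.codec.KeyValueCodec;
    import org.apache.hadoop.hbase.util.Bytes;

    public class CellBlockRoundTrip {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        CellBlockBuilder builder = new CellBlockBuilder(conf);
        Cell cell = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
            Bytes.toBytes("q"), Bytes.toBytes("value"));
        CellScanner cells = CellUtil.createCellScanner(Arrays.<Cell> asList(cell));
        // Encode; a null compressor means the block is left uncompressed.
        ByteBuffer block = builder.buildCellBlock(new KeyValueCodec(), null, cells);
        // Decode the block back into a CellScanner and walk it.
        CellScanner decoded = builder.createCellScanner(new KeyValueCodec(), null, block);
        while (decoded.advance()) {
          System.out.println(decoded.current());
        }
      }
    }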

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellScannerButNoCodecException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellScannerButNoCodecException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellScannerButNoCodecException.java
new file mode 100644
index 0000000..ffd27b3
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellScannerButNoCodecException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Thrown when a CellScanner is passed in but no codec is configured to encode it with.
+ */
+@SuppressWarnings("serial")
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class CellScannerButNoCodecException extends HBaseIOException {
+}
\ No newline at end of file
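
The guard that raises this exception sits at the top of the private
buildCellBlock overload above. Continuing the round-trip sketch, a caller that
supplies cells but leaves the codec null would trip it like so:

    try {
      builder.buildCellBlock(null, null, cells); // cells present, codec missing
    } catch (CellScannerButNoCodecException e) {
      // Fix by configuring a Codec (e.g. KeyValueCodec) on the connection.
    }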

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DefaultNettyEventLoopConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DefaultNettyEventLoopConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DefaultNettyEventLoopConfig.java
new file mode 100644
index 0000000..f710d54
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DefaultNettyEventLoopConfig.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * The default netty event loop config
+ */
+@InterfaceAudience.Private
+class DefaultNettyEventLoopConfig {
+
+  public static final Pair<EventLoopGroup, Class<? extends Channel>> GROUP_AND_CHANNEL_CLASS = Pair
+      .<EventLoopGroup, Class<? extends Channel>> newPair(
+        new NioEventLoopGroup(0,
+            new DefaultThreadFactory("Default-IPC-NioEventLoopGroup", true, Thread.MAX_PRIORITY)),
+        NioSocketChannel.class);
+}
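
Any alternative event loop config would have to match the shape of the Pair
above. As a sketch only (this patch wires up just the NIO default; the epoll
classes assume netty's native transport is on the classpath and usable):

    import io.netty.channel.Channel;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.epoll.EpollEventLoopGroup;
    import io.netty.channel.epoll.EpollSocketChannel;
    import io.netty.util.concurrent.DefaultThreadFactory;

    import org.apache.hadoop.hbase.util.Pair;

    // Hypothetical epoll flavour; guard real usage with Epoll.isAvailable().
    class EpollEventLoopConfig {
      public static final Pair<EventLoopGroup, Class<? extends Channel>> GROUP_AND_CHANNEL_CLASS =
          Pair.<EventLoopGroup, Class<? extends Channel>> newPair(
            new EpollEventLoopGroup(0,
                new DefaultThreadFactory("Epoll-IPC-EventLoopGroup", true, Thread.MAX_PRIORITY)),
            EpollSocketChannel.class);
    }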

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java
new file mode 100644
index 0000000..aaaea1f
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import com.google.protobuf.RpcCallback;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Simple delegating controller for use with the {@link RpcControllerFactory} to help override
+ * standard behavior of a {@link HBaseRpcController}. Used in testing.
+ */
+@InterfaceAudience.Private
+public class DelegatingHBaseRpcController implements HBaseRpcController {
+
+  private final HBaseRpcController delegate;
+
+  public DelegatingHBaseRpcController(HBaseRpcController delegate) {
+    this.delegate = delegate;
+  }
+
+  @Override
+  public void reset() {
+    delegate.reset();
+  }
+
+  @Override
+  public boolean failed() {
+    return delegate.failed();
+  }
+
+  @Override
+  public String errorText() {
+    return delegate.errorText();
+  }
+
+  @Override
+  public void startCancel() {
+    delegate.startCancel();
+  }
+
+  @Override
+  public void setFailed(String reason) {
+    delegate.setFailed(reason);
+  }
+
+  @Override
+  public boolean isCanceled() {
+    return delegate.isCanceled();
+  }
+
+  @Override
+  public void notifyOnCancel(RpcCallback<Object> callback) {
+    delegate.notifyOnCancel(callback);
+  }
+
+  @Override
+  public CellScanner cellScanner() {
+    return delegate.cellScanner();
+  }
+
+  @Override
+  public void setCellScanner(CellScanner cellScanner) {
+    delegate.setCellScanner(cellScanner);
+  }
+
+  @Override
+  public void setPriority(int priority) {
+    delegate.setPriority(priority);
+  }
+
+  @Override
+  public void setPriority(TableName tn) {
+    delegate.setPriority(tn);
+  }
+
+  @Override
+  public int getPriority() {
+    return delegate.getPriority();
+  }
+
+  @Override
+  public int getCallTimeout() {
+    return delegate.getCallTimeout();
+  }
+
+  @Override
+  public void setCallTimeout(int callTimeout) {
+    delegate.setCallTimeout(callTimeout);
+  }
+
+  @Override
+  public boolean hasCallTimeout() {
+    return delegate.hasCallTimeout();
+  }
+
+  @Override
+  public void setFailed(IOException e) {
+    delegate.setFailed(e);
+  }
+
+  @Override
+  public IOException getFailed() {
+    return delegate.getFailed();
+  }
+
+  @Override
+  public void setDone(CellScanner cellScanner) {
+    delegate.setDone(cellScanner);
+  }
+
+  @Override
+  public void notifyOnCancel(RpcCallback<Object> callback, CancellationCallback action)
+      throws IOException {
+    delegate.notifyOnCancel(callback, action);
+  }
+}
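
A typical test usage is to override only the method under inspection and let
everything else pass through to the wrapped controller. The subclass and field
below are hypothetical, not part of this patch:

    import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController;
    import org.apache.hadoop.hbase.ipc.HBaseRpcController;

    // Records the priority client code set, delegating all other behavior.
    class PriorityCapturingController extends DelegatingHBaseRpcController {
      volatile int capturedPriority = PRIORITY_UNSET;

      PriorityCapturingController(HBaseRpcController delegate) {
        super(delegate);
      }

      @Override
      public void setPriority(int priority) {
        capturedPriority = priority;
        super.setPriority(priority);
      }
    }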

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java
deleted file mode 100644
index ad4224b..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingPayloadCarryingRpcController.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * Simple delegating controller for use with the {@link RpcControllerFactory} to help override
- * standard behavior of a {@link PayloadCarryingRpcController}.
- */
-@InterfaceAudience.Private
-public class DelegatingPayloadCarryingRpcController extends PayloadCarryingRpcController {
-  private PayloadCarryingRpcController delegate;
-
-  public DelegatingPayloadCarryingRpcController(PayloadCarryingRpcController delegate) {
-    this.delegate = delegate;
-  }
-
-  @Override
-  public CellScanner cellScanner() {
-    return delegate.cellScanner();
-  }
-
-  @Override
-  public void setCellScanner(final CellScanner cellScanner) {
-    delegate.setCellScanner(cellScanner);
-  }
-
-  @Override
-  public void setPriority(int priority) {
-    delegate.setPriority(priority);
-  }
-
-  @Override
-  public void setPriority(final TableName tn) {
-    delegate.setPriority(tn);
-  }
-
-  @Override
-  public int getPriority() {
-    return delegate.getPriority();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java
new file mode 100644
index 0000000..721148b
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/FallbackDisallowedException.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Indicates that the rpc server asked the client to fall back to simple auth, but the client is
+ * configured to disallow it.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class FallbackDisallowedException extends HBaseIOException {
+
+  private static final long serialVersionUID = -6942845066279358253L;
+
+  public FallbackDisallowedException() {
+    super("Server asks us to fall back to SIMPLE auth, "
+        + "but this client is configured to only allow secure connections.");
+  }
+}
\ No newline at end of file
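
Whether the client may honor such a downgrade is a client-side switch. As a
sketch, assuming the fallback key the hbase client reads elsewhere
(hbase.ipc.client.fallback-to-simple-auth-allowed, default false):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class AllowSimpleAuthFallback {
      public static Configuration configure() {
        Configuration conf = HBaseConfiguration.create();
        // With the default of false, a secure client refuses the downgrade and
        // FallbackDisallowedException surfaces instead of an insecure connection.
        conf.setBoolean("hbase.ipc.client.fallback-to-simple-auth-allowed", true);
        return conf;
      }
    }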

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
new file mode 100644
index 0000000..2c4b335
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Optionally carries Cells across the proxy/service interface down into ipc. On its way out it
+ * optionally carries a set of result Cell data. We stick the Cells here when we want to avoid
+ * having to protobuf them (for performance reasons). This class is used for ferrying data across
+ * the proxy/protobuf service chasm. Also handles call timeout. Used by client and server ipc'ing.
+ */
+@InterfaceAudience.Private
+public interface HBaseRpcController extends RpcController, CellScannable {
+
+  static final int PRIORITY_UNSET = -1;
+
+  /**
+   * Only used to send cells to rpc server, the returned cells should be set by
+   * {@link #setDone(CellScanner)}.
+   */
+  void setCellScanner(CellScanner cellScanner);
+
+  /**
+   * @param priority Priority for this request; should fall roughly in the range
+   *          {@link HConstants#NORMAL_QOS} to {@link HConstants#HIGH_QOS}
+   */
+  void setPriority(int priority);
+
+  /**
+   * @param tn the table we are going against; the priority is set based off it.
+   */
+  void setPriority(final TableName tn);
+
+  /**
+   * @return The priority of this request
+   */
+  int getPriority();
+
+  int getCallTimeout();
+
+  void setCallTimeout(int callTimeout);
+
+  boolean hasCallTimeout();
+
+  /**
+   * Set failed with an exception to pass on. For use in async rpc clients.
+   * @param e exception to set with
+   */
+  void setFailed(IOException e);
+
+  /**
+   * Return the failed exception, null if not failed.
+   */
+  IOException getFailed();
+
+  /**
+   * <b>IMPORTANT:</b> always call this method if the call finished without any exception to tell
+   * the {@code HBaseRpcController} that we are done.
+   */
+  void setDone(CellScanner cellScanner);
+
+  /**
+   * A little different from the basic RpcController:
+   * <ol>
+   * <li>You can register multiple callbacks to an {@code HBaseRpcController}.</li>
+   * <li>The callback will not be called if the rpc call is finished without any cancellation.</li>
+   * <li>It can be called on the client side as well.</li>
+   * </ol>
+   */
+  @Override
+  void notifyOnCancel(RpcCallback<Object> callback);
+
+  interface CancellationCallback {
+    void run(boolean cancelled) throws IOException;
+  }
+
+  /**
+   * If not cancelled, adds the callback to the cancellation callback list and then executes the
+   * action with the cancellation state as a parameter. The implementation should guarantee that
+   * the cancellation state does not change during this call.
+   */
+  void notifyOnCancel(RpcCallback<Object> callback, CancellationCallback action) throws IOException;
+}
\ No newline at end of file
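
A sketch of how an async caller might use the two-argument notifyOnCancel;
startOperation and cleanup are hypothetical placeholders for real work:

    import com.google.protobuf.RpcCallback;

    import java.io.IOException;

    import org.apache.hadoop.hbase.ipc.HBaseRpcController;

    public class CancelAwareCall {
      static void startOperation() {}
      static void cleanup() {}

      static void callWithCancelSupport(HBaseRpcController controller) throws IOException {
        controller.notifyOnCancel(new RpcCallback<Object>() {
          @Override
          public void run(Object nothing) {
            cleanup(); // runs later, only if the call gets cancelled
          }
        }, new HBaseRpcController.CancellationCallback() {
          @Override
          public void run(boolean cancelled) throws IOException {
            if (cancelled) {
              cleanup(); // cancelled before we could start
            } else {
              startOperation(); // cancellation state cannot change during this call
            }
          }
        });
      }
    }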

http://git-wip-us.apache.org/repos/asf/hbase/blob/094e9a31/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
new file mode 100644
index 0000000..a976473
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import com.google.protobuf.RpcCallback;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Optionally carries Cells across the proxy/service interface down into ipc. On its way out it
+ * optionally carries a set of result Cell data. We stick the Cells here when we want to avoid
+ * having to protobuf them (for performance reasons). This class is used for ferrying data across
+ * the proxy/protobuf service chasm. Also handles call timeout. Used by client and server ipc'ing.
+ */
+@InterfaceAudience.Private
+public class HBaseRpcControllerImpl implements HBaseRpcController {
+  /**
+   * The time, in ms before the call should expire.
+   */
+  private Integer callTimeout;
+
+  private boolean done = false;
+
+  private boolean cancelled = false;
+
+  private final List<RpcCallback<Object>> cancellationCbs = new ArrayList<>();
+
+  private IOException exception;
+
+  /**
+   * Priority to set on this request. Set it here in the controller so it is available when
+   * composing the request. This is the ordained way of setting priorities going forward. We will
+   * be undoing the old annotation-based mechanism.
+   */
+  private int priority = PRIORITY_UNSET;
+
+  /**
+   * The cells are optionally set on construction, cleared after we make the call, and then
+   * optionally set on response with the result. We use this lowest common denominator access to
+   * Cells because sometimes the scanner is backed by a List of Cells and other times, it is
+   * backed by an encoded block that implements CellScanner.
+   */
+  private CellScanner cellScanner;
+
+  public HBaseRpcControllerImpl() {
+    this((CellScanner) null);
+  }
+
+  public HBaseRpcControllerImpl(final CellScanner cellScanner) {
+    this.cellScanner = cellScanner;
+  }
+
+  public HBaseRpcControllerImpl(final List<CellScannable> cellIterables) {
+    this.cellScanner = cellIterables == null ? null : CellUtil.createCellScanner(cellIterables);
+  }
+
+  /**
+   * @return One-shot cell scanner (you cannot back it up and restart)
+   */
+  @Override
+  public CellScanner cellScanner() {
+    return cellScanner;
+  }
+
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
+      justification = "The only possible race method is startCancel")
+  @Override
+  public void setCellScanner(final CellScanner cellScanner) {
+    this.cellScanner = cellScanner;
+  }
+
+  @Override
+  public void setPriority(int priority) {
+    this.priority = priority;
+  }
+
+  @Override
+  public void setPriority(final TableName tn) {
+    setPriority(
+      tn != null && tn.isSystemTable() ? HConstants.SYSTEMTABLE_QOS : HConstants.NORMAL_QOS);
+  }
+
+  @Override
+  public int getPriority() {
+    return priority;
+  }
+
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
+      justification = "The only possible race method is startCancel")
+  @Override
+  public void reset() {
+    priority = 0;
+    cellScanner = null;
+    exception = null;
+    callTimeout = null;
+    // In the implementations of some callables with replicas, rpc calls are executed in an
+    // executor and we could cancel the operation from outside, which means there could be a race
+    // between reset and startCancel. Although I think the race should be handled by the callable
+    // since the reset may clear the cancel state...
+    synchronized (this) {
+      done = false;
+      cancelled = false;
+      cancellationCbs.clear();
+    }
+  }
+
+  @Override
+  public int getCallTimeout() {
+    if (callTimeout != null) {
+      return callTimeout.intValue();
+    } else {
+      return 0;
+    }
+  }
+
+  @Override
+  public void setCallTimeout(int callTimeout) {
+    this.callTimeout = callTimeout;
+  }
+
+  @Override
+  public boolean hasCallTimeout() {
+    return callTimeout != null;
+  }
+
+  @Override
+  public synchronized String errorText() {
+    if (!done || exception == null) {
+      return null;
+    }
+    return exception.getMessage();
+  }
+
+  @Override
+  public synchronized boolean failed() {
+    return done && this.exception != null;
+  }
+
+  @Override
+  public synchronized boolean isCanceled() {
+    return cancelled;
+  }
+
+  @Override
+  public void notifyOnCancel(RpcCallback<Object> callback) {
+    synchronized (this) {
+      if (done) {
+        return;
+      }
+      if (!cancelled) {
+        cancellationCbs.add(callback);
+        return;
+      }
+    }
+    // run it directly as we have already been cancelled.
+    callback.run(null);
+  }
+
+  @Override
+  public synchronized void setFailed(String reason) {
+    if (done) {
+      return;
+    }
+    done = true;
+    exception = new IOException(reason);
+  }
+
+  @Override
+  public synchronized void setFailed(IOException e) {
+    if (done) {
+      return;
+    }
+    done = true;
+    exception = e;
+  }
+
+  @Override
+  public synchronized IOException getFailed() {
+    return done ? exception : null;
+  }
+
+  @Override
+  public synchronized void setDone(CellScanner cellScanner) {
+    if (done) {
+      return;
+    }
+    done = true;
+    this.cellScanner = cellScanner;
+  }
+
+  @Override
+  public void startCancel() {
+    // As noted in the comment on reset above, the cancellationCbs list may be cleared by reset,
+    // so we need to copy it first.
+    List<RpcCallback<Object>> cbs;
+    synchronized (this) {
+      if (done) {
+        return;
+      }
+      done = true;
+      cancelled = true;
+      cbs = new ArrayList<>(cancellationCbs);
+    }
+    for (RpcCallback<?> cb : cbs) {
+      cb.run(null);
+    }
+  }
+
+  @Override
+  public synchronized void notifyOnCancel(RpcCallback<Object> callback, CancellationCallback action)
+      throws IOException {
+    if (cancelled) {
+      action.run(true);
+    } else {
+      cancellationCbs.add(callback);
+      action.run(false);
+    }
+  }
+
+}
\ No newline at end of file
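
Putting the implementation through its basic paces might look like the sketch
below; the timeout and table are illustrative values, not anything this patch
prescribes:

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.ipc.HBaseRpcController;
    import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;

    public class ControllerUsage {
      public static void main(String[] args) throws Exception {
        HBaseRpcController controller = new HBaseRpcControllerImpl();
        controller.setCallTimeout(60000); // ms; hasCallTimeout() now returns true
        controller.setPriority(TableName.META_TABLE_NAME); // system table => SYSTEMTABLE_QOS
        // ... hand the controller to an rpc; the client calls setDone or setFailed ...
        if (controller.failed()) {
          throw controller.getFailed();
        }
      }
    }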