Posted to commits@hbase.apache.org by st...@apache.org on 2015/02/16 18:04:47 UTC

hbase git commit: HBASE-13011 TestLoadIncrementalHFiles is flakey when using AsyncRpcClient as client implementation Added comment to AsyncRpcChannel data members

Repository: hbase
Updated Branches:
  refs/heads/master 6d72a993e -> e99091e97


HBASE-13011 TestLoadIncrementalHFiles is flakey when using AsyncRpcClient as client implementation
Added comment to AsyncRpcChannel data members

Signed-off-by: stack <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e99091e9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e99091e9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e99091e9

Branch: refs/heads/master
Commit: e99091e97c374a026ba690cf9d6a4592f3c3fbdd
Parents: 6d72a99
Author: zhangduo <zh...@wandoujia.com>
Authored: Sat Feb 14 08:36:38 2015 +0800
Committer: stack <st...@apache.org>
Committed: Mon Feb 16 09:03:31 2015 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/ipc/AsyncCall.java  |   9 +-
 .../hadoop/hbase/ipc/AsyncRpcChannel.java       | 300 ++++++++-----------
 .../apache/hadoop/hbase/ipc/AsyncRpcClient.java |  59 ++--
 .../hbase/ipc/AsyncServerResponseHandler.java   |  16 +-
 4 files changed, 180 insertions(+), 204 deletions(-)
----------------------------------------------------------------------

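At a glance, the fix replaces AsyncRpcChannel's lock-free ConcurrentSkipListMap of in-flight calls and its volatile shouldCloseConnection flag with a plain HashMap of pending calls guarded by a single lock; the connected and closed flags are only changed under that lock, so connect, send, timeout and close can no longer race. A rough, hypothetical sketch of that pattern follows (class and method names are illustrative, not taken from the patch):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the guarded pending-call bookkeeping the diff below introduces.
final class PendingCallRegistry<C> {
  private final Map<Integer, C> pendingCalls = new HashMap<Integer, C>();
  private boolean closed = false;      // only toggled while synchronized on pendingCalls
  private boolean connected = false;   // ditto

  /** Registers a call; returns false if the channel is already closed. */
  boolean register(int id, C call) {
    synchronized (pendingCalls) {
      if (closed) {
        return false;
      }
      pendingCalls.put(id, call);
      return true;
    }
  }

  /** Marks the channel connected and returns the calls queued before the handshake finished. */
  List<C> markConnected() {
    synchronized (pendingCalls) {
      connected = true;
      return new ArrayList<C>(pendingCalls.values());
    }
  }

  /** Marks the channel closed and drains the calls that still have to be failed. */
  List<C> closeAndDrain() {
    synchronized (pendingCalls) {
      if (closed) {
        return new ArrayList<C>();
      }
      closed = true;
      List<C> remaining = new ArrayList<C>(pendingCalls.values());
      pendingCalls.clear();
      return remaining;
    }
  }
}

Failing the drained calls then happens outside the lock, just as close() does in the diff below, so call completion callbacks never run while the lock is held.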

http://git-wip-us.apache.org/repos/asf/hbase/blob/e99091e9/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
index c35238c..68a494d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
@@ -72,7 +72,7 @@ public class AsyncCall extends DefaultPromise<Message> {
     this.responseDefaultType = responseDefaultType;
 
     this.startTime = EnvironmentEdgeManager.currentTime();
-    this.rpcTimeout = controller.getCallTimeout();
+    this.rpcTimeout = controller.hasCallTimeout() ? controller.getCallTimeout() : 0;
   }
 
   /**
@@ -84,9 +84,10 @@ public class AsyncCall extends DefaultPromise<Message> {
     return this.startTime;
   }
 
-  @Override public String toString() {
-    return "callId: " + this.id + " methodName: " + this.method.getName() + " param {" +
-        (this.param != null ? ProtobufUtil.getShortTextFormat(this.param) : "") + "}";
+  @Override
+  public String toString() {
+    return "callId: " + this.id + " methodName: " + this.method.getName() + " param {"
+        + (this.param != null ? ProtobufUtil.getShortTextFormat(this.param) : "") + "}";
   }
 
   /**

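In AsyncCall the rpc timeout is now read from the controller only when one was actually set, and otherwise stored as 0; the rest of the patch treats 0 as "no deadline" (no cleanup timer is armed and the blocking get is unbounded). A minimal, hypothetical helper showing how such a value would be interpreted:

// Illustrative only; helper name is made up. 0 (or less) means "no deadline".
final class Timeouts {
  /** @return milliseconds left before the deadline, or Long.MAX_VALUE if rpcTimeout is 0. */
  static long remainingMillis(long startTime, long rpcTimeout, long now) {
    if (rpcTimeout <= 0) {
      return Long.MAX_VALUE;
    }
    return Math.max(0L, startTime + rpcTimeout - now);
  }
}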
http://git-wip-us.apache.org/repos/asf/hbase/blob/e99091e9/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
index 054c9b5..ffb2dcf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
@@ -17,9 +17,6 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufOutputStream;
@@ -31,6 +28,23 @@ import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
 import io.netty.util.concurrent.GenericFutureListener;
 import io.netty.util.concurrent.Promise;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import javax.security.sasl.SaslException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HConstants;
@@ -56,18 +70,9 @@ import org.apache.hadoop.security.token.TokenSelector;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
 
-import javax.security.sasl.SaslException;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.SocketException;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.TimeUnit;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
 
 /**
  * Netty RPC channel
@@ -97,8 +102,6 @@ public class AsyncRpcChannel {
   final String serviceName;
   final InetSocketAddress address;
 
-  ConcurrentSkipListMap<Integer, AsyncCall> calls = new ConcurrentSkipListMap<>();
-
   private int ioFailureCounter = 0;
   private int connectFailureCounter = 0;
 
@@ -108,15 +111,18 @@ public class AsyncRpcChannel {
   private Token<? extends TokenIdentifier> token;
   private String serverPrincipal;
 
-  volatile boolean shouldCloseConnection = false;
-  private IOException closeException;
+
+  // NOTE: closed and connected flags below are only changed while holding a lock on pendingCalls
+  private final Map<Integer, AsyncCall> pendingCalls = new HashMap<Integer, AsyncCall>();
+  private boolean connected = false;
+  private boolean closed = false;
 
   private Timeout cleanupTimer;
 
   private final TimerTask timeoutTask = new TimerTask() {
-    @Override public void run(Timeout timeout) throws Exception {
-      cleanupTimer = null;
-      cleanupCalls(false);
+    @Override
+    public void run(Timeout timeout) throws Exception {
+      cleanupCalls();
     }
   };
 
@@ -213,15 +219,20 @@ public class AsyncRpcChannel {
     ch.pipeline()
         .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
     ch.pipeline().addLast(new AsyncServerResponseHandler(this));
-
     try {
       writeChannelHeader(ch).addListener(new GenericFutureListener<ChannelFuture>() {
-        @Override public void operationComplete(ChannelFuture future) throws Exception {
+        @Override
+        public void operationComplete(ChannelFuture future) throws Exception {
           if (!future.isSuccess()) {
             close(future.cause());
             return;
           }
-          for (AsyncCall call : calls.values()) {
+          List<AsyncCall> callsToWrite;
+          synchronized (pendingCalls) {
+            connected = true;
+            callsToWrite = new ArrayList<AsyncCall>(pendingCalls.values());
+          }
+          for (AsyncCall call : callsToWrite) {
             writeRequest(call);
           }
         }
@@ -240,17 +251,18 @@ public class AsyncRpcChannel {
    */
   private SaslClientHandler getSaslHandler(final Bootstrap bootstrap) throws IOException {
     return new SaslClientHandler(authMethod, token, serverPrincipal, client.fallbackAllowed,
-        client.conf.get("hbase.rpc.protection",
-            SaslUtil.QualityOfProtection.AUTHENTICATION.name().toLowerCase()),
-        new SaslClientHandler.SaslExceptionHandler() {
-          @Override public void handle(int retryCount, Random random, Throwable cause) {
+        client.conf.get("hbase.rpc.protection", SaslUtil.QualityOfProtection.AUTHENTICATION.name()
+            .toLowerCase()), new SaslClientHandler.SaslExceptionHandler() {
+          @Override
+          public void handle(int retryCount, Random random, Throwable cause) {
             try {
               // Handle Sasl failure. Try to potentially get new credentials
               handleSaslConnectionFailure(retryCount, cause, ticket.getUGI());
 
               // Try to reconnect
               AsyncRpcClient.WHEEL_TIMER.newTimeout(new TimerTask() {
-                @Override public void run(Timeout timeout) throws Exception {
+                @Override
+                public void run(Timeout timeout) throws Exception {
                   connect(bootstrap);
                 }
               }, random.nextInt(reloginMaxBackoff) + 1, TimeUnit.MILLISECONDS);
@@ -259,10 +271,11 @@ public class AsyncRpcChannel {
             }
           }
         }, new SaslClientHandler.SaslSuccessfulConnectHandler() {
-      @Override public void onSuccess(Channel channel) {
-        startHBaseConnection(channel);
-      }
-    });
+          @Override
+          public void onSuccess(Channel channel) {
+            startHBaseConnection(channel);
+          }
+        });
   }
 
   /**
@@ -295,66 +308,50 @@ public class AsyncRpcChannel {
   public Promise<Message> callMethod(final Descriptors.MethodDescriptor method,
       final PayloadCarryingRpcController controller, final Message request,
       final Message responsePrototype) {
-    if (shouldCloseConnection) {
-      Promise<Message> promise = channel.eventLoop().newPromise();
-      promise.setFailure(new ConnectException());
-      return promise;
-    }
-
-    final AsyncCall call = new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(),
-        method, request, controller, responsePrototype);
-
+    final AsyncCall call =
+        new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(), method, request,
+            controller, responsePrototype);
     controller.notifyOnCancel(new RpcCallback<Object>() {
       @Override
       public void run(Object parameter) {
-        calls.remove(call.id);
+        // TODO: do not need to call AsyncCall.setFailed?
+        synchronized (pendingCalls) {
+          pendingCalls.remove(call.id);
+        }
       }
     });
+    // TODO: this should be handled by PayloadCarryingRpcController.
     if (controller.isCanceled()) {
       // To finish if the call was cancelled before we set the notification (race condition)
       call.cancel(true);
       return call;
     }
 
-    calls.put(call.id, call);
-
-    // check again, see https://issues.apache.org/jira/browse/HBASE-12951
-    if (shouldCloseConnection) {
-      Promise<Message> promise = channel.eventLoop().newPromise();
-      promise.setFailure(new ConnectException());
-      return promise;
-    }
-
-    // Add timeout for cleanup if none is present
-    if (cleanupTimer == null) {
-      cleanupTimer = AsyncRpcClient.WHEEL_TIMER.newTimeout(timeoutTask, call.getRpcTimeout(),
-          TimeUnit.MILLISECONDS);
-    }
-
-    if(channel.isActive()) {
-      writeRequest(call);
+    synchronized (pendingCalls) {
+      if (closed) {
+        Promise<Message> promise = channel.eventLoop().newPromise();
+        promise.setFailure(new ConnectException());
+        return promise;
+      }
+      pendingCalls.put(call.id, call);
+      // Add timeout for cleanup if none is present
+      if (cleanupTimer == null && call.getRpcTimeout() > 0) {
+        cleanupTimer =
+            AsyncRpcClient.WHEEL_TIMER.newTimeout(timeoutTask, call.getRpcTimeout(),
+              TimeUnit.MILLISECONDS);
+      }
+      if (!connected) {
+        return call;
+      }
     }
-
+    writeRequest(call);
     return call;
   }
 
-  /**
-   * Calls method and returns a promise
-   * @param method to call
-   * @param controller to run call with
-   * @param request to send
-   * @param responsePrototype for response message
-   * @return Promise to listen to result
-   * @throws java.net.ConnectException on connection failures
-   */
-  public Promise<Message> callMethodWithPromise(
-      final Descriptors.MethodDescriptor method, final PayloadCarryingRpcController controller,
-      final Message request, final Message responsePrototype) throws ConnectException {
-    if (shouldCloseConnection || !channel.isOpen()) {
-      throw new ConnectException();
+  AsyncCall removePendingCall(int id) {
+    synchronized (pendingCalls) {
+      return pendingCalls.remove(id);
     }
-
-    return this.callMethod(method, controller, request, responsePrototype);
   }
 
   /**
@@ -400,10 +397,6 @@ public class AsyncRpcChannel {
    */
   private void writeRequest(final AsyncCall call) {
     try {
-      if (shouldCloseConnection) {
-        return;
-      }
-
       final RPCProtos.RequestHeader.Builder requestHeaderBuilder = RPCProtos.RequestHeader
           .newBuilder();
       requestHeaderBuilder.setCallId(call.id)
@@ -439,26 +432,13 @@ public class AsyncRpcChannel {
         IPCUtil.write(out, rh, call.param, cellBlock);
       }
 
-      channel.writeAndFlush(b).addListener(new CallWriteListener(this,call));
+      channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id));
     } catch (IOException e) {
-      if (!shouldCloseConnection) {
-        close(e);
-      }
+      close(e);
     }
   }
 
   /**
-   * Fail a call
-   *
-   * @param call  to fail
-   * @param cause of fail
-   */
-  void failCall(AsyncCall call, IOException cause) {
-    calls.remove(call.id);
-    call.setFailed(cause);
-  }
-
-  /**
    * Set up server authorization
    *
    * @throws java.io.IOException if auth setup failed
@@ -550,18 +530,22 @@ public class AsyncRpcChannel {
    * @param e exception on close
    */
   public void close(final Throwable e) {
-    client.removeConnection(ConnectionId.hashCode(ticket,serviceName,address));
+    client.removeConnection(this);
 
     // Move closing from the requesting thread to the channel thread
     channel.eventLoop().execute(new Runnable() {
       @Override
       public void run() {
-        if (shouldCloseConnection) {
-          return;
+        List<AsyncCall> toCleanup;
+        synchronized (pendingCalls) {
+          if (closed) {
+            return;
+          }
+          closed = true;
+          toCleanup = new ArrayList<AsyncCall>(pendingCalls.values());
+          pendingCalls.clear();
         }
-
-        shouldCloseConnection = true;
-
+        IOException closeException = null;
         if (e != null) {
           if (e instanceof IOException) {
             closeException = (IOException) e;
@@ -569,16 +553,19 @@ public class AsyncRpcChannel {
             closeException = new IOException(e);
           }
         }
-
         // log the info
         if (LOG.isDebugEnabled() && closeException != null) {
-          LOG.debug(name + ": closing ipc connection to " + address + ": " +
-              closeException.getMessage());
+          LOG.debug(name + ": closing ipc connection to " + address, closeException);
+        }
+        if (cleanupTimer != null) {
+          cleanupTimer.cancel();
+          cleanupTimer = null;
+        }
+        for (AsyncCall call : toCleanup) {
+          call.setFailed(closeException != null ? closeException : new ConnectionClosingException(
+              "Call id=" + call.id + " on server " + address + " aborted: connection is closing"));
         }
-
-        cleanupCalls(true);
         channel.disconnect().addListener(ChannelFutureListener.CLOSE);
-
         if (LOG.isDebugEnabled()) {
           LOG.debug(name + ": closed");
         }
@@ -591,64 +578,37 @@ public class AsyncRpcChannel {
    *
    * @param cleanAll true if all calls should be cleaned, false for only the timed out calls
    */
-  public void cleanupCalls(boolean cleanAll) {
-    // Cancel outstanding timers
-    if (cleanupTimer != null) {
-      cleanupTimer.cancel();
-      cleanupTimer = null;
-    }
-
-    if (cleanAll) {
-      for (AsyncCall call : calls.values()) {
-        synchronized (call) {
-          // Calls can be done on another thread so check before failing them
-          if(!call.isDone()) {
-            if (closeException == null) {
-              failCall(call, new ConnectionClosingException("Call id=" + call.id +
-                  " on server " + address + " aborted: connection is closing"));
-            } else {
-              failCall(call, closeException);
-            }
-          }
-        }
-      }
-    } else {
-      for (AsyncCall call : calls.values()) {
-        long waitTime = EnvironmentEdgeManager.currentTime() - call.getStartTime();
+  private void cleanupCalls() {
+    List<AsyncCall> toCleanup = new ArrayList<AsyncCall>();
+    long currentTime = EnvironmentEdgeManager.currentTime();
+    long nextCleanupTaskDelay = -1L;
+    synchronized (pendingCalls) {
+      for (Iterator<AsyncCall> iter = pendingCalls.values().iterator(); iter.hasNext();) {
+        AsyncCall call = iter.next();
         long timeout = call.getRpcTimeout();
-        if (timeout > 0 && waitTime >= timeout) {
-          synchronized (call) {
-            // Calls can be done on another thread so check before failing them
-            if (!call.isDone()) {
-              closeException = new CallTimeoutException("Call id=" + call.id +
-                  ", waitTime=" + waitTime + ", rpcTimeout=" + timeout);
-              failCall(call, closeException);
+        if (timeout > 0) {
+          if (currentTime - call.getStartTime() >= timeout) {
+            iter.remove();
+            toCleanup.add(call);
+          } else {
+            if (nextCleanupTaskDelay < 0 || timeout < nextCleanupTaskDelay) {
+              nextCleanupTaskDelay = timeout;
             }
           }
-        } else {
-          // We expect the call to be ordered by timeout. It may not be the case, but stopping
-          //  at the first valid call allows to be sure that we still have something to do without
-          //  spending too much time by reading the full list.
-          break;
         }
       }
-
-      if (!calls.isEmpty()) {
-        AsyncCall firstCall = calls.firstEntry().getValue();
-
-        final long newTimeout;
-        long maxWaitTime = EnvironmentEdgeManager.currentTime() - firstCall.getStartTime();
-        if (maxWaitTime < firstCall.getRpcTimeout()) {
-          newTimeout = firstCall.getRpcTimeout() - maxWaitTime;
-        } else {
-          newTimeout = 0;
-        }
-
-        closeException = null;
-        cleanupTimer = AsyncRpcClient.WHEEL_TIMER.newTimeout(timeoutTask,
-            newTimeout, TimeUnit.MILLISECONDS);
+      if (nextCleanupTaskDelay > 0) {
+        cleanupTimer =
+            AsyncRpcClient.WHEEL_TIMER.newTimeout(timeoutTask, nextCleanupTaskDelay,
+              TimeUnit.MILLISECONDS);
+      } else {
+        cleanupTimer = null;
       }
     }
+    for (AsyncCall call : toCleanup) {
+      call.setFailed(new CallTimeoutException("Call id=" + call.id + ", waitTime="
+          + (currentTime - call.getStartTime()) + ", rpcTimeout=" + call.getRpcTimeout()));
+    }
   }
 
   /**
@@ -745,6 +705,10 @@ public class AsyncRpcChannel {
     });
   }
 
+  public int getConnectionHashCode() {
+    return ConnectionId.hashCode(ticket, serviceName, address);
+  }
+
   @Override
   public String toString() {
     return this.address.toString() + "/" + this.serviceName + "/" + this.ticket;
@@ -755,20 +719,22 @@ public class AsyncRpcChannel {
    */
   private static final class CallWriteListener implements ChannelFutureListener {
     private final AsyncRpcChannel rpcChannel;
-    private final AsyncCall call;
+    private final int id;
 
-    public CallWriteListener(AsyncRpcChannel asyncRpcChannel, AsyncCall call) {
+    public CallWriteListener(AsyncRpcChannel asyncRpcChannel, int id) {
       this.rpcChannel = asyncRpcChannel;
-      this.call = call;
+      this.id = id;
     }
 
-    @Override public void operationComplete(ChannelFuture future) throws Exception {
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
       if (!future.isSuccess()) {
-        if(!this.call.isDone()) {
+        AsyncCall call = rpcChannel.removePendingCall(id);
+        if (call != null) {
           if (future.cause() instanceof IOException) {
-            rpcChannel.failCall(call, (IOException) future.cause());
+            call.setFailed((IOException) future.cause());
           } else {
-            rpcChannel.failCall(call, new IOException(future.cause()));
+            call.setFailed(new IOException(future.cause()));
           }
         }
       }

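cleanupCalls() above is now a single timer-driven sweep: expired calls are collected while holding the pendingCalls lock, the timer is re-armed for the smallest timeout still outstanding, and the expired calls are failed after the lock is released. A self-contained sketch of that sweep under those assumptions (names are hypothetical; like the patch, the timer is re-armed with a call's configured timeout rather than its remaining time):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the timeout sweep: one pass over the pending calls removes the
// expired ones and computes the delay for the next sweep.
final class TimeoutSweep {
  static final class Pending {
    final long startTime;   // when the call was sent
    final long rpcTimeout;  // 0 means "no timeout"
    Pending(long startTime, long rpcTimeout) {
      this.startTime = startTime;
      this.rpcTimeout = rpcTimeout;
    }
  }

  /**
   * Removes expired entries from 'pending' and returns their ids. nextDelay[0] is set to
   * the delay for the next sweep, or -1 if no timed call remains.
   */
  static List<Integer> sweep(Map<Integer, Pending> pending, long now, long[] nextDelay) {
    List<Integer> expired = new ArrayList<Integer>();
    nextDelay[0] = -1L;
    for (Iterator<Map.Entry<Integer, Pending>> it = pending.entrySet().iterator(); it.hasNext();) {
      Map.Entry<Integer, Pending> e = it.next();
      Pending p = e.getValue();
      if (p.rpcTimeout <= 0) {
        continue;                       // calls without a timeout never expire here
      }
      if (now - p.startTime >= p.rpcTimeout) {
        it.remove();                    // expired: hand back to the caller to be failed
        expired.add(e.getKey());
      } else if (nextDelay[0] < 0 || p.rpcTimeout < nextDelay[0]) {
        nextDelay[0] = p.rpcTimeout;    // mirror the patch: re-arm with the full timeout
      }
    }
    return expired;
  }
}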
http://git-wip-us.apache.org/repos/asf/hbase/blob/e99091e9/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
index 30b622a..192e583 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
@@ -17,12 +17,6 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcChannel;
-import com.google.protobuf.RpcController;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
@@ -38,6 +32,16 @@ import io.netty.util.HashedWheelTimer;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 import io.netty.util.concurrent.Promise;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HConstants;
@@ -49,13 +53,12 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PoolMap;
 import org.apache.hadoop.hbase.util.Threads;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcChannel;
+import com.google.protobuf.RpcController;
 
 /**
  * Netty client for the requests and responses
@@ -169,16 +172,16 @@ public class AsyncRpcClient extends AbstractRpcClient {
    * @throws InterruptedException if call is interrupted
    * @throws java.io.IOException  if a connection failure is encountered
    */
-  @Override protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
+  @Override
+  protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
       Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
       InetSocketAddress addr) throws IOException, InterruptedException {
-
     final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket);
 
-    Promise<Message> promise = connection.callMethodWithPromise(md, pcrc, param, returnType);
-
+    Promise<Message> promise = connection.callMethod(md, pcrc, param, returnType);
+    long timeout = pcrc.hasCallTimeout() ? pcrc.getCallTimeout() : 0;
     try {
-      Message response = promise.get();
+      Message response = timeout > 0 ? promise.get(timeout, TimeUnit.MILLISECONDS) : promise.get();
       return new Pair<>(response, pcrc.cellScanner());
     } catch (ExecutionException e) {
       if (e.getCause() instanceof IOException) {
@@ -186,6 +189,8 @@ public class AsyncRpcClient extends AbstractRpcClient {
       } else {
         throw new IOException(e.getCause());
       }
+    } catch (TimeoutException e) {
+      throw new CallTimeoutException(promise.toString());
     }
   }
 
@@ -337,12 +342,20 @@ public class AsyncRpcClient extends AbstractRpcClient {
 
   /**
    * Remove connection from pool
-   *
-   * @param connectionHashCode of connection
    */
-  public void removeConnection(int connectionHashCode) {
+  public void removeConnection(AsyncRpcChannel connection) {
+    int connectionHashCode = connection.getConnectionHashCode();
     synchronized (connections) {
-      this.connections.remove(connectionHashCode);
+      // we use address as cache key, so we should check here to prevent removing the
+      // wrong connection
+      AsyncRpcChannel connectionInPool = this.connections.get(connectionHashCode);
+      if (connectionInPool == connection) {
+        this.connections.remove(connectionHashCode);
+      } else if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("%s already removed, expected instance %08x, actual %08x",
+          connection.toString(), System.identityHashCode(connection),
+          System.identityHashCode(connectionInPool)));
+      }
     }
   }
 
@@ -399,4 +412,4 @@ public class AsyncRpcClient extends AbstractRpcClient {
       this.rpcClient.callMethod(md, pcrc, param, returnType, this.ticket, this.isa, done);
     }
   }
-}
\ No newline at end of file
+}

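In AsyncRpcClient the blocking call() now waits on the promise with the controller's timeout when one is set (and waits unbounded otherwise), turning a TimeoutException into CallTimeoutException, and removeConnection() only evicts a pooled channel when it is the exact instance being closed. A hedged sketch of the waiting logic (the helper is made up for illustration; a plain IOException stands in for HBase's CallTimeoutException to keep it self-contained):

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative helper, not from the patch: bounded wait on a call's promise, with the
// same unwrapping of execution failures the new call() performs.
final class BlockingAwait {
  static <T> T awaitResponse(Future<T> promise, long timeoutMs)
      throws IOException, InterruptedException {
    try {
      return timeoutMs > 0 ? promise.get(timeoutMs, TimeUnit.MILLISECONDS) : promise.get();
    } catch (ExecutionException e) {
      // unwrap the real failure if it is already an IOException
      throw e.getCause() instanceof IOException
          ? (IOException) e.getCause()
          : new IOException(e.getCause());
    } catch (TimeoutException e) {
      throw new IOException("call did not complete within " + timeoutMs + " ms");
    }
  }
}

Bounding the wait is what keeps a lost response from hanging the caller forever, which is the failure mode behind the flaky TestLoadIncrementalHFiles runs.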
http://git-wip-us.apache.org/repos/asf/hbase/blob/e99091e9/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
index d71bf5e..a900140 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
@@ -17,11 +17,13 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import com.google.protobuf.Message;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+
+import java.io.IOException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.CellScanner;
@@ -29,7 +31,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
 import org.apache.hadoop.ipc.RemoteException;
 
-import java.io.IOException;
+import com.google.protobuf.Message;
 
 /**
  * Handles Hbase responses
@@ -52,16 +54,12 @@ public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter {
   @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
     ByteBuf inBuffer = (ByteBuf) msg;
     ByteBufInputStream in = new ByteBufInputStream(inBuffer);
-
-    if (channel.shouldCloseConnection) {
-      return;
-    }
     int totalSize = inBuffer.readableBytes();
     try {
       // Read the header
       RPCProtos.ResponseHeader responseHeader = RPCProtos.ResponseHeader.parseDelimitedFrom(in);
       int id = responseHeader.getCallId();
-      AsyncCall call = channel.calls.get(id);
+      AsyncCall call = channel.removePendingCall(id);
       if (call == null) {
         // So we got a response for which we have no corresponding 'call' here on the client-side.
         // We probably timed out waiting, cleaned up all references, and now the server decides
@@ -85,7 +83,7 @@ public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter {
             equals(FatalConnectionException.class.getName())) {
           channel.close(re);
         } else {
-          channel.failCall(call, re);
+          call.setFailed(re);
         }
       } else {
         Message value = null;
@@ -104,13 +102,11 @@ public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter {
         }
         call.setSuccess(value, cellBlockScanner);
       }
-      channel.calls.remove(id);
     } catch (IOException e) {
       // Treat this as a fatal condition and close this connection
       channel.close(e);
     } finally {
       inBuffer.release();
-      channel.cleanupCalls(false);
     }
   }