You are viewing a plain-text version of this content. Its canonical link was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@hbase.apache.org by nk...@apache.org on 2014/03/25 17:48:29 UTC

svn commit: r1581414 - /hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java

Author: nkeywal
Date: Tue Mar 25 16:48:29 2014
New Revision: 1581414

URL: http://svn.apache.org/r1581414
Log:
HBASE-10814 RpcClient: some calls can get stuck when connection is closing

Modified:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java?rev=1581414&r1=1581413&r2=1581414&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java Tue Mar 25 16:48:29 2014
@@ -270,7 +270,7 @@ public class RpcClient {
      * Check if the call did timeout. Set an exception (includes a notify) if it's the case.
      * @return true if the call is on timeout, false otherwise.
      */
-    public boolean checkTimeout() {
+    public boolean checkAndSetTimeout() {
       if (timeout == 0){
         return false;
       }
@@ -358,9 +358,9 @@ public class RpcClient {
    * see {@link org.apache.hadoop.hbase.ipc.RpcClient.Connection.CallSender}
    */
   private static class CallFuture {
-    Call call;
-    int priority;
-    Span span;
+    final Call call;
+    final int priority;
+    final Span span;
 
     // We will use this to stop the writer
     final static CallFuture DEATH_PILL = new CallFuture(null, -1, null);
@@ -472,7 +472,7 @@ public class RpcClient {
             continue;
           }
 
-          if (cts.call.checkTimeout()) {
+          if (cts.call.checkAndSetTimeout()) {
             continue;
           }
 
@@ -1118,11 +1118,12 @@ public class RpcClient {
      */
     protected void readResponse() {
       if (shouldCloseConnection.get()) return;
-      int totalSize;
+      Call call = null;
+      boolean expectedCall = false;
       try {
         // See HBaseServer.Call.setResponse for where we write out the response.
         // Total size of the response.  Unused.  But have to read it in anyways.
-        totalSize = in.readInt();
+        int totalSize = in.readInt();
 
         // Read the header
         ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);
@@ -1131,8 +1132,8 @@ public class RpcClient {
           LOG.debug(getName() + ": got response header " +
             TextFormat.shortDebugString(responseHeader) + ", totalSize: " + totalSize + " bytes");
         }
-        Call call = calls.remove(id);
-        boolean expectedCall = (call != null && !call.done);
+        call = calls.remove(id); // call.done have to be set before leaving this method
+        expectedCall = (call != null && !call.done);
         if (!expectedCall) {
           // So we got a response for which we have no corresponding 'call' here on the client-side.
           // We probably timed out waiting, cleaned up all references, and now the server decides
@@ -1148,14 +1149,13 @@ public class RpcClient {
         if (responseHeader.hasException()) {
           ExceptionResponse exceptionResponse = responseHeader.getException();
           RemoteException re = createRemoteException(exceptionResponse);
+          if (expectedCall) call.setException(re);
           if (isFatalConnectionException(exceptionResponse)) {
             markClosed(re);
-          } else {
-            if (expectedCall) call.setException(re);
           }
         } else {
           Message value = null;
-          // Call may be null because it may have timedout and been cleaned up on this side already
+          // Call may be null because it may have timeout and been cleaned up on this side already
           if (expectedCall && call.responseDefaultType != null) {
             Builder builder = call.responseDefaultType.newBuilderForType();
             builder.mergeDelimitedFrom(in);
@@ -1173,6 +1173,7 @@ public class RpcClient {
           if (expectedCall) call.setResponse(value, cellBlockScanner);
         }
       } catch (IOException e) {
+        if (expectedCall) call.setException(e);
         if (e instanceof SocketTimeoutException) {
           // Clean up open calls but don't treat this as a fatal condition,
           // since we expect certain responses to not make it by the specified
@@ -1183,6 +1184,11 @@ public class RpcClient {
         }
       } finally {
         cleanupCalls(false);
+        if (expectedCall && !call.done) {
+          LOG.warn("Coding error: code should be true for callId=" + call.id +
+              ", server=" + getRemoteAddress() +
+              ", shouldCloseConnection=" + shouldCloseConnection.get());
+        }
       }
     }
 
@@ -1227,7 +1233,7 @@ public class RpcClient {
 
     /**
      * Cleanup the calls older than a given timeout, in milli seconds.
-     * @param allCalls for all calls,
+     * @param allCalls true for all calls, false for only the calls in timeout
      */
     protected synchronized void cleanupCalls(boolean allCalls) {
       Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
@@ -1238,10 +1244,11 @@ public class RpcClient {
           itor.remove();
         } else if (allCalls) {
           long waitTime = EnvironmentEdgeManager.currentTimeMillis() - c.getStartTime();
-          IOException ie = new IOException("Call id=" + c.id + ", waitTime=" + waitTime);
+          IOException ie = new IOException("Connection to " + getRemoteAddress()
+              + " is closing. Call id=" + c.id + ", waitTime=" + waitTime);
           c.setException(ie);
           itor.remove();
-        } else if (c.checkTimeout()) {
+        } else if (c.checkAndSetTimeout()) {
           itor.remove();
         } else {
           // We expect the call to be ordered by timeout. It may not be the case, but stopping
@@ -1468,10 +1475,14 @@ public class RpcClient {
     }
 
     while (!call.done) {
-      if (call.checkTimeout()) {
+      if (call.checkAndSetTimeout()) {
         if (cts != null) connection.callSender.remove(cts);
         break;
       }
+      if (connection.shouldCloseConnection.get()) {
+        throw new IOException("Call id=" + call.id + " on server "
+            + addr + " aborted: connection is closing");
+      }
       try {
         synchronized (call) {
           call.wait(Math.min(call.remainingTime(), 1000) + 1);