You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2015/08/19 17:49:42 UTC

hbase git commit: HBASE-14241 Fix deadlock during cluster shutdown due to concurrent connection close

Repository: hbase
Updated Branches:
  refs/heads/master 1bb9e3ae9 -> 16f8d2770


HBASE-14241 Fix deadlock during cluster shutdown due to concurrent connection close


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/16f8d277
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/16f8d277
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/16f8d277

Branch: refs/heads/master
Commit: 16f8d277088987dc8d4bb1614d05ce712bfc245d
Parents: 1bb9e3a
Author: tedyu <yu...@gmail.com>
Authored: Wed Aug 19 08:49:38 2015 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Wed Aug 19 08:49:38 2015 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hbase/ipc/RpcClientImpl.java  | 108 +++++++++++++------
 1 file changed, 75 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/16f8d277/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index fd3bab0..5ece8ae 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -152,6 +152,18 @@ public class RpcClientImpl extends AbstractRpcClient {
     }
   }
 
+  /*
+   * Return value of {@link #waitForWork()}, telling the run() method whether it should:
+   * read a response,
+   * close the connection itself, or
+   * take no action because the connection would be closed by others.
+   */
+  private enum WaitForWorkResult {
+    READ_RESPONSE,
+    CALLER_SHOULD_CLOSE,
+    CLOSED
+  }
+
   /** Thread that reads responses and notifies callers.  Each connection owns a
    * socket connected to a remote address.  Calls are multiplexed through this
    * socket: responses may be delivered out of order. */
@@ -243,12 +255,13 @@ public class RpcClientImpl extends AbstractRpcClient {
        */
       @Override
       public void run() {
+        boolean closeBySelf = false;
         while (!shouldCloseConnection.get()) {
           CallFuture cts = null;
           try {
             cts = callsToWrite.take();
           } catch (InterruptedException e) {
-            markClosed(new InterruptedIOException());
+            closeBySelf = markClosed(new InterruptedIOException());
           }
 
           if (cts == null || cts == CallFuture.DEATH_PILL) {
@@ -272,11 +285,14 @@ public class RpcClientImpl extends AbstractRpcClient {
                 + ", message =" + e.getMessage());
             }
             cts.call.setException(e);
-            markClosed(e);
+            closeBySelf = markClosed(e);
           }
         }
 
         cleanup();
+        if (closeBySelf) {
+          close();
+        }
       }
 
       /**
@@ -510,27 +526,28 @@ public class RpcClientImpl extends AbstractRpcClient {
      * it is idle too long, it is marked as to be closed,
      * or the client is marked as not running.
      *
-     * @return true if it is time to read a response; false otherwise.
+     * @return WaitForWorkResult indicating whether it is time to read a response,
+     * whether the caller should close the connection, or whether no action is needed
      */
-    protected synchronized boolean waitForWork() throws InterruptedException {
+    protected synchronized WaitForWorkResult waitForWork() throws InterruptedException {
       // beware of the concurrent access to the calls list: we can add calls, but as well
       //  remove them.
       long waitUntil = EnvironmentEdgeManager.currentTime() + minIdleTimeBeforeClose;
 
       while (true) {
         if (shouldCloseConnection.get()) {
-          return false;
+          return WaitForWorkResult.CLOSED;
         }
 
         if (!running.get()) {
-          markClosed(new IOException("stopped with " + calls.size() + " pending request(s)"));
-          return false;
+          if (markClosed(new IOException("stopped with " + calls.size() + " pending request(s)"))) {
+            return WaitForWorkResult.CALLER_SHOULD_CLOSE;
+          }
+          return WaitForWorkResult.CLOSED;
         }
 
         if (!calls.isEmpty()) {
-          // shouldCloseConnection can be set to true by a parallel thread here. The caller
-          //  will need to check anyway.
-          return true;
+          return WaitForWorkResult.READ_RESPONSE;
         }
 
         if (EnvironmentEdgeManager.currentTime() >= waitUntil) {
@@ -538,9 +555,11 @@ public class RpcClientImpl extends AbstractRpcClient {
           // We expect the number of calls to be zero here, but actually someone can
           //  adds a call at the any moment, as there is no synchronization between this task
           //  and adding new calls. It's not a big issue, but it will get an exception.
-          markClosed(new IOException(
-              "idle connection closed with " + calls.size() + " pending request(s)"));
-          return false;
+          if (markClosed(new IOException(
+              "idle connection closed with " + calls.size() + " pending request(s)"))) {
+            return WaitForWorkResult.CALLER_SHOULD_CLOSE;
+          }
+          return WaitForWorkResult.CLOSED;
         }
 
         wait(Math.min(minIdleTimeBeforeClose, 1000));
@@ -557,23 +576,37 @@ public class RpcClientImpl extends AbstractRpcClient {
         LOG.trace(getName() + ": starting, connections " + connections.size());
       }
 
+      WaitForWorkResult result = WaitForWorkResult.CALLER_SHOULD_CLOSE;
       try {
-        while (waitForWork()) { // Wait here for work - read or close connection
-          readResponse();
+        result = waitForWork(); // Wait here for work - read or close connection
+        while (result == WaitForWorkResult.READ_RESPONSE) {
+          if (readResponse()) {
+            // shouldCloseConnection is set to true by readResponse(). Close the connection
+            result = WaitForWorkResult.CALLER_SHOULD_CLOSE;
+          } else {
+            result = waitForWork();
+          }
         }
       } catch (InterruptedException t) {
         if (LOG.isTraceEnabled()) {
           LOG.trace(getName() + ": interrupted while waiting for call responses");
         }
-        markClosed(ExceptionUtil.asInterrupt(t));
+        if (markClosed(ExceptionUtil.asInterrupt(t))) {
+          // shouldCloseConnection is set to true. Close connection
+          result = WaitForWorkResult.CALLER_SHOULD_CLOSE;
+        }
       } catch (Throwable t) {
         if (LOG.isDebugEnabled()) {
           LOG.debug(getName() + ": unexpected throwable while waiting for call responses", t);
         }
-        markClosed(new IOException("Unexpected throwable while waiting call responses", t));
+        if (markClosed(new IOException("Unexpected throwable while waiting call responses", t))) {
+          // shouldCloseConnection is set to true. Close connection
+          result = WaitForWorkResult.CALLER_SHOULD_CLOSE;
+        }
+      }
+      if (result == WaitForWorkResult.CALLER_SHOULD_CLOSE) {
+        close();
       }
-
-      close();
 
       if (LOG.isTraceEnabled()) {
         LOG.trace(getName() + ": stopped, connections " + connections.size());
@@ -702,8 +735,9 @@ public class RpcClientImpl extends AbstractRpcClient {
         }
         IOException e = new FailedServerException(
             "This server is in the failed servers list: " + server);
-        markClosed(e);
-        close();
+        if (markClosed(e)) {
+          close();
+        }
         throw e;
       }
 
@@ -781,8 +815,9 @@ public class RpcClientImpl extends AbstractRpcClient {
             e = new IOException("Could not set up IO Streams to " + server, t);
           }
         }
-        markClosed(e);
-        close();
+        if (markClosed(e)) {
+          close();
+        }
         throw e;
       }
     }
@@ -922,9 +957,10 @@ public class RpcClientImpl extends AbstractRpcClient {
 
     /* Receive a response.
      * Because only one receiver, so no synchronization on in.
+     * @return true if connection should be closed by caller
      */
-    protected void readResponse() {
-      if (shouldCloseConnection.get()) return;
+    protected boolean readResponse() {
+      if (shouldCloseConnection.get()) return false;
       Call call = null;
       boolean expectedCall = false;
       try {
@@ -946,14 +982,14 @@ public class RpcClientImpl extends AbstractRpcClient {
           int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
           int whatIsLeftToRead = totalSize - readSoFar;
           IOUtils.skipFully(in, whatIsLeftToRead);
-          return;
+          return false;
         }
         if (responseHeader.hasException()) {
           ExceptionResponse exceptionResponse = responseHeader.getException();
           RemoteException re = createRemoteException(exceptionResponse);
           call.setException(re);
           if (isFatalConnectionException(exceptionResponse)) {
-            markClosed(re);
+            return markClosed(re);
           }
         } else {
           Message value = null;
@@ -980,11 +1016,12 @@ public class RpcClientImpl extends AbstractRpcClient {
           if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
         } else {
           // Treat this as a fatal condition and close this connection
-          markClosed(e);
+          return markClosed(e);
         }
       } finally {
         cleanupCalls(false);
       }
+      return false;
     }
 
     /**
@@ -1010,18 +1047,22 @@ public class RpcClientImpl extends AbstractRpcClient {
           e.getStackTrace(), doNotRetry);
     }
 
-    protected synchronized void markClosed(IOException e) {
+    /*
+     * @return true if shouldCloseConnection is set to true by this thread; false otherwise
+     */
+    protected boolean markClosed(IOException e) {
       if (e == null) throw new NullPointerException();
 
-      if (shouldCloseConnection.compareAndSet(false, true)) {
+      boolean ret = shouldCloseConnection.compareAndSet(false, true);
+      if (ret) {
         if (LOG.isTraceEnabled()) {
           LOG.trace(getName() + ": marking at should close, reason: " + e.getMessage());
         }
         if (callSender != null) {
           callSender.close();
         }
-        notifyAll();
       }
+      return ret;
     }
 
 
@@ -1120,8 +1161,9 @@ public class RpcClientImpl extends AbstractRpcClient {
         // In case the CallSender did not setupIOStreams() yet, the Connection may not be started
         // at all (if CallSender has a cancelled Call it can happen). See HBASE-13851
         if (!conn.isAlive()) {
-          conn.markClosed(new InterruptedIOException("RpcClient is closing"));
-          conn.close();
+          if (conn.markClosed(new InterruptedIOException("RpcClient is closing"))) {
+            conn.close();
+          }
         }
       }
     }