You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2015/09/18 23:40:47 UTC

hbase git commit: HBASE-14449 Rewrite deadlock prevention for concurrent connection close

Repository: hbase
Updated Branches:
  refs/heads/master 8cdf4a8e0 -> b0f523326


HBASE-14449 Rewrite deadlock prevention for concurrent connection close


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b0f52332
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b0f52332
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b0f52332

Branch: refs/heads/master
Commit: b0f52332651ecbb8af11557df5af3189c7283212
Parents: 8cdf4a8
Author: tedyu <yu...@gmail.com>
Authored: Fri Sep 18 14:40:39 2015 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Fri Sep 18 14:40:39 2015 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hbase/ipc/RpcClientImpl.java  | 123 +++++++------------
 1 file changed, 45 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/b0f52332/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index f4dd701..b09674c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -88,10 +88,12 @@ import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -152,18 +154,6 @@ public class RpcClientImpl extends AbstractRpcClient {
     }
   }
 
-  /*
-   * This is the return value from {@link #waitForWork()} indicating whether run() method should:
-   * read response
-   * close the connection
-   * take no action - connection would be closed by others
-   */
-  private enum WaitForWorkResult {
-    READ_RESPONSE,
-    CALLER_SHOULD_CLOSE,
-    CLOSED
-  }
-
   /** Thread that reads responses and notifies callers.  Each connection owns a
    * socket connected to a remote address.  Calls are multiplexed through this
    * socket: responses may be delivered out of order. */
@@ -255,13 +245,12 @@ public class RpcClientImpl extends AbstractRpcClient {
        */
       @Override
       public void run() {
-        boolean closeBySelf = false;
         while (!shouldCloseConnection.get()) {
           CallFuture cts = null;
           try {
             cts = callsToWrite.take();
           } catch (InterruptedException e) {
-            closeBySelf = markClosed(new InterruptedIOException());
+            markClosed(new InterruptedIOException());
           }
 
           if (cts == null || cts == CallFuture.DEATH_PILL) {
@@ -285,14 +274,11 @@ public class RpcClientImpl extends AbstractRpcClient {
                 + ", message =" + e.getMessage());
             }
             cts.call.setException(e);
-            closeBySelf = markClosed(e);
+            markClosed(e);
           }
         }
 
         cleanup();
-        if (closeBySelf) {
-          close();
-        }
       }
 
       /**
@@ -526,28 +512,27 @@ public class RpcClientImpl extends AbstractRpcClient {
      * it is idle too long, it is marked as to be closed,
      * or the client is marked as not running.
      *
-     * @return WaitForWorkResult indicating whether it is time to read response;
-     * if the caller should close; or otherwise
+     * @return true if it is time to read a response; false otherwise.
      */
-    protected synchronized WaitForWorkResult waitForWork() throws InterruptedException {
+    protected synchronized boolean waitForWork() throws InterruptedException {
       // beware of the concurrent access to the calls list: we can add calls, but as well
       //  remove them.
       long waitUntil = EnvironmentEdgeManager.currentTime() + minIdleTimeBeforeClose;
 
       while (true) {
         if (shouldCloseConnection.get()) {
-          return WaitForWorkResult.CLOSED;
+          return false;
         }
 
         if (!running.get()) {
-          if (markClosed(new IOException("stopped with " + calls.size() + " pending request(s)"))) {
-            return WaitForWorkResult.CALLER_SHOULD_CLOSE;
-          }
-          return WaitForWorkResult.CLOSED;
+          markClosed(new IOException("stopped with " + calls.size() + " pending request(s)"));
+          return false;
         }
 
         if (!calls.isEmpty()) {
-          return WaitForWorkResult.READ_RESPONSE;
+          // shouldCloseConnection can be set to true by a parallel thread here. The caller
+          //  will need to check anyway.
+          return true;
         }
 
         if (EnvironmentEdgeManager.currentTime() >= waitUntil) {
@@ -555,11 +540,9 @@ public class RpcClientImpl extends AbstractRpcClient {
           // We expect the number of calls to be zero here, but actually someone can
           //  adds a call at the any moment, as there is no synchronization between this task
           //  and adding new calls. It's not a big issue, but it will get an exception.
-          if (markClosed(new IOException(
-              "idle connection closed with " + calls.size() + " pending request(s)"))) {
-            return WaitForWorkResult.CALLER_SHOULD_CLOSE;
-          }
-          return WaitForWorkResult.CLOSED;
+          markClosed(new IOException(
+              "idle connection closed with " + calls.size() + " pending request(s)"));
+          return false;
         }
 
         wait(Math.min(minIdleTimeBeforeClose, 1000));
@@ -576,38 +559,24 @@ public class RpcClientImpl extends AbstractRpcClient {
         LOG.trace(getName() + ": starting, connections " + connections.size());
       }
 
-      WaitForWorkResult result = WaitForWorkResult.CALLER_SHOULD_CLOSE;
       try {
-        result = waitForWork(); // Wait here for work - read or close connection
-        while (result == WaitForWorkResult.READ_RESPONSE) {
-          if (readResponse()) {
-            // shouldCloseConnection is set to true by readResponse(). Close the connection
-            result = WaitForWorkResult.CALLER_SHOULD_CLOSE;
-          } else {
-            result = waitForWork();
-          }
+        while (waitForWork()) { // Wait here for work - read or close connection
+          readResponse();
         }
       } catch (InterruptedException t) {
         if (LOG.isTraceEnabled()) {
           LOG.trace(getName() + ": interrupted while waiting for call responses");
         }
-        if (markClosed(ExceptionUtil.asInterrupt(t))) {
-          // shouldCloseConnection is set to true. Close connection
-          result = WaitForWorkResult.CALLER_SHOULD_CLOSE;
-        }
+        markClosed(ExceptionUtil.asInterrupt(t));
       } catch (Throwable t) {
         if (LOG.isDebugEnabled()) {
           LOG.debug(getName() + ": unexpected throwable while waiting for call responses", t);
         }
-        if (markClosed(new IOException("Unexpected throwable while waiting call responses", t))) {
-          // shouldCloseConnection is set to true. Close connection
-          result = WaitForWorkResult.CALLER_SHOULD_CLOSE;
-        }
-      }
-      if (result == WaitForWorkResult.CALLER_SHOULD_CLOSE) {
-        close();
+        markClosed(new IOException("Unexpected throwable while waiting call responses", t));
       }
 
+      close();
+
       if (LOG.isTraceEnabled()) {
         LOG.trace(getName() + ": stopped, connections " + connections.size());
       }
@@ -735,9 +704,8 @@ public class RpcClientImpl extends AbstractRpcClient {
         }
         IOException e = new FailedServerException(
             "This server is in the failed servers list: " + server);
-        if (markClosed(e)) {
-          close();
-        }
+        markClosed(e);
+        close();
         throw e;
       }
 
@@ -815,9 +783,8 @@ public class RpcClientImpl extends AbstractRpcClient {
             e = new IOException("Could not set up IO Streams to " + server, t);
           }
         }
-        if (markClosed(e)) {
-          close();
-        }
+        markClosed(e);
+        close();
         throw e;
       }
     }
@@ -948,9 +915,8 @@ public class RpcClientImpl extends AbstractRpcClient {
         } catch (IOException e) {
           // We set the value inside the synchronized block, this way the next in line
           //  won't even try to write
-          if (markClosed(e)) {
-            close();
-          }
+          markClosed(e);
+          close();
           writeException = e;
           interrupt();
         }
@@ -968,10 +934,9 @@ public class RpcClientImpl extends AbstractRpcClient {
 
     /* Receive a response.
      * Because only one receiver, so no synchronization on in.
-     * @return true if connection should be closed by caller
      */
-    protected boolean readResponse() {
-      if (shouldCloseConnection.get()) return false;
+    protected void readResponse() {
+      if (shouldCloseConnection.get()) return;
       Call call = null;
       boolean expectedCall = false;
       try {
@@ -993,14 +958,14 @@ public class RpcClientImpl extends AbstractRpcClient {
           int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
           int whatIsLeftToRead = totalSize - readSoFar;
           IOUtils.skipFully(in, whatIsLeftToRead);
-          return false;
+          return;
         }
         if (responseHeader.hasException()) {
           ExceptionResponse exceptionResponse = responseHeader.getException();
           RemoteException re = createRemoteException(exceptionResponse);
           call.setException(re);
           if (isFatalConnectionException(exceptionResponse)) {
-            return markClosed(re);
+            markClosed(re);
           }
         } else {
           Message value = null;
@@ -1027,12 +992,11 @@ public class RpcClientImpl extends AbstractRpcClient {
           if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
         } else {
           // Treat this as a fatal condition and close this connection
-          return markClosed(e);
+          markClosed(e);
         }
       } finally {
         cleanupCalls(false);
       }
-      return false;
     }
 
     /**
@@ -1058,22 +1022,18 @@ public class RpcClientImpl extends AbstractRpcClient {
           e.getStackTrace(), doNotRetry);
     }
 
-    /*
-     * @return true if shouldCloseConnection is set true by this thread; false otherwise
-     */
-    protected boolean markClosed(IOException e) {
+    protected synchronized void markClosed(IOException e) {
       if (e == null) throw new NullPointerException();
 
-      boolean ret = shouldCloseConnection.compareAndSet(false, true);
-      if (ret) {
+      if (shouldCloseConnection.compareAndSet(false, true)) {
         if (LOG.isTraceEnabled()) {
           LOG.trace(getName() + ": marking at should close, reason: " + e.getMessage());
         }
         if (callSender != null) {
           callSender.close();
         }
+        notifyAll();
       }
-      return ret;
     }
 
 
@@ -1161,6 +1121,7 @@ public class RpcClientImpl extends AbstractRpcClient {
     if (LOG.isDebugEnabled()) LOG.debug("Stopping rpc client");
     if (!running.compareAndSet(true, false)) return;
 
+    Set<Connection> connsToClose = null;
     // wake up all connections
     synchronized (connections) {
       for (Connection conn : connections.values()) {
@@ -1172,13 +1133,19 @@ public class RpcClientImpl extends AbstractRpcClient {
         // In case the CallSender did not setupIOStreams() yet, the Connection may not be started
         // at all (if CallSender has a cancelled Call it can happen). See HBASE-13851
         if (!conn.isAlive()) {
-          if (conn.markClosed(new InterruptedIOException("RpcClient is closing"))) {
-            conn.close();
+          if (connsToClose == null) {
+            connsToClose = new HashSet<Connection>();
           }
+          connsToClose.add(conn);
         }
       }
     }
-
+    if (connsToClose != null) {
+      for (Connection conn : connsToClose) {
+        conn.markClosed(new InterruptedIOException("RpcClient is closing"));
+        conn.close();
+      }
+    }
     // wait until all connections are closed
     while (!connections.isEmpty()) {
       try {