You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2015/08/19 17:49:42 UTC
hbase git commit: HBASE-14241 Fix deadlock during cluster shutdown due to concurrent connection close
Repository: hbase
Updated Branches:
refs/heads/master 1bb9e3ae9 -> 16f8d2770
HBASE-14241 Fix deadlock during cluster shutdown due to concurrent connection close
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/16f8d277
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/16f8d277
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/16f8d277
Branch: refs/heads/master
Commit: 16f8d277088987dc8d4bb1614d05ce712bfc245d
Parents: 1bb9e3a
Author: tedyu <yu...@gmail.com>
Authored: Wed Aug 19 08:49:38 2015 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Wed Aug 19 08:49:38 2015 -0700
----------------------------------------------------------------------
.../apache/hadoop/hbase/ipc/RpcClientImpl.java | 108 +++++++++++++------
1 file changed, 75 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/16f8d277/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index fd3bab0..5ece8ae 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -152,6 +152,18 @@ public class RpcClientImpl extends AbstractRpcClient {
}
}
+ /*
+ * This is the return value from {@link #waitForWork()} indicating whether run() method should:
+ * read response
+ * close the connection
+ * take no action - connection would be closed by others
+ */
+ private enum WaitForWorkResult {
+ READ_RESPONSE,
+ CALLER_SHOULD_CLOSE,
+ CLOSED
+ }
+
/** Thread that reads responses and notifies callers. Each connection owns a
* socket connected to a remote address. Calls are multiplexed through this
* socket: responses may be delivered out of order. */
@@ -243,12 +255,13 @@ public class RpcClientImpl extends AbstractRpcClient {
*/
@Override
public void run() {
+ boolean closeBySelf = false;
while (!shouldCloseConnection.get()) {
CallFuture cts = null;
try {
cts = callsToWrite.take();
} catch (InterruptedException e) {
- markClosed(new InterruptedIOException());
+ closeBySelf = markClosed(new InterruptedIOException());
}
if (cts == null || cts == CallFuture.DEATH_PILL) {
@@ -272,11 +285,14 @@ public class RpcClientImpl extends AbstractRpcClient {
+ ", message =" + e.getMessage());
}
cts.call.setException(e);
- markClosed(e);
+ closeBySelf = markClosed(e);
}
}
cleanup();
+ if (closeBySelf) {
+ close();
+ }
}
/**
@@ -510,27 +526,28 @@ public class RpcClientImpl extends AbstractRpcClient {
* it is idle too long, it is marked as to be closed,
* or the client is marked as not running.
*
- * @return true if it is time to read a response; false otherwise.
+ * @return WaitForWorkResult indicating whether it is time to read response;
+ * if the caller should close; or otherwise
*/
- protected synchronized boolean waitForWork() throws InterruptedException {
+ protected synchronized WaitForWorkResult waitForWork() throws InterruptedException {
// beware of the concurrent access to the calls list: we can add calls, but as well
// remove them.
long waitUntil = EnvironmentEdgeManager.currentTime() + minIdleTimeBeforeClose;
while (true) {
if (shouldCloseConnection.get()) {
- return false;
+ return WaitForWorkResult.CLOSED;
}
if (!running.get()) {
- markClosed(new IOException("stopped with " + calls.size() + " pending request(s)"));
- return false;
+ if (markClosed(new IOException("stopped with " + calls.size() + " pending request(s)"))) {
+ return WaitForWorkResult.CALLER_SHOULD_CLOSE;
+ }
+ return WaitForWorkResult.CLOSED;
}
if (!calls.isEmpty()) {
- // shouldCloseConnection can be set to true by a parallel thread here. The caller
- // will need to check anyway.
- return true;
+ return WaitForWorkResult.READ_RESPONSE;
}
if (EnvironmentEdgeManager.currentTime() >= waitUntil) {
@@ -538,9 +555,11 @@ public class RpcClientImpl extends AbstractRpcClient {
// We expect the number of calls to be zero here, but actually someone can
// adds a call at the any moment, as there is no synchronization between this task
// and adding new calls. It's not a big issue, but it will get an exception.
- markClosed(new IOException(
- "idle connection closed with " + calls.size() + " pending request(s)"));
- return false;
+ if (markClosed(new IOException(
+ "idle connection closed with " + calls.size() + " pending request(s)"))) {
+ return WaitForWorkResult.CALLER_SHOULD_CLOSE;
+ }
+ return WaitForWorkResult.CLOSED;
}
wait(Math.min(minIdleTimeBeforeClose, 1000));
@@ -557,23 +576,37 @@ public class RpcClientImpl extends AbstractRpcClient {
LOG.trace(getName() + ": starting, connections " + connections.size());
}
+ WaitForWorkResult result = WaitForWorkResult.CALLER_SHOULD_CLOSE;
try {
- while (waitForWork()) { // Wait here for work - read or close connection
- readResponse();
+ result = waitForWork(); // Wait here for work - read or close connection
+ while (result == WaitForWorkResult.READ_RESPONSE) {
+ if (readResponse()) {
+ // shouldCloseConnection is set to true by readResponse(). Close the connection
+ result = WaitForWorkResult.CALLER_SHOULD_CLOSE;
+ } else {
+ result = waitForWork();
+ }
}
} catch (InterruptedException t) {
if (LOG.isTraceEnabled()) {
LOG.trace(getName() + ": interrupted while waiting for call responses");
}
- markClosed(ExceptionUtil.asInterrupt(t));
+ if (markClosed(ExceptionUtil.asInterrupt(t))) {
+ // shouldCloseConnection is set to true. Close connection
+ result = WaitForWorkResult.CALLER_SHOULD_CLOSE;
+ }
} catch (Throwable t) {
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": unexpected throwable while waiting for call responses", t);
}
- markClosed(new IOException("Unexpected throwable while waiting call responses", t));
+ if (markClosed(new IOException("Unexpected throwable while waiting call responses", t))) {
+ // shouldCloseConnection is set to true. Close connection
+ result = WaitForWorkResult.CALLER_SHOULD_CLOSE;
+ }
+ }
+ if (result == WaitForWorkResult.CALLER_SHOULD_CLOSE) {
+ close();
}
-
- close();
if (LOG.isTraceEnabled()) {
LOG.trace(getName() + ": stopped, connections " + connections.size());
@@ -702,8 +735,9 @@ public class RpcClientImpl extends AbstractRpcClient {
}
IOException e = new FailedServerException(
"This server is in the failed servers list: " + server);
- markClosed(e);
- close();
+ if (markClosed(e)) {
+ close();
+ }
throw e;
}
@@ -781,8 +815,9 @@ public class RpcClientImpl extends AbstractRpcClient {
e = new IOException("Could not set up IO Streams to " + server, t);
}
}
- markClosed(e);
- close();
+ if (markClosed(e)) {
+ close();
+ }
throw e;
}
}
@@ -922,9 +957,10 @@ public class RpcClientImpl extends AbstractRpcClient {
/* Receive a response.
* Because only one receiver, so no synchronization on in.
+ * @return true if connection should be closed by caller
*/
- protected void readResponse() {
- if (shouldCloseConnection.get()) return;
+ protected boolean readResponse() {
+ if (shouldCloseConnection.get()) return false;
Call call = null;
boolean expectedCall = false;
try {
@@ -946,14 +982,14 @@ public class RpcClientImpl extends AbstractRpcClient {
int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
int whatIsLeftToRead = totalSize - readSoFar;
IOUtils.skipFully(in, whatIsLeftToRead);
- return;
+ return false;
}
if (responseHeader.hasException()) {
ExceptionResponse exceptionResponse = responseHeader.getException();
RemoteException re = createRemoteException(exceptionResponse);
call.setException(re);
if (isFatalConnectionException(exceptionResponse)) {
- markClosed(re);
+ return markClosed(re);
}
} else {
Message value = null;
@@ -980,11 +1016,12 @@ public class RpcClientImpl extends AbstractRpcClient {
if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
} else {
// Treat this as a fatal condition and close this connection
- markClosed(e);
+ return markClosed(e);
}
} finally {
cleanupCalls(false);
}
+ return false;
}
/**
@@ -1010,18 +1047,22 @@ public class RpcClientImpl extends AbstractRpcClient {
e.getStackTrace(), doNotRetry);
}
- protected synchronized void markClosed(IOException e) {
+ /*
+ * @return true if shouldCloseConnection is set true by this thread; false otherwise
+ */
+ protected boolean markClosed(IOException e) {
if (e == null) throw new NullPointerException();
- if (shouldCloseConnection.compareAndSet(false, true)) {
+ boolean ret = shouldCloseConnection.compareAndSet(false, true);
+ if (ret) {
if (LOG.isTraceEnabled()) {
LOG.trace(getName() + ": marking at should close, reason: " + e.getMessage());
}
if (callSender != null) {
callSender.close();
}
- notifyAll();
}
+ return ret;
}
@@ -1120,8 +1161,9 @@ public class RpcClientImpl extends AbstractRpcClient {
// In case the CallSender did not setupIOStreams() yet, the Connection may not be started
// at all (if CallSender has a cancelled Call it can happen). See HBASE-13851
if (!conn.isAlive()) {
- conn.markClosed(new InterruptedIOException("RpcClient is closing"));
- conn.close();
+ if (conn.markClosed(new InterruptedIOException("RpcClient is closing"))) {
+ conn.close();
+ }
}
}
}