You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2015/09/18 23:40:47 UTC
hbase git commit: HBASE-14449 Rewrite deadlock prevention for concurrent connection close
Repository: hbase
Updated Branches:
refs/heads/master 8cdf4a8e0 -> b0f523326
HBASE-14449 Rewrite deadlock prevention for concurrent connection close
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b0f52332
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b0f52332
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b0f52332
Branch: refs/heads/master
Commit: b0f52332651ecbb8af11557df5af3189c7283212
Parents: 8cdf4a8
Author: tedyu <yu...@gmail.com>
Authored: Fri Sep 18 14:40:39 2015 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Fri Sep 18 14:40:39 2015 -0700
----------------------------------------------------------------------
.../apache/hadoop/hbase/ipc/RpcClientImpl.java | 123 +++++++------------
1 file changed, 45 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/b0f52332/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index f4dd701..b09674c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -88,10 +88,12 @@ import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -152,18 +154,6 @@ public class RpcClientImpl extends AbstractRpcClient {
}
}
- /*
- * This is the return value from {@link #waitForWork()} indicating whether run() method should:
- * read response
- * close the connection
- * take no action - connection would be closed by others
- */
- private enum WaitForWorkResult {
- READ_RESPONSE,
- CALLER_SHOULD_CLOSE,
- CLOSED
- }
-
/** Thread that reads responses and notifies callers. Each connection owns a
* socket connected to a remote address. Calls are multiplexed through this
* socket: responses may be delivered out of order. */
@@ -255,13 +245,12 @@ public class RpcClientImpl extends AbstractRpcClient {
*/
@Override
public void run() {
- boolean closeBySelf = false;
while (!shouldCloseConnection.get()) {
CallFuture cts = null;
try {
cts = callsToWrite.take();
} catch (InterruptedException e) {
- closeBySelf = markClosed(new InterruptedIOException());
+ markClosed(new InterruptedIOException());
}
if (cts == null || cts == CallFuture.DEATH_PILL) {
@@ -285,14 +274,11 @@ public class RpcClientImpl extends AbstractRpcClient {
+ ", message =" + e.getMessage());
}
cts.call.setException(e);
- closeBySelf = markClosed(e);
+ markClosed(e);
}
}
cleanup();
- if (closeBySelf) {
- close();
- }
}
/**
@@ -526,28 +512,27 @@ public class RpcClientImpl extends AbstractRpcClient {
* it is idle too long, it is marked as to be closed,
* or the client is marked as not running.
*
- * @return WaitForWorkResult indicating whether it is time to read response;
- * if the caller should close; or otherwise
+ * @return true if it is time to read a response; false otherwise.
*/
- protected synchronized WaitForWorkResult waitForWork() throws InterruptedException {
+ protected synchronized boolean waitForWork() throws InterruptedException {
// beware of the concurrent access to the calls list: we can add calls, but as well
// remove them.
long waitUntil = EnvironmentEdgeManager.currentTime() + minIdleTimeBeforeClose;
while (true) {
if (shouldCloseConnection.get()) {
- return WaitForWorkResult.CLOSED;
+ return false;
}
if (!running.get()) {
- if (markClosed(new IOException("stopped with " + calls.size() + " pending request(s)"))) {
- return WaitForWorkResult.CALLER_SHOULD_CLOSE;
- }
- return WaitForWorkResult.CLOSED;
+ markClosed(new IOException("stopped with " + calls.size() + " pending request(s)"));
+ return false;
}
if (!calls.isEmpty()) {
- return WaitForWorkResult.READ_RESPONSE;
+ // shouldCloseConnection can be set to true by a parallel thread here. The caller
+ // will need to check anyway.
+ return true;
}
if (EnvironmentEdgeManager.currentTime() >= waitUntil) {
@@ -555,11 +540,9 @@ public class RpcClientImpl extends AbstractRpcClient {
// We expect the number of calls to be zero here, but actually someone can
// adds a call at the any moment, as there is no synchronization between this task
// and adding new calls. It's not a big issue, but it will get an exception.
- if (markClosed(new IOException(
- "idle connection closed with " + calls.size() + " pending request(s)"))) {
- return WaitForWorkResult.CALLER_SHOULD_CLOSE;
- }
- return WaitForWorkResult.CLOSED;
+ markClosed(new IOException(
+ "idle connection closed with " + calls.size() + " pending request(s)"));
+ return false;
}
wait(Math.min(minIdleTimeBeforeClose, 1000));
@@ -576,38 +559,24 @@ public class RpcClientImpl extends AbstractRpcClient {
LOG.trace(getName() + ": starting, connections " + connections.size());
}
- WaitForWorkResult result = WaitForWorkResult.CALLER_SHOULD_CLOSE;
try {
- result = waitForWork(); // Wait here for work - read or close connection
- while (result == WaitForWorkResult.READ_RESPONSE) {
- if (readResponse()) {
- // shouldCloseConnection is set to true by readResponse(). Close the connection
- result = WaitForWorkResult.CALLER_SHOULD_CLOSE;
- } else {
- result = waitForWork();
- }
+ while (waitForWork()) { // Wait here for work - read or close connection
+ readResponse();
}
} catch (InterruptedException t) {
if (LOG.isTraceEnabled()) {
LOG.trace(getName() + ": interrupted while waiting for call responses");
}
- if (markClosed(ExceptionUtil.asInterrupt(t))) {
- // shouldCloseConnection is set to true. Close connection
- result = WaitForWorkResult.CALLER_SHOULD_CLOSE;
- }
+ markClosed(ExceptionUtil.asInterrupt(t));
} catch (Throwable t) {
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": unexpected throwable while waiting for call responses", t);
}
- if (markClosed(new IOException("Unexpected throwable while waiting call responses", t))) {
- // shouldCloseConnection is set to true. Close connection
- result = WaitForWorkResult.CALLER_SHOULD_CLOSE;
- }
- }
- if (result == WaitForWorkResult.CALLER_SHOULD_CLOSE) {
- close();
+ markClosed(new IOException("Unexpected throwable while waiting call responses", t));
}
+ close();
+
if (LOG.isTraceEnabled()) {
LOG.trace(getName() + ": stopped, connections " + connections.size());
}
@@ -735,9 +704,8 @@ public class RpcClientImpl extends AbstractRpcClient {
}
IOException e = new FailedServerException(
"This server is in the failed servers list: " + server);
- if (markClosed(e)) {
- close();
- }
+ markClosed(e);
+ close();
throw e;
}
@@ -815,9 +783,8 @@ public class RpcClientImpl extends AbstractRpcClient {
e = new IOException("Could not set up IO Streams to " + server, t);
}
}
- if (markClosed(e)) {
- close();
- }
+ markClosed(e);
+ close();
throw e;
}
}
@@ -948,9 +915,8 @@ public class RpcClientImpl extends AbstractRpcClient {
} catch (IOException e) {
// We set the value inside the synchronized block, this way the next in line
// won't even try to write
- if (markClosed(e)) {
- close();
- }
+ markClosed(e);
+ close();
writeException = e;
interrupt();
}
@@ -968,10 +934,9 @@ public class RpcClientImpl extends AbstractRpcClient {
/* Receive a response.
* Because only one receiver, so no synchronization on in.
- * @return true if connection should be closed by caller
*/
- protected boolean readResponse() {
- if (shouldCloseConnection.get()) return false;
+ protected void readResponse() {
+ if (shouldCloseConnection.get()) return;
Call call = null;
boolean expectedCall = false;
try {
@@ -993,14 +958,14 @@ public class RpcClientImpl extends AbstractRpcClient {
int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
int whatIsLeftToRead = totalSize - readSoFar;
IOUtils.skipFully(in, whatIsLeftToRead);
- return false;
+ return;
}
if (responseHeader.hasException()) {
ExceptionResponse exceptionResponse = responseHeader.getException();
RemoteException re = createRemoteException(exceptionResponse);
call.setException(re);
if (isFatalConnectionException(exceptionResponse)) {
- return markClosed(re);
+ markClosed(re);
}
} else {
Message value = null;
@@ -1027,12 +992,11 @@ public class RpcClientImpl extends AbstractRpcClient {
if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
} else {
// Treat this as a fatal condition and close this connection
- return markClosed(e);
+ markClosed(e);
}
} finally {
cleanupCalls(false);
}
- return false;
}
/**
@@ -1058,22 +1022,18 @@ public class RpcClientImpl extends AbstractRpcClient {
e.getStackTrace(), doNotRetry);
}
- /*
- * @return true if shouldCloseConnection is set true by this thread; false otherwise
- */
- protected boolean markClosed(IOException e) {
+ protected synchronized void markClosed(IOException e) {
if (e == null) throw new NullPointerException();
- boolean ret = shouldCloseConnection.compareAndSet(false, true);
- if (ret) {
+ if (shouldCloseConnection.compareAndSet(false, true)) {
if (LOG.isTraceEnabled()) {
LOG.trace(getName() + ": marking at should close, reason: " + e.getMessage());
}
if (callSender != null) {
callSender.close();
}
+ notifyAll();
}
- return ret;
}
@@ -1161,6 +1121,7 @@ public class RpcClientImpl extends AbstractRpcClient {
if (LOG.isDebugEnabled()) LOG.debug("Stopping rpc client");
if (!running.compareAndSet(true, false)) return;
+ Set<Connection> connsToClose = null;
// wake up all connections
synchronized (connections) {
for (Connection conn : connections.values()) {
@@ -1172,13 +1133,19 @@ public class RpcClientImpl extends AbstractRpcClient {
// In case the CallSender did not setupIOStreams() yet, the Connection may not be started
// at all (if CallSender has a cancelled Call it can happen). See HBASE-13851
if (!conn.isAlive()) {
- if (conn.markClosed(new InterruptedIOException("RpcClient is closing"))) {
- conn.close();
+ if (connsToClose == null) {
+ connsToClose = new HashSet<Connection>();
}
+ connsToClose.add(conn);
}
}
}
-
+ if (connsToClose != null) {
+ for (Connection conn : connsToClose) {
+ conn.markClosed(new InterruptedIOException("RpcClient is closing"));
+ conn.close();
+ }
+ }
// wait until all connections are closed
while (!connections.isEmpty()) {
try {