You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nk...@apache.org on 2014/03/07 21:47:37 UTC
svn commit: r1575395 - in /hbase/branches/hbase-10070: ./
hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
Author: nkeywal
Date: Fri Mar 7 20:47:37 2014
New Revision: 1575395
URL: http://svn.apache.org/r1575395
Log:
HBASE-10637 rpcClient: Setup the iostreams when writing
Modified:
hbase/branches/hbase-10070/ (props changed)
hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
hbase/branches/hbase-10070/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
Propchange: hbase/branches/hbase-10070/
------------------------------------------------------------------------------
Merged /hbase/trunk:r1574110
Modified: hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java?rev=1575395&r1=1575394&r2=1575395&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java (original)
+++ hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java Fri Mar 7 20:47:37 2014
@@ -857,10 +857,15 @@ public class RpcClient {
}
protected synchronized void setupIOstreams() throws IOException {
- if (socket != null || shouldCloseConnection.get()) {
+ if (socket != null) {
+ // The connection is already available. Perfect.
return;
}
+ if (shouldCloseConnection.get()){
+ throw new IOException("This connection is closing");
+ }
+
if (failedServers.isFailedServer(remoteId.getAddress())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Not trying to connect to " + server +
@@ -908,6 +913,7 @@ public class RpcClient {
}
});
} catch (Exception ex) {
+ ExceptionUtil.rethrowIfInterrupt(ex);
if (rand == null) {
rand = new Random();
}
@@ -934,12 +940,14 @@ public class RpcClient {
return;
}
} catch (Throwable t) {
- failedServers.addToFailedServers(remoteId.address);
- IOException e;
- if (t instanceof IOException) {
- e = (IOException)t;
- } else {
- e = new IOException("Could not set up IO Streams", t);
+ IOException e = ExceptionUtil.asInterrupt(t);
+ if (e == null) {
+ failedServers.addToFailedServers(remoteId.address);
+ if (t instanceof IOException) {
+ e = (IOException) t;
+ } else {
+ e = new IOException("Could not set up IO Streams to " + server, t);
+ }
}
markClosed(e);
close();
@@ -1048,6 +1056,8 @@ public class RpcClient {
if (priority != 0) builder.setPriority(priority);
RequestHeader header = builder.build();
+ setupIOstreams();
+
// Now we're going to write the call. We take the lock, then check that the connection
// is still valid, and, if so we do the write to the socket. If the write fails, we don't
// know where we stand, we have to close the connection.
@@ -1557,15 +1567,6 @@ public class RpcClient {
}
}
- //we don't invoke the method below inside "synchronized (connections)"
- //block above. The reason for that is if the server happens to be slow,
- //it will take longer to establish a connection and that will slow the
- //entire system down.
- //Moreover, if the connection is currently created, there will be many threads
- // waiting here; as setupIOstreams is synchronized. If the connection fails with a
- // timeout, they will all fail simultaneously. This is checked in setupIOstreams.
- connection.setupIOstreams();
-
return connection;
}
Modified: hbase/branches/hbase-10070/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java?rev=1575395&r1=1575394&r2=1575395&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java (original)
+++ hbase/branches/hbase-10070/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java Fri Mar 7 20:47:37 2014
@@ -28,7 +28,7 @@ import java.nio.channels.ClosedByInterru
* - InterruptedException
* - InterruptedIOException (inherits IOException); used in IO
* - ClosedByInterruptException (inherits IOException)
- * , - SocketTimeoutException inherits InterruptedIOException but is not a real
+ * - SocketTimeoutException inherits InterruptedIOException but is not a real
* interruption, so we have to distinguish the case. This pattern is unfortunately common.
*/
public class ExceptionUtil {
@@ -39,7 +39,7 @@ public class ExceptionUtil {
public static boolean isInterrupt(Throwable t) {
if (t instanceof InterruptedException) return true;
if (t instanceof SocketTimeoutException) return false;
- return (t instanceof InterruptedIOException);
+ return (t instanceof InterruptedIOException || t instanceof ClosedByInterruptException);
}
/**
@@ -58,7 +58,7 @@ public class ExceptionUtil {
if (t instanceof InterruptedIOException) return (InterruptedIOException) t;
- if (t instanceof InterruptedException) {
+ if (t instanceof InterruptedException || t instanceof ClosedByInterruptException) {
InterruptedIOException iie = new InterruptedIOException();
iie.initCause(t);
return iie;