You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nk...@apache.org on 2014/03/04 15:33:58 UTC

svn commit: r1574110 - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java

Author: nkeywal
Date: Tue Mar  4 14:33:57 2014
New Revision: 1574110

URL: http://svn.apache.org/r1574110
Log:
HBASE-10637 rpcClient: Setup the iostreams when writing

Modified:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java?rev=1574110&r1=1574109&r2=1574110&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java Tue Mar  4 14:33:57 2014
@@ -877,10 +877,15 @@ public class RpcClient {
     }
 
     protected synchronized void setupIOstreams() throws IOException {
-      if (socket != null || shouldCloseConnection.get()) {
+      if (socket != null) {
+        // The connection is already available. Perfect.
         return;
       }
 
+      if (shouldCloseConnection.get()){
+        throw new IOException("This connection is closing");
+      }
+
       if (failedServers.isFailedServer(remoteId.getAddress())) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Not trying to connect to " + server +
@@ -926,6 +931,7 @@ public class RpcClient {
                 }
               });
             } catch (Exception ex) {
+              ExceptionUtil.rethrowIfInterrupt(ex);
               if (rand == null) {
                 rand = new Random();
               }
@@ -952,12 +958,14 @@ public class RpcClient {
           return;
         }
       } catch (Throwable t) {
-        failedServers.addToFailedServers(remoteId.address);
-        IOException e;
-        if (t instanceof IOException) {
-          e = (IOException)t;
-        } else {
-          e = new IOException("Could not set up IO Streams", t);
+        IOException e = ExceptionUtil.asInterrupt(t);
+        if (e == null) {
+          failedServers.addToFailedServers(remoteId.address);
+          if (t instanceof IOException) {
+            e = (IOException) t;
+          } else {
+            e = new IOException("Could not set up IO Streams to " + server, t);
+          }
         }
         markClosed(e);
         close();
@@ -1064,6 +1072,8 @@ public class RpcClient {
       if (priority != 0) builder.setPriority(priority);
       RequestHeader header = builder.build();
 
+      setupIOstreams();
+
       // Now we're going to write the call. We take the lock, then check that the connection
       //  is still valid, and, if so we do the write to the socket. If the write fails, we don't
       //  know where we stand, we have to close the connection.
@@ -1445,8 +1455,7 @@ public class RpcClient {
       Message returnType, User ticket, InetSocketAddress addr, int callTimeout, int priority)
       throws IOException, InterruptedException {
     Call call = new Call(md, param, cells, returnType, callTimeout);
-    Connection connection =
-      getConnection(ticket, call, addr, this.codec, this.compressor);
+    Connection connection = getConnection(ticket, call, addr, this.codec, this.compressor);
 
     CallFuture cts = null;
     if (connection.callSender != null){
@@ -1553,15 +1562,6 @@ public class RpcClient {
       }
     }
 
-    //we don't invoke the method below inside "synchronized (connections)"
-    //block above. The reason for that is if the server happens to be slow,
-    //it will take longer to establish a connection and that will slow the
-    //entire system down.
-    //Moreover, if the connection is currently created, there will be many threads
-    // waiting here; as setupIOstreams is synchronized. If the connection fails with a
-    // timeout, they will all fail simultaneously. This is checked in setupIOstreams.
-    connection.setupIOstreams();
-
     return connection;
   }
 

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java?rev=1574110&r1=1574109&r2=1574110&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ExceptionUtil.java Tue Mar  4 14:33:57 2014
@@ -28,7 +28,7 @@ import java.nio.channels.ClosedByInterru
  * - InterruptedException
  * - InterruptedIOException (inherits IOException); used in IO
  * - ClosedByInterruptException (inherits IOException)
- * , - SocketTimeoutException inherits InterruptedIOException but is not a real
+ * - SocketTimeoutException inherits InterruptedIOException but is not a real
  * interruption, so we have to distinguish the case. This pattern is unfortunately common.
  */
 public class ExceptionUtil {
@@ -39,7 +39,7 @@ public class ExceptionUtil {
   public static boolean isInterrupt(Throwable t) {
     if (t instanceof InterruptedException) return true;
     if (t instanceof SocketTimeoutException) return false;
-    return (t instanceof InterruptedIOException);
+    return (t instanceof InterruptedIOException || t instanceof ClosedByInterruptException);
   }
 
   /**
@@ -58,7 +58,7 @@ public class ExceptionUtil {
 
     if (t instanceof InterruptedIOException) return (InterruptedIOException) t;
 
-    if (t instanceof InterruptedException) {
+    if (t instanceof InterruptedException || t instanceof ClosedByInterruptException) {
       InterruptedIOException iie = new InterruptedIOException();
       iie.initCause(t);
       return iie;