You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2008/09/20 01:12:22 UTC

svn commit: r697275 - in /hadoop/core/trunk: CHANGES.txt src/core/org/apache/hadoop/ipc/Client.java src/test/findbugsExcludeFile.xml

Author: hairong
Date: Fri Sep 19 16:12:22 2008
New Revision: 697275

URL: http://svn.apache.org/viewvc?rev=697275&view=rev
Log:
HADOOP-4062. IPC client does not need to be synchronized on the output stream when a connection is closed. Contributed by Hairong Kuang.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java
    hadoop/core/trunk/src/test/findbugsExcludeFile.xml

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=697275&r1=697274&r2=697275&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Sep 19 16:12:22 2008
@@ -396,6 +396,10 @@
     HADOOP-2165. Augmented JobHistory to include the URIs to the tasks'
     userlogs. (Vinod Kumar Vavilapalli via acmurthy) 
 
+    HADOOP-4062. Remove the synchronization on the output stream when a
+    connection is closed and also remove an undesirable exception when
+    a client is stoped while there is no pending RPC request. (hairong)
+ 
   OPTIMIZATIONS
 
     HADOOP-3556. Removed lock contention in MD5Hash by changing the 

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java?rev=697275&r1=697274&r2=697275&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java Fri Sep 19 16:12:22 2008
@@ -36,7 +36,6 @@
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 
 import javax.net.SocketFactory;
 
@@ -178,8 +177,8 @@
     private ConnectionId remoteId;
     private Socket socket = null;                 // connected socket
     private DataInputStream in;
-    private AtomicReference<DataOutputStream> out = 
-      new AtomicReference<DataOutputStream>();
+    private DataOutputStream out;
+    
     // currently active calls
     private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
     private AtomicLong lastActivity = new AtomicLong();// last I/O activity time
@@ -311,8 +310,8 @@
         }
         this.in = new DataInputStream(new BufferedInputStream
             (new PingInputStream(NetUtils.getInputStream(socket))));
-        this.out.set(new DataOutputStream
-            (new BufferedOutputStream(NetUtils.getOutputStream(socket))));
+        this.out = new DataOutputStream
+            (new BufferedOutputStream(NetUtils.getOutputStream(socket)));
         writeHeader();
 
         // update last activity time
@@ -370,7 +369,6 @@
      * Out is not synchronized because only the first thread does this.
      */
     private void writeHeader() throws IOException {
-      DataOutputStream out = this.out.get();
       out.write(Server.HEADER.array());
       out.write(Server.CURRENT_VERSION);
       //When there are more fields we can have ConnectionHeader Writable.
@@ -403,13 +401,13 @@
         return true;
       } else if (shouldCloseConnection.get()) {
         return false;
-      } else if (!running.get()) { //get stopped 
+      } else if (calls.isEmpty()) { // idle connection closed or stopped
+        markClosed(null);
+        return false;
+      } else { // get stopped but there are still pending requests 
         markClosed((IOException)new IOException().initCause(
             new InterruptedException()));
         return false;
-      } else { // closed because it has been idle for more than maxIdleTime
-        markClosed(null);
-        return false;
       }
     }
 
@@ -425,7 +423,6 @@
       if ( curTime - lastActivity.get() >= pingInterval) {
         lastActivity.set(curTime);
         synchronized (out) {
-          DataOutputStream out = this.out.get();
           out.writeInt(PING_CALL_ID);
           out.flush();
         }
@@ -460,9 +457,6 @@
       DataOutputBuffer d=null;
       try {
         synchronized (this.out) {
-          DataOutputStream out = this.out.get();
-          if (out==null) return;  // socket has closed
-          
           if (LOG.isDebugEnabled())
             LOG.debug(getName() + " sending #" + call.id);
           
@@ -531,42 +525,37 @@
         return;
       }
 
-      synchronized (out) {
-        // release the resources
-        // first thing to do;take the connection out of the connection list
-        synchronized (connections) {
-          if (connections.get(remoteId) == this) {
-            connections.remove(remoteId);
-          }
+      // release the resources
+      // first thing to do;take the connection out of the connection list
+      synchronized (connections) {
+        if (connections.get(remoteId) == this) {
+          connections.remove(remoteId);
         }
+      }
 
-        // close the socket and streams
-        IOUtils.closeStream(in);
-        in = null;
-        IOUtils.closeStream(out.getAndSet(null));
-        IOUtils.closeSocket(socket);
-        socket = null;
-
-        // clean up all calls
-        if (closeException == null) {
-          if (!calls.isEmpty()) {
-            LOG.warn(
-            "A connection is closed for no cause and calls are not empty");
-
-            // clean up calls anyway
-            closeException = new IOException("Unexpected closed connection");
-            cleanupCalls();
-          }
-        } else {
-          // log the info
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("closing ipc connection to " + remoteId.address + ": " +
-                closeException.getMessage(),closeException);
-          }
+      // close the streams and therefore the socket
+      IOUtils.closeStream(out);
+      IOUtils.closeStream(in);
+
+      // clean up all calls
+      if (closeException == null) {
+        if (!calls.isEmpty()) {
+          LOG.warn(
+              "A connection is closed for no cause and calls are not empty");
 
-          // cleanup calls
+          // clean up calls anyway
+          closeException = new IOException("Unexpected closed connection");
           cleanupCalls();
         }
+      } else {
+        // log the info
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("closing ipc connection to " + remoteId.address + ": " +
+              closeException.getMessage(),closeException);
+        }
+
+        // cleanup calls
+        cleanupCalls();
       }
       if (LOG.isDebugEnabled())
         LOG.debug(getName() + ": closed");

Modified: hadoop/core/trunk/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/findbugsExcludeFile.xml?rev=697275&r1=697274&r2=697275&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/core/trunk/src/test/findbugsExcludeFile.xml Fri Sep 19 16:12:22 2008
@@ -19,4 +19,13 @@
        <Field name="_jspx_dependants" />
        <Bug pattern="UWF_UNWRITTEN_FIELD" />
      </Match>
+     <!-- 
+       Inconsistent synchronization for Client.Connection.out is
+       is intentional to make a connection to be closed instantly. 
+     --> 
+     <Match>
+       <Class name="org.apache.hadoop.ipc.Client$Connection" />
+       <Field name="out" />
+       <Bug pattern="IS2_INCONSISTENT_SYNC" />
+     </Match>
 </FindBugsFilter>