You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2008/06/06 07:38:54 UTC

svn commit: r663828 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/ipc/Client.java src/test/org/apache/hadoop/ipc/TestIPC.java

Author: hairong
Date: Thu Jun  5 22:38:54 2008
New Revision: 663828

URL: http://svn.apache.org/viewvc?rev=663828&view=rev
Log:
HADOOP-3455. Fix NPE in ipc.Client in case of connection failure and improve its synchronization. Contributed by Steve Loughran and Hairong Kuang.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java
    hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=663828&r1=663827&r2=663828&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Jun  5 22:38:54 2008
@@ -473,6 +473,9 @@
     HADOOP-3493. Fix TestStreamingFailure to use FileUtil.fullyDelete to
     ensure correct cleanup. (Lohit Vijayarenu via acmurthy) 
 
+    HADOOP-3455. Fix NPE in ipc.Client in case of connection failure and
+    improve its synchronization. (hairong)
+
 Release 0.17.0 - 2008-05-18
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=663828&r1=663827&r2=663828&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/ipc/Client.java Thu Jun  5 22:38:54 2008
@@ -34,6 +34,9 @@
 import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.net.SocketFactory;
 
@@ -48,7 +51,6 @@
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.StringUtils;
 
 /** A client for an IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
@@ -65,7 +67,7 @@
 
   private Class<?> valueClass;                       // class of call values
   private int counter;                            // counter for call ids
-  private boolean running = true;                 // true while client runs
+  private AtomicBoolean running = new AtomicBoolean(true); // if client runs
   final private Configuration conf;
   final private int maxIdleTime; //connections will be culled if it was idle for 
                            //maxIdleTime msecs
@@ -125,7 +127,7 @@
   synchronized boolean isZeroReference() {
     return refCount==0;
   }
-  
+
   /** A call waiting for a value. */
   private class Call {
     int id;                                       // call id
@@ -175,12 +177,13 @@
   private class Connection extends Thread {
     private ConnectionId remoteId;
     private Socket socket = null;                 // connected socket
-    private DataInputStream in;                   
-    private DataOutputStream out;
+    private DataInputStream in;
+    private AtomicReference<DataOutputStream> out = 
+      new AtomicReference<DataOutputStream>();
     // currently active calls
     private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
-    private long lastActivity = 0;     // last I/O activity time
-    private boolean shouldCloseConnection = false;  // indicate if the connection is closed
+    private AtomicLong lastActivity = new AtomicLong();// last I/O activity time
+    private AtomicBoolean shouldCloseConnection = new AtomicBoolean();  // indicate if the connection is closed
     private IOException closeException; // close reason
 
     public Connection(InetSocketAddress address) throws IOException {
@@ -201,23 +204,25 @@
     }
 
     /** Update lastActivity with the current time. */
-    private synchronized void touch() {
-      touch(System.currentTimeMillis());
-    }
-    
-    private synchronized void touch(long curTime) {
-      lastActivity = curTime;
+    private void touch() {
+      lastActivity.set(System.currentTimeMillis());
     }
 
-    /** Add a call to this connection's call queue */
+    /**
+     * Add a call to this connection's call queue and notify
+     * a listener; synchronized.
+     * Returns false if called during shutdown.
+     * @param call the call to add
+     * @return true if the call was added.
+     */
     private synchronized boolean addCall(Call call) {
-      if (shouldCloseConnection)
+      if (shouldCloseConnection.get())
         return false;
       calls.put(call.id, call);
       notify();
       return true;
     }
-    
+
     /** This class sends a ping to the remote side when timeout on
      * reading. If no failure is detected, it retries until at least
      * a byte is read.
@@ -233,7 +238,7 @@
        * otherwise, throw the timeout exception.
        */
       private void handleTimeout(SocketTimeoutException e) throws IOException {
-        if (shouldCloseConnection || !running) {
+        if (shouldCloseConnection.get() || !running.get()) {
           throw e;
         } else {
           sendPing();
@@ -243,6 +248,7 @@
       /** Read a byte from the stream.
        * Send a ping if timeout on read. Retries if no failure is detected
        * until a byte is read.
+       * @throws IOException for any IO problem other than socket timeout
        */
       public int read() throws IOException {
         do {
@@ -258,7 +264,7 @@
        * Send a ping if timeout on read. Retries if no failure is detected
        * until a byte is read.
        * 
-       * @Return the total number of bytes read; -1 if the connection is closed.
+       * @return the total number of bytes read; -1 if the connection is closed.
        */
       public int read(byte[] buf, int off, int len) throws IOException {
         do {
@@ -276,7 +282,7 @@
      * the connection thread that waits for responses.
      */
     private synchronized void setupIOstreams() {
-      if (socket != null || shouldCloseConnection) {
+      if (socket != null || shouldCloseConnection.get()) {
         return;
       }
       
@@ -305,19 +311,19 @@
         }
         this.in = new DataInputStream(new BufferedInputStream
             (new PingInputStream(NetUtils.getInputStream(socket))));
-        this.out = new DataOutputStream
-            (new BufferedOutputStream(NetUtils.getOutputStream(socket)));
+        this.out.set(new DataOutputStream
+            (new BufferedOutputStream(NetUtils.getOutputStream(socket))));
         writeHeader();
 
         // update last activity time
         touch();
 
+        // start the receiver thread after the socket connection has been set up
+        start();
       } catch (IOException e) {
         markClosed(e);
         close();
       }
-      // start the receiver thread after the socket connection has been set up
-      start();
     }
 
     /* Handle connection failures
@@ -325,7 +331,10 @@
      * If the current number of retries is equal to the max number of retries,
      * stop retrying and throw the exception; Otherwise backoff 1 second and
      * try connecting again.
-     * 
+     *
+     * This method is only called from inside setupIOstreams(), which is
+     * synchronized, so the backoff sleep runs while the connection's lock is held.
+     *
      * @param curRetries current number of retries
      * @param maxRetries max number of retries allowed
      * @param ioe failure reason
@@ -361,6 +370,7 @@
      * Out is not synchronized because only the first thread does this.
      */
     private void writeHeader() throws IOException {
+      DataOutputStream out = this.out.get();
       out.write(Server.HEADER.array());
       out.write(Server.CURRENT_VERSION);
       //When there are more fields we can have ConnectionHeader Writable.
@@ -379,8 +389,9 @@
      * Return true if it is time to read a response; false otherwise.
      */
     private synchronized boolean waitForWork() {
-      if (calls.isEmpty() && !shouldCloseConnection  && running)  {
-        long timeout = maxIdleTime-(System.currentTimeMillis()-lastActivity);
+      if (calls.isEmpty() && !shouldCloseConnection.get()  && running.get())  {
+        long timeout = maxIdleTime-
+              (System.currentTimeMillis()-lastActivity.get());
         if (timeout>0) {
           try {
             wait(timeout);
@@ -388,11 +399,11 @@
         }
       }
       
-      if (!calls.isEmpty() && !shouldCloseConnection && running) {
+      if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
         return true;
-      } else if (shouldCloseConnection) {
+      } else if (shouldCloseConnection.get()) {
         return false;
-      } else if (!running) { //get stopped 
+      } else if (!running.get()) { //get stopped 
         markClosed((IOException)new IOException().initCause(
             new InterruptedException()));
         return false;
@@ -411,9 +422,10 @@
      */
     private synchronized void sendPing() throws IOException {
       long curTime = System.currentTimeMillis();
-      if ( curTime - lastActivity >= pingInterval) {
-        touch(curTime);
+      if ( curTime - lastActivity.get() >= pingInterval) {
+        lastActivity.set(curTime);
         synchronized (out) {
+          DataOutputStream out = this.out.get();
           out.writeInt(PING_CALL_ID);
           out.flush();
         }
@@ -441,30 +453,36 @@
      * threads.
      */
     public void sendParam(Call call) {
-      synchronized (this) {
-        if (shouldCloseConnection) {
-          return;
-        }
+      if (shouldCloseConnection.get()) {
+        return;
       }
 
+      DataOutputBuffer d=null;
       try {
-        synchronized (out) {
+        synchronized (this.out) {
+          DataOutputStream out = this.out.get();
+          if (out==null) return;  // socket has closed
+          
           if (LOG.isDebugEnabled())
             LOG.debug(getName() + " sending #" + call.id);
-
-          DataOutputBuffer d = new DataOutputBuffer(); //for serializing the
+          
+          //for serializing the
           //data to be written
+          d = new DataOutputBuffer();
           d.writeInt(call.id);
           call.param.write(d);
           byte[] data = d.getData();
           int dataLength = d.getLength();
-
           out.writeInt(dataLength);      //first put the data length
           out.write(data, 0, dataLength);//write the data
           out.flush();
         }
       } catch(IOException e) {
         markClosed(e);
+      } finally {
+        //the buffer is just an in-memory buffer, but it is still polite to
+        // close early
+        IOUtils.closeStream(d);
       }
     }  
 
@@ -472,10 +490,8 @@
      * Because only one receiver, so no synchronization on in.
      */
     private void receiveResponse() {
-      synchronized (this) {
-        if (shouldCloseConnection) {
-          return;
-        }
+      if (shouldCloseConnection.get()) {
+        return;
       }
       touch();
       
@@ -502,8 +518,7 @@
     }
     
     private synchronized void markClosed(IOException e) {
-      if (!shouldCloseConnection) {
-        shouldCloseConnection = true;
+      if (shouldCloseConnection.compareAndSet(false, true)) {
         closeException = e;
         notifyAll();
       }
@@ -511,7 +526,7 @@
     
     /** Close the connection. */
     private synchronized void close() {
-      if (!shouldCloseConnection) {
+      if (!shouldCloseConnection.get()) {
         LOG.error("The connection is not in the closed state");
         return;
       }
@@ -527,15 +542,17 @@
 
         // close the socket and streams
         IOUtils.closeStream(in);
-        IOUtils.closeStream(out);
+        in = null;
+        IOUtils.closeStream(out.getAndSet(null));
         IOUtils.closeSocket(socket);
+        socket = null;
 
         // clean up all calls
         if (closeException == null) {
           if (!calls.isEmpty()) {
             LOG.warn(
             "A connection is closed for no cause and calls are not empty");
-            
+
             // clean up calls anyway
             closeException = new IOException("Unexpected closed connection");
             cleanupCalls();
@@ -544,7 +561,7 @@
           // log the info
           if (LOG.isDebugEnabled()) {
             LOG.debug("closing ipc connection to " + remoteId.address + ": " +
-                StringUtils.stringifyException(closeException));
+                closeException.getMessage(),closeException);
           }
 
           // cleanup calls
@@ -643,11 +660,10 @@
     if (LOG.isDebugEnabled()) {
       LOG.debug("Stopping client");
     }
-    
-    if (running == false) {
+
+    if (!running.compareAndSet(true, false)) {
       return;
     }
-    running = false;
     
     // wake up all connections
     synchronized (connections) {
@@ -716,8 +732,9 @@
           Connection connection = getConnection(addresses[i], null, call);
           connection.sendParam(call);             // send each parameter
         } catch (IOException e) {
+          // log errors
           LOG.info("Calling "+addresses[i]+" caught: " + 
-                   StringUtils.stringifyException(e)); // log errors
+                   e.getMessage(),e);
           results.size--;                         //  wait for one fewer result
         }
       }
@@ -737,11 +754,9 @@
                                    UserGroupInformation ticket,
                                    Call call)
                                    throws IOException {
-    synchronized (this) {
-      if (!running) {
-        // the client is stopped
-        throw new IOException("The client is stopped");
-      }
+    if (!running.get()) {
+      // the client is stopped
+      throw new IOException("The client is stopped");
     }
     Connection connection;
     /* we could avoid this allocation for each RPC by having a  

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java?rev=663828&r1=663827&r2=663828&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java Thu Jun  5 22:38:54 2008
@@ -208,6 +208,19 @@
     }
   }
 	
+  public void testStandAloneClient() throws Exception {
+    testParallel(10, false, 2, 4, 2, 4, 100);
+    Client client = new Client(LongWritable.class, conf);
+    boolean hasException = false;
+    try {
+      client.call(new LongWritable(RANDOM.nextLong()), 
+          new InetSocketAddress("127.0.0.1", 10));
+    } catch (IOException e) {
+      hasException = true;
+    }
+    assertTrue (hasException);
+  }
+
   public static void main(String[] args) throws Exception {
 
     //new TestIPC("test").testSerial(5, false, 2, 10, 1000);