Posted to common-commits@hadoop.apache.org by to...@apache.org on 2012/12/10 22:25:39 UTC

svn commit: r1419783 - in /hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common: CHANGES.txt src/main/java/org/apache/hadoop/ipc/Client.java src/test/java/org/apache/hadoop/ipc/TestIPC.java src/test/java/org/apache/hadoop/ipc/TestRPC.java

Author: todd
Date: Mon Dec 10 21:25:38 2012
New Revision: 1419783

URL: http://svn.apache.org/viewvc?rev=1419783&view=rev
Log:
HADOOP-6762. Exception while doing RPC I/O closes channel. Contributed by Sam Rash and Todd Lipcon.

Modified:
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1419783&r1=1419782&r2=1419783&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt Mon Dec 10 21:25:38 2012
@@ -175,6 +175,9 @@ Release 2.0.3-alpha - Unreleased 
 
     HADOOP-9070. Kerberos SASL server cannot find kerberos key. (daryn via atm)
 
+    HADOOP-6762. Exception while doing RPC I/O closes channel
+    (Sam Rash and todd via todd)
+
 Release 2.0.2-alpha - 2012-09-07 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1419783&r1=1419782&r2=1419783&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Mon Dec 10 21:25:38 2012
@@ -38,6 +38,11 @@ import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -78,6 +83,8 @@ import org.apache.hadoop.util.ProtoUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /** A client for an IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
  * a port and is defined by a parameter class and a value class.
@@ -102,6 +109,19 @@ public class Client {
   final static int PING_CALL_ID = -1;
   
   /**
+   * Executor on which IPC calls' parameters are sent. Deferring
+   * the sending of parameters to a separate thread isolates them
+   * from thread interruptions in the calling code.
+   */
+  private static final ExecutorService SEND_PARAMS_EXECUTOR = 
+    Executors.newCachedThreadPool(
+        new ThreadFactoryBuilder()
+        .setDaemon(true)
+        .setNameFormat("IPC Parameter Sending Thread #%d")
+        .build());
+
+  
+  /**
    * set the ping interval value in configuration
    * 
    * @param conf Configuration
@@ -243,6 +263,8 @@ public class Client {
     private AtomicLong lastActivity = new AtomicLong();// last I/O activity time
     private AtomicBoolean shouldCloseConnection = new AtomicBoolean();  // indicate if the connection is closed
     private IOException closeException; // close reason
+    
+    private final Object sendParamsLock = new Object();
 
     public Connection(ConnectionId remoteId) throws IOException {
       this.remoteId = remoteId;
@@ -829,43 +851,76 @@ public class Client {
      * Note: this is not called from the Connection thread, but by other
      * threads.
      */
-    public void sendParam(Call call) {
+    public void sendParam(final Call call)
+        throws InterruptedException, IOException {
       if (shouldCloseConnection.get()) {
         return;
       }
 
-      DataOutputBuffer d=null;
-      try {
-        synchronized (this.out) {
-          if (LOG.isDebugEnabled())
-            LOG.debug(getName() + " sending #" + call.id);
+      // Serialize the call to be sent. This is done from the actual
+      // caller thread, rather than the SEND_PARAMS_EXECUTOR thread,
+      // so that if the serialization throws an error, it is reported
+      // properly. This also parallelizes the serialization.
+      //
+      // Format of a call on the wire:
+      // 0) Length of rest below (1 + 2)
+      // 1) PayloadHeader  - is serialized Delimited hence contains length
+      // 2) the Payload - the RpcRequest
+      //
+      // Items '1' and '2' are prepared here. 
+      final DataOutputBuffer d = new DataOutputBuffer();
+      RpcPayloadHeaderProto header = ProtoUtil.makeRpcPayloadHeader(
+         call.rpcKind, RpcPayloadOperationProto.RPC_FINAL_PAYLOAD, call.id);
+      header.writeDelimitedTo(d);
+      call.rpcRequest.write(d);
+
+      synchronized (sendParamsLock) {
+        Future<?> senderFuture = SEND_PARAMS_EXECUTOR.submit(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              synchronized (Connection.this.out) {
+                if (shouldCloseConnection.get()) {
+                  return;
+                }
+                
+                if (LOG.isDebugEnabled())
+                  LOG.debug(getName() + " sending #" + call.id);
+         
+                byte[] data = d.getData();
+                int totalLength = d.getLength();
+                out.writeInt(totalLength); // Total Length
+                out.write(data, 0, totalLength);//PayloadHeader + RpcRequest
+                out.flush();
+              }
+            } catch (IOException e) {
+              // exception at this point would leave the connection in an
+              // unrecoverable state (eg half a call left on the wire).
+              // So, close the connection, killing any outstanding calls
+              markClosed(e);
+            } finally {
+              //the buffer is just an in-memory buffer, but it is still polite to
+              // close early
+              IOUtils.closeStream(d);
+            }
+          }
+        });
+      
+        try {
+          senderFuture.get();
+        } catch (ExecutionException e) {
+          Throwable cause = e.getCause();
           
-          // Serializing the data to be written.
-          // Format:
-          // 0) Length of rest below (1 + 2)
-          // 1) PayloadHeader  - is serialized Delimited hence contains length
-          // 2) the Payload - the RpcRequest
-          //
-          d = new DataOutputBuffer();
-          RpcPayloadHeaderProto header = ProtoUtil.makeRpcPayloadHeader(
-             call.rpcKind, RpcPayloadOperationProto.RPC_FINAL_PAYLOAD, call.id);
-          header.writeDelimitedTo(d);
-          call.rpcRequest.write(d);
-          byte[] data = d.getData();
-   
-          int totalLength = d.getLength();
-          out.writeInt(totalLength); // Total Length
-          out.write(data, 0, totalLength);//PayloadHeader + RpcRequest
-          out.flush();
+          // cause should only be a RuntimeException as the Runnable above
+          // catches IOException
+          if (cause instanceof RuntimeException) {
+            throw (RuntimeException) cause;
+          } else {
+            throw new RuntimeException("unexpected checked exception", cause);
+          }
         }
-      } catch(IOException e) {
-        markClosed(e);
-      } finally {
-        //the buffer is just an in-memory buffer, but it is still polite to
-        // close early
-        IOUtils.closeStream(d);
       }
-    }  
+    }
 
     /* Receive a response.
      * Because only one receiver, so no synchronization on in.
@@ -1136,7 +1191,16 @@ public class Client {
       ConnectionId remoteId) throws InterruptedException, IOException {
     Call call = new Call(rpcKind, rpcRequest);
     Connection connection = getConnection(remoteId, call);
-    connection.sendParam(call);                 // send the parameter
+    try {
+      connection.sendParam(call);                 // send the parameter
+    } catch (RejectedExecutionException e) {
+      throw new IOException("connection has been closed", e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.warn("interrupted waiting to send params to server", e);
+      throw new IOException(e);
+    }
+
     boolean interrupted = false;
     synchronized (call) {
       while (!call.done) {
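
The core of the change above, stripped of Hadoop specifics, can be summed up in a standalone sketch (this is illustrative code, not the actual Client class): serialize on the caller's thread, hand the blocking socket write to a shared daemon executor, and wait on the returned Future. An interrupt then lands on Future.get() in the caller instead of on the channel write, so the shared connection is not torn down mid-message. All names below (IsolatedSendSketch, send, the thread name) are made up for the example.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;

public class IsolatedSendSketch {
  // Shared daemon pool, analogous in spirit to SEND_PARAMS_EXECUTOR in the patch.
  private static final ExecutorService SENDER =
      Executors.newCachedThreadPool(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
          Thread t = new Thread(r, "Isolated Sender");
          t.setDaemon(true);
          return t;
        }
      });

  static void send(final OutputStream out, final byte[] payload)
      throws IOException, InterruptedException {
    Future<?> f = SENDER.submit(new Runnable() {
      @Override
      public void run() {
        try {
          synchronized (out) {        // one in-flight write per stream
            out.write(payload);
            out.flush();
          }
        } catch (IOException e) {
          // in the real patch this is where markClosed(e) is called
          throw new RuntimeException(e);
        }
      }
    });
    try {
      // Interruptible wait: an interrupt here aborts only the caller's wait,
      // while the write itself runs to completion on the sender thread.
      f.get();
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof RuntimeException && cause.getCause() instanceof IOException) {
        throw (IOException) cause.getCause();
      }
      throw new RuntimeException("unexpected exception from sender", cause);
    }
  }

  public static void main(String[] args) throws Exception {
    send(new ByteArrayOutputStream(), "hello".getBytes("UTF-8"));
    System.out.println("sent without exposing the stream to caller interrupts");
  }
}

The synchronized block plays the role of the lock on Connection.this.out in the patch: writes stay serialized per connection, but interruptions are confined to the thread that is merely waiting.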

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java?rev=1419783&r1=1419782&r2=1419783&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java Mon Dec 10 21:25:38 2012
@@ -68,6 +68,7 @@ public class TestIPC {
    * of the various writables.
    **/
   static boolean WRITABLE_FAULTS_ENABLED = true;
+  static int WRITABLE_FAULTS_SLEEP = 0;
   
   static {
     Client.setPingInterval(conf, PING_INTERVAL);
@@ -206,16 +207,27 @@ public class TestIPC {
   
   static void maybeThrowIOE() throws IOException {
     if (WRITABLE_FAULTS_ENABLED) {
+      maybeSleep();
       throw new IOException("Injected fault");
     }
   }
 
   static void maybeThrowRTE() {
     if (WRITABLE_FAULTS_ENABLED) {
+      maybeSleep();
       throw new RuntimeException("Injected fault");
     }
   }
 
+  private static void maybeSleep() {
+    if (WRITABLE_FAULTS_SLEEP > 0) {
+      try {
+        Thread.sleep(WRITABLE_FAULTS_SLEEP);
+      } catch (InterruptedException ie) {
+      }
+    }
+  }
+
   @SuppressWarnings("unused")
   private static class IOEOnReadWritable extends LongWritable {
     public IOEOnReadWritable() {}
@@ -370,6 +382,27 @@ public class TestIPC {
         RTEOnReadWritable.class);
   }
   
+  /**
+   * Test case that fails a write, but only after taking enough time
+   * that a ping should have been sent. This is a reproducer for a
+   * deadlock seen in one iteration of HADOOP-6762.
+   */
+  @Test
+  public void testIOEOnWriteAfterPingClient() throws Exception {
+    // start server
+    Client.setPingInterval(conf, 100);
+
+    try {
+      WRITABLE_FAULTS_SLEEP = 1000;
+      doErrorTest(IOEOnWriteWritable.class,
+          LongWritable.class,
+          LongWritable.class,
+          LongWritable.class);
+    } finally {
+      WRITABLE_FAULTS_SLEEP = 0;
+    }
+  }
+  
   private static void assertExceptionContains(
       Throwable t, String substring) {
     String msg = StringUtils.stringifyException(t);
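
For context on the TestIPC change: the new WRITABLE_FAULTS_SLEEP knob only has an effect because the fault is injected from inside a Writable's serialization. The real fault-injecting classes (e.g. IOEOnWriteWritable) live elsewhere in TestIPC.java and are not shown in this diff; the following is a hypothetical stand-in with the sleep and fault inlined, assuming it mirrors the other *On*Writable helpers, so the failure happens mid-write after the client's 100 ms ping timer has already fired.

import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;

// Hypothetical stand-in for TestIPC's IOEOnWriteWritable: a LongWritable
// whose write() stalls long enough for the ping timer to fire, then fails.
public class SlowFailingWritable extends LongWritable {
  public SlowFailingWritable() {}

  @Override
  public void write(DataOutput out) throws IOException {
    super.write(out);
    try {
      Thread.sleep(1000);            // outlive the 100 ms ping interval
    } catch (InterruptedException ie) {
      // ignored, as the test helper does
    }
    throw new IOException("Injected fault");
  }
}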

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java?rev=1419783&r1=1419782&r2=1419783&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java Mon Dec 10 21:25:38 2012
@@ -38,6 +38,10 @@ import java.net.ConnectException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.net.SocketFactory;
 
@@ -823,6 +827,96 @@ public class TestRPC {
     }
   }
   
+  @Test(timeout=90000)
+  public void testRPCInterruptedSimple() throws Exception {
+    final Configuration conf = new Configuration();
+    Server server = RPC.getServer(
+      TestProtocol.class, new TestImpl(), ADDRESS, 0, 5, true, conf, null
+    );
+    server.start();
+    InetSocketAddress addr = NetUtils.getConnectAddress(server);
+
+    final TestProtocol proxy = (TestProtocol) RPC.getProxy(
+        TestProtocol.class, TestProtocol.versionID, addr, conf);
+    // Connect to the server
+    proxy.ping();
+    // Interrupt self, try another call
+    Thread.currentThread().interrupt();
+    try {
+      proxy.ping();
+      fail("Interruption did not cause IPC to fail");
+    } catch (IOException ioe) {
+      if (!ioe.toString().contains("InterruptedException")) {
+        throw ioe;
+      }
+      // clear interrupt status for future tests
+      Thread.interrupted();
+    }
+  }
+  
+  @Test(timeout=30000)
+  public void testRPCInterrupted() throws IOException, InterruptedException {
+    final Configuration conf = new Configuration();
+    Server server = RPC.getServer(
+      TestProtocol.class, new TestImpl(), ADDRESS, 0, 5, true, conf, null
+    );
+
+    server.start();
+
+    int numConcurrentRPC = 200;
+    InetSocketAddress addr = NetUtils.getConnectAddress(server);
+    final CyclicBarrier barrier = new CyclicBarrier(numConcurrentRPC);
+    final CountDownLatch latch = new CountDownLatch(numConcurrentRPC);
+    final AtomicBoolean leaderRunning = new AtomicBoolean(true);
+    final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+    Thread leaderThread = null;
+    
+    for (int i = 0; i < numConcurrentRPC; i++) {
+      final int num = i;
+      final TestProtocol proxy = (TestProtocol) RPC.getProxy(
+      TestProtocol.class, TestProtocol.versionID, addr, conf);
+      Thread rpcThread = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            barrier.await();
+            while (num == 0 || leaderRunning.get()) {
+              proxy.slowPing(false);
+            }
+
+            proxy.slowPing(false);
+          } catch (Exception e) {
+            if (num == 0) {
+              leaderRunning.set(false);
+            } else {
+              error.set(e);
+            }
+
+            LOG.error(e);
+          } finally {
+            latch.countDown();
+          }
+        }
+      });
+      rpcThread.start();
+
+      if (leaderThread == null) {
+       leaderThread = rpcThread;
+      }
+    }
+    // let threads get past the barrier
+    Thread.sleep(1000);
+    // stop a single thread
+    while (leaderRunning.get()) {
+      leaderThread.interrupt();
+    }
+    
+    latch.await();
+    
+    // should not cause any other thread to get an error
+    assertTrue("rpc got exception " + error.get(), error.get() == null);
+  }
+
   public static void main(String[] args) throws Exception {
     new TestRPC().testCallsInternal(conf);
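
As a closing illustration of what the new behaviour looks like from calling code (mirroring testRPCInterruptedSimple above, but against a made-up PingProxy interface rather than the real TestProtocol): an interrupt on the calling thread now surfaces as an IOException wrapping the InterruptedException, the shared connection stays usable for other callers, and the interrupted caller clears its own interrupt status before continuing.

import java.io.IOException;

public class InterruptedCallSketch {
  /** Illustrative stand-in for an RPC proxy such as TestProtocol. */
  interface PingProxy {
    void ping() throws IOException;
  }

  static void pingIgnoringInterrupt(PingProxy proxy) throws IOException {
    try {
      proxy.ping();
    } catch (IOException ioe) {
      if (!ioe.toString().contains("InterruptedException")) {
        throw ioe;                   // unrelated I/O failure: rethrow
      }
      // The call was aborted by an interrupt on this thread; clear the
      // interrupt status before carrying on, as the test above does.
      Thread.interrupted();
    }
  }

  public static void main(String[] args) throws IOException {
    // Simulated proxy: pretend the client translated an interrupt into an
    // IOException, as the patched Client.call() now does.
    PingProxy fake = new PingProxy() {
      @Override
      public void ping() throws IOException {
        throw new IOException(new InterruptedException("simulated"));
      }
    };
    Thread.currentThread().interrupt(); // interrupt racing the call
    pingIgnoringInterrupt(fake);
    System.out.println("caller survived the interrupted call");
  }
}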