You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2012/12/10 22:25:39 UTC
svn commit: r1419783 - in
/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common:
CHANGES.txt src/main/java/org/apache/hadoop/ipc/Client.java
src/test/java/org/apache/hadoop/ipc/TestIPC.java
src/test/java/org/apache/hadoop/ipc/TestRPC.java
Author: todd
Date: Mon Dec 10 21:25:38 2012
New Revision: 1419783
URL: http://svn.apache.org/viewvc?rev=1419783&view=rev
Log:
HADOOP-6762. Exception while doing RPC I/O closes channel. Contributed by Sam Rash and Todd Lipcon.
Modified:
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1419783&r1=1419782&r2=1419783&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt Mon Dec 10 21:25:38 2012
@@ -175,6 +175,9 @@ Release 2.0.3-alpha - Unreleased
HADOOP-9070. Kerberos SASL server cannot find kerberos key. (daryn via atm)
+ HADOOP-6762. Exception while doing RPC I/O closes channel
+ (Sam Rash and todd via todd)
+
Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1419783&r1=1419782&r2=1419783&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Mon Dec 10 21:25:38 2012
@@ -38,6 +38,11 @@ import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -78,6 +83,8 @@ import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
* a port and is defined by a parameter class and a value class.
@@ -102,6 +109,19 @@ public class Client {
final static int PING_CALL_ID = -1;
/**
+ * Executor on which IPC calls' parameters are sent. Deferring
+ * the sending of parameters to a separate thread isolates them
+ * from thread interruptions in the calling code.
+ */
+ private static final ExecutorService SEND_PARAMS_EXECUTOR = // static: one pool shared by every Client in the JVM; no shutdown is visible in this diff
+ Executors.newCachedThreadPool( // unbounded: roughly one thread per concurrently sending connection
+ new ThreadFactoryBuilder()
+ .setDaemon(true) // daemon threads, so idle senders do not keep the JVM alive
+ .setNameFormat("IPC Parameter Sending Thread #%d")
+ .build());
+
+
+ /**
* set the ping interval value in configuration
*
* @param conf Configuration
@@ -243,6 +263,8 @@ public class Client {
private AtomicLong lastActivity = new AtomicLong();// last I/O activity time
private AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate if the connection is closed
private IOException closeException; // close reason
+
+ private final Object sendParamsLock = new Object(); // held by sendParam() across submit + wait, so at most one caller per connection waits on a send at a time
public Connection(ConnectionId remoteId) throws IOException {
this.remoteId = remoteId;
@@ -829,43 +851,76 @@ public class Client {
* Note: this is not called from the Connection thread, but by other
* threads.
*/
- public void sendParam(Call call) {
+ public void sendParam(final Call call)
+ throws InterruptedException, IOException { // IOException: serialization failed on the caller thread; InterruptedException: caller interrupted while waiting for the send
if (shouldCloseConnection.get()) {
return;
}
- DataOutputBuffer d=null;
- try {
- synchronized (this.out) {
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + " sending #" + call.id);
+ // Serialize the call to be sent. This is done from the actual
+ // caller thread, rather than the SEND_PARAMS_EXECUTOR thread,
+ // so that if the serialization throws an error, it is reported
+ // properly. This also parallelizes the serialization.
+ //
+ // Format of a call on the wire:
+ // 0) Length of rest below (1 + 2)
+ // 1) PayloadHeader - is serialized Delimited hence contains length
+ // 2) the Payload - the RpcRequest
+ //
+ // Items '1' and '2' are prepared here.
+ final DataOutputBuffer d = new DataOutputBuffer();
+ RpcPayloadHeaderProto header = ProtoUtil.makeRpcPayloadHeader(
+ call.rpcKind, RpcPayloadOperationProto.RPC_FINAL_PAYLOAD, call.id);
+ header.writeDelimitedTo(d);
+ call.rpcRequest.write(d); // an IOException thrown while serializing propagates to the caller; nothing has been written and the connection is not marked closed
+
+ synchronized (sendParamsLock) { // released only after this call's send finishes or the wait below is interrupted
+ Future<?> senderFuture = SEND_PARAMS_EXECUTOR.submit(new Runnable() { // submit() may throw RejectedExecutionException; call() translates that into an IOException
+ @Override
+ public void run() {
+ try {
+ synchronized (Connection.this.out) { // the actual socket write is serialized on the output stream
+ if (shouldCloseConnection.get()) { // re-checked on the sender thread: the connection may have closed after the check at the top of sendParam
+ return;
+ }
+
+ if (LOG.isDebugEnabled())
+ LOG.debug(getName() + " sending #" + call.id);
+
+ byte[] data = d.getData();
+ int totalLength = d.getLength();
+ out.writeInt(totalLength); // Total Length
+ out.write(data, 0, totalLength);//PayloadHeader + RpcRequest
+ out.flush();
+ }
+ } catch (IOException e) {
+ // exception at this point would leave the connection in an
+ // unrecoverable state (eg half a call left on the wire).
+ // So, close the connection, killing any outstanding calls
+ markClosed(e);
+ } finally {
+ //the buffer is just an in-memory buffer, but it is still polite to
+ // close early
+ IOUtils.closeStream(d);
+ }
+ }
+ });
+
+ try {
+ senderFuture.get(); // an InterruptedException here propagates to the caller; the submitted write is not cancelled and keeps running on the executor thread
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
- // Serializing the data to be written.
- // Format:
- // 0) Length of rest below (1 + 2)
- // 1) PayloadHeader - is serialized Delimited hence contains length
- // 2) the Payload - the RpcRequest
- //
- d = new DataOutputBuffer();
- RpcPayloadHeaderProto header = ProtoUtil.makeRpcPayloadHeader(
- call.rpcKind, RpcPayloadOperationProto.RPC_FINAL_PAYLOAD, call.id);
- header.writeDelimitedTo(d);
- call.rpcRequest.write(d);
- byte[] data = d.getData();
-
- int totalLength = d.getLength();
- out.writeInt(totalLength); // Total Length
- out.write(data, 0, totalLength);//PayloadHeader + RpcRequest
- out.flush();
+ // cause should only be a RuntimeException as the Runnable above
+ // catches IOException
+ if (cause instanceof RuntimeException) {
+ throw (RuntimeException) cause;
+ } else { // NOTE(review): run() declares no checked exceptions, so this branch effectively receives Errors, which get wrapped under a "checked exception" message
+ throw new RuntimeException("unexpected checked exception", cause);
+ }
}
- } catch(IOException e) {
- markClosed(e);
- } finally {
- //the buffer is just an in-memory buffer, but it is still polite to
- // close early
- IOUtils.closeStream(d);
}
- }
+ }
/* Receive a response.
* Because only one receiver, so no synchronization on in.
@@ -1136,7 +1191,16 @@ public class Client {
ConnectionId remoteId) throws InterruptedException, IOException {
Call call = new Call(rpcKind, rpcRequest);
Connection connection = getConnection(remoteId, call);
- connection.sendParam(call); // send the parameter
+ try {
+ connection.sendParam(call); // send the parameter
+ } catch (RejectedExecutionException e) {
+ throw new IOException("connection has been closed", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("interrupted waiting to send params to server", e);
+ throw new IOException(e);
+ }
+
boolean interrupted = false;
synchronized (call) {
while (!call.done) {
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java?rev=1419783&r1=1419782&r2=1419783&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java Mon Dec 10 21:25:38 2012
@@ -68,6 +68,7 @@ public class TestIPC {
* of the various writables.
**/
static boolean WRITABLE_FAULTS_ENABLED = true;
+ static int WRITABLE_FAULTS_SLEEP = 0; // milliseconds maybeSleep() waits before each injected fault; 0 disables the delay
static {
Client.setPingInterval(conf, PING_INTERVAL);
@@ -206,16 +207,27 @@ public class TestIPC {
static void maybeThrowIOE() throws IOException {
if (WRITABLE_FAULTS_ENABLED) {
+ maybeSleep(); // optional delay (WRITABLE_FAULTS_SLEEP) before the injected IOException
throw new IOException("Injected fault");
}
}
static void maybeThrowRTE() {
if (WRITABLE_FAULTS_ENABLED) {
+ maybeSleep(); // optional delay (WRITABLE_FAULTS_SLEEP) before the injected RuntimeException
throw new RuntimeException("Injected fault");
}
}
+ private static void maybeSleep() { // sleeps WRITABLE_FAULTS_SLEEP ms when positive; otherwise returns immediately
+ if (WRITABLE_FAULTS_SLEEP > 0) {
+ try {
+ Thread.sleep(WRITABLE_FAULTS_SLEEP);
+ } catch (InterruptedException ie) { // NOTE(review): the interrupt is swallowed and the flag stays cleared; the sleep simply ends early
+ }
+ }
+ }
+
@SuppressWarnings("unused")
private static class IOEOnReadWritable extends LongWritable {
public IOEOnReadWritable() {}
@@ -370,6 +382,27 @@ public class TestIPC {
RTEOnReadWritable.class);
}
+ /**
+ * Test case that fails a write, but only after taking enough time
+ * that a ping should have been sent. This is a reproducer for a
+ * deadlock seen in one iteration of HADOOP-6762.
+ */
+ @Test
+ public void testIOEOnWriteAfterPingClient() throws Exception {
+ // Shorten the ping interval so a ping falls due while the injected fault sleeps. NOTE(review): conf is static and this value is not restored in the finally block below
+ Client.setPingInterval(conf, 100);
+
+ try {
+ WRITABLE_FAULTS_SLEEP = 1000; // each injected fault now waits 1000 ms before throwing
+ doErrorTest(IOEOnWriteWritable.class,
+ LongWritable.class,
+ LongWritable.class,
+ LongWritable.class);
+ } finally {
+ WRITABLE_FAULTS_SLEEP = 0; // restore the default so later tests are not delayed
+ }
+ }
+
private static void assertExceptionContains(
Throwable t, String substring) {
String msg = StringUtils.stringifyException(t);
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java?rev=1419783&r1=1419782&r2=1419783&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java Mon Dec 10 21:25:38 2012
@@ -38,6 +38,10 @@ import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import javax.net.SocketFactory;
@@ -823,6 +827,96 @@ public class TestRPC {
}
}
+ @Test(timeout=90000)
+ public void testRPCInterruptedSimple() throws Exception { // an RPC made by an already-interrupted thread must fail with an IOException mentioning InterruptedException
+ final Configuration conf = new Configuration();
+ Server server = RPC.getServer(
+ TestProtocol.class, new TestImpl(), ADDRESS, 0, 5, true, conf, null
+ );
+ server.start(); // NOTE(review): neither this server nor the proxy below is stopped when the test ends
+ InetSocketAddress addr = NetUtils.getConnectAddress(server);
+
+ final TestProtocol proxy = (TestProtocol) RPC.getProxy(
+ TestProtocol.class, TestProtocol.versionID, addr, conf);
+ // Connect to the server
+ proxy.ping();
+ // Interrupt self, try another call
+ Thread.currentThread().interrupt();
+ try {
+ proxy.ping();
+ fail("Interruption did not cause IPC to fail");
+ } catch (IOException ioe) {
+ if (!ioe.toString().contains("InterruptedException")) { // Client.call() wraps the InterruptedException as new IOException(e)
+ throw ioe;
+ }
+ // clear interrupt status for future tests
+ Thread.interrupted(); // NOTE(review): only reached on the expected path; on fail() or a rethrow the flag may remain set
+ }
+ }
+
+ @Test(timeout=30000)
+ public void testRPCInterrupted() throws IOException, InterruptedException { // interrupting one caller must not make concurrent callers to the same server fail
+ final Configuration conf = new Configuration();
+ Server server = RPC.getServer(
+ TestProtocol.class, new TestImpl(), ADDRESS, 0, 5, true, conf, null
+ );
+
+ server.start(); // NOTE(review): the server and the 200 proxies are never stopped when the test ends
+
+ int numConcurrentRPC = 200;
+ InetSocketAddress addr = NetUtils.getConnectAddress(server);
+ final CyclicBarrier barrier = new CyclicBarrier(numConcurrentRPC);
+ final CountDownLatch latch = new CountDownLatch(numConcurrentRPC);
+ final AtomicBoolean leaderRunning = new AtomicBoolean(true);
+ final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+ Thread leaderThread = null;
+
+ for (int i = 0; i < numConcurrentRPC; i++) {
+ final int num = i;
+ final TestProtocol proxy = (TestProtocol) RPC.getProxy( // same addr and conf for every proxy, presumably so they share one IPC connection; verify against Client's connection cache
+ TestProtocol.class, TestProtocol.versionID, addr, conf);
+ Thread rpcThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ barrier.await();
+ while (num == 0 || leaderRunning.get()) { // thread 0 (the leader) loops until one of its RPCs fails; the others loop until that happens
+ proxy.slowPing(false);
+ }
+
+ proxy.slowPing(false); // one more call after the leader stopped; it must still succeed
+ } catch (Exception e) {
+ if (num == 0) {
+ leaderRunning.set(false);
+ } else {
+ error.set(e); // keeps only the most recent non-leader failure
+ }
+
+ LOG.error(e);
+ } finally {
+ latch.countDown();
+ }
+ }
+ });
+ rpcThread.start();
+
+ if (leaderThread == null) {
+ leaderThread = rpcThread;
+ }
+ }
+ // let threads get past the barrier
+ Thread.sleep(1000);
+ // stop a single thread
+ while (leaderRunning.get()) { // busy-spins, re-interrupting the leader until one of its RPCs throws
+ leaderThread.interrupt();
+ }
+
+ latch.await();
+
+ // should not cause any other thread to get an error
+ assertTrue("rpc got exception " + error.get(), error.get() == null);
+ }
+
public static void main(String[] args) throws Exception {
new TestRPC().testCallsInternal(conf);