You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2022/11/18 16:52:21 UTC
[hadoop] branch branch-3.3.5 updated: HADOOP-18324. Interrupting RPC Client calls can lead to thread exhaustion. (#4527)
This is an automated email from the ASF dual-hosted git repository.
omalley pushed a commit to branch branch-3.3.5
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3.5 by this push:
new bc4d7b47747 HADOOP-18324. Interrupting RPC Client calls can lead to thread exhaustion. (#4527)
bc4d7b47747 is described below
commit bc4d7b47747f58d8b0973c18a148be74df3e6c9a
Author: Owen O'Malley <oo...@linkedin.com>
AuthorDate: Fri Nov 18 16:24:45 2022 +0000
HADOOP-18324. Interrupting RPC Client calls can lead to thread exhaustion. (#4527)
* Exactly 1 sending thread per an RPC connection.
* If the calling thread is interrupted before the socket write, it will be skipped instead of sending it anyways.
* If the calling thread is interrupted during the socket write, the write will finish.
* RPC requests will be written to the socket in the order received.
* Sending thread is only started by the receiving thread.
* The sending thread periodically checks the shouldCloseConnection flag.
---
.../main/java/org/apache/hadoop/ipc/Client.java | 186 +++++++-------------
.../test/java/org/apache/hadoop/ipc/TestIPC.java | 5 -
.../test/java/org/apache/hadoop/ipc/TestRPC.java | 195 +++++++++++++++++++++
3 files changed, 255 insertions(+), 131 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 2e51c63389b..16f799bddcf 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -18,10 +18,10 @@
package org.apache.hadoop.ipc;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability;
@@ -165,73 +165,6 @@ public class Client implements AutoCloseable {
private final int maxAsyncCalls;
private final AtomicInteger asyncCallCounter = new AtomicInteger(0);
- /**
- * Executor on which IPC calls' parameters are sent.
- * Deferring the sending of parameters to a separate
- * thread isolates them from thread interruptions in the
- * calling code.
- */
- private final ExecutorService sendParamsExecutor;
- private final static ClientExecutorServiceFactory clientExcecutorFactory =
- new ClientExecutorServiceFactory();
-
- private static class ClientExecutorServiceFactory {
- private int executorRefCount = 0;
- private ExecutorService clientExecutor = null;
-
- /**
- * Get Executor on which IPC calls' parameters are sent.
- * If the internal reference counter is zero, this method
- * creates the instance of Executor. If not, this method
- * just returns the reference of clientExecutor.
- *
- * @return An ExecutorService instance
- */
- synchronized ExecutorService refAndGetInstance() {
- if (executorRefCount == 0) {
- clientExecutor = Executors.newCachedThreadPool(
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("IPC Parameter Sending Thread #%d")
- .build());
- }
- executorRefCount++;
-
- return clientExecutor;
- }
-
- /**
- * Cleanup Executor on which IPC calls' parameters are sent.
- * If reference counter is zero, this method discards the
- * instance of the Executor. If not, this method
- * just decrements the internal reference counter.
- *
- * @return An ExecutorService instance if it exists.
- * Null is returned if not.
- */
- synchronized ExecutorService unrefAndCleanup() {
- executorRefCount--;
- assert(executorRefCount >= 0);
-
- if (executorRefCount == 0) {
- clientExecutor.shutdown();
- try {
- if (!clientExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
- clientExecutor.shutdownNow();
- }
- } catch (InterruptedException e) {
- LOG.warn("Interrupted while waiting for clientExecutor" +
- " to stop");
- clientExecutor.shutdownNow();
- Thread.currentThread().interrupt();
- }
- clientExecutor = null;
- }
-
- return clientExecutor;
- }
- }
-
/**
* set the ping interval value in configuration
*
@@ -300,11 +233,6 @@ public class Client implements AutoCloseable {
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY, timeout);
}
- @VisibleForTesting
- public static final ExecutorService getClientExecutor() {
- return Client.clientExcecutorFactory.clientExecutor;
- }
-
/**
* Increment this client's reference count
*/
@@ -461,8 +389,10 @@ public class Client implements AutoCloseable {
private AtomicLong lastActivity = new AtomicLong();// last I/O activity time
private AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate if the connection is closed
private IOException closeException; // close reason
-
- private final Object sendRpcRequestLock = new Object();
+
+ private final Thread rpcRequestThread;
+ private final SynchronousQueue<Pair<Call, ResponseBuffer>> rpcRequestQueue =
+ new SynchronousQueue<>(true);
private AtomicReference<Thread> connectingThread = new AtomicReference<>();
private final Consumer<Connection> removeMethod;
@@ -471,6 +401,9 @@ public class Client implements AutoCloseable {
Consumer<Connection> removeMethod) {
this.remoteId = remoteId;
this.server = remoteId.getAddress();
+ this.rpcRequestThread = new Thread(new RpcRequestSender(),
+ "IPC Parameter Sending Thread for " + remoteId);
+ this.rpcRequestThread.setDaemon(true);
this.maxResponseLength = remoteId.conf.getInt(
CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH,
@@ -1149,6 +1082,10 @@ public class Client implements AutoCloseable {
@Override
public void run() {
+ // Don't start the ipc parameter sending thread until we start this
+ // thread, because the shutdown logic only gets triggered if this
+ // thread is started.
+ rpcRequestThread.start();
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": starting, having connections "
+ connections.size());
@@ -1172,9 +1109,52 @@ public class Client implements AutoCloseable {
+ connections.size());
}
+ /**
+ * A thread to write rpc requests to the socket.
+ */
+ private class RpcRequestSender implements Runnable {
+ @Override
+ public void run() {
+ while (!shouldCloseConnection.get()) {
+ ResponseBuffer buf = null;
+ try {
+ Pair<Call, ResponseBuffer> pair =
+ rpcRequestQueue.poll(maxIdleTime, TimeUnit.MILLISECONDS);
+ if (pair == null || shouldCloseConnection.get()) {
+ continue;
+ }
+ buf = pair.getRight();
+ synchronized (ipcStreams.out) {
+ if (LOG.isDebugEnabled()) {
+ Call call = pair.getLeft();
+ LOG.debug(getName() + "{} sending #{} {}", getName(), call.id,
+ call.rpcRequest);
+ }
+ // RpcRequestHeader + RpcRequest
+ ipcStreams.sendRequest(buf.toByteArray());
+ ipcStreams.flush();
+ }
+ } catch (InterruptedException ie) {
+ // stop this thread
+ return;
+ } catch (IOException e) {
+ // exception at this point would leave the connection in an
+ // unrecoverable state (eg half a call left on the wire).
+ // So, close the connection, killing any outstanding calls
+ markClosed(e);
+ } finally {
+ //the buffer is just an in-memory buffer, but it is still polite to
+ // close early
+ IOUtils.closeStream(buf);
+ }
+ }
+ }
+ }
+
/** Initiates a rpc call by sending the rpc request to the remote server.
- * Note: this is not called from the Connection thread, but by other
- * threads.
+ * Note: this is not called from the current thread, but by another
+ * thread, so that if the current thread is interrupted that the socket
+ * state isn't corrupted with a partially written message.
* @param call - the rpc request
*/
public void sendRpcRequest(final Call call)
@@ -1184,8 +1164,7 @@ public class Client implements AutoCloseable {
}
// Serialize the call to be sent. This is done from the actual
- // caller thread, rather than the sendParamsExecutor thread,
-
+ // caller thread, rather than the rpcRequestThread in the connection,
// so that if the serialization throws an error, it is reported
// properly. This also parallelizes the serialization.
//
@@ -1202,51 +1181,7 @@ public class Client implements AutoCloseable {
final ResponseBuffer buf = new ResponseBuffer();
header.writeDelimitedTo(buf);
RpcWritable.wrap(call.rpcRequest).writeTo(buf);
-
- synchronized (sendRpcRequestLock) {
- Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
- @Override
- public void run() {
- try {
- synchronized (ipcStreams.out) {
- if (shouldCloseConnection.get()) {
- return;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + " sending #" + call.id
- + " " + call.rpcRequest);
- }
- // RpcRequestHeader + RpcRequest
- ipcStreams.sendRequest(buf.toByteArray());
- ipcStreams.flush();
- }
- } catch (IOException e) {
- // exception at this point would leave the connection in an
- // unrecoverable state (eg half a call left on the wire).
- // So, close the connection, killing any outstanding calls
- markClosed(e);
- } finally {
- //the buffer is just an in-memory buffer, but it is still polite to
- // close early
- IOUtils.closeStream(buf);
- }
- }
- });
-
- try {
- senderFuture.get();
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
-
- // cause should only be a RuntimeException as the Runnable above
- // catches IOException
- if (cause instanceof RuntimeException) {
- throw (RuntimeException) cause;
- } else {
- throw new RuntimeException("unexpected checked exception", cause);
- }
- }
- }
+ rpcRequestQueue.put(Pair.of(call, buf));
}
/* Receive a response.
@@ -1395,7 +1330,6 @@ public class Client implements AutoCloseable {
CommonConfigurationKeys.IPC_CLIENT_BIND_WILDCARD_ADDR_DEFAULT);
this.clientId = ClientId.getClientId();
- this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
this.maxAsyncCalls = conf.getInt(
CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT);
@@ -1439,6 +1373,7 @@ public class Client implements AutoCloseable {
// wake up all connections
for (Connection conn : connections.values()) {
conn.interrupt();
+ conn.rpcRequestThread.interrupt();
conn.interruptConnectingThread();
}
@@ -1455,7 +1390,6 @@ public class Client implements AutoCloseable {
}
}
}
- clientExcecutorFactory.unrefAndCleanup();
}
/**
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
index 05ef09ae433..b9afa31b448 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
@@ -1218,11 +1218,6 @@ public class TestIPC {
@Test(timeout=30000)
public void testInterrupted() {
Client client = new Client(LongWritable.class, conf);
- Client.getClientExecutor().submit(new Runnable() {
- public void run() {
- while(true);
- }
- });
Thread.currentThread().interrupt();
client.stop();
try {
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index 85b0a7b6c84..362b8e3105e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.test.Whitebox;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
@@ -64,13 +65,16 @@ import org.slf4j.event.Level;
import javax.net.SocketFactory;
import java.io.Closeable;
import java.io.IOException;
+import java.io.InputStream;
import java.io.InterruptedIOException;
+import java.io.OutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.security.PrivilegedAction;
@@ -91,6 +95,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
import static org.assertj.core.api.Assertions.assertThat;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
@@ -995,6 +1000,196 @@ public class TestRPC extends TestRpcBase {
}
}
+ /**
+ * This tests the case where the server isn't receiving new data and
+ * multiple threads queue up to send rpc requests. Only one of the requests
+ * should be written and all of the calling threads should be interrupted.
+ *
+ * We use a mock SocketFactory so that we can control when the input and
+ * output streams are frozen.
+ */
+ @Test(timeout=30000)
+ public void testSlowConnection() throws Exception {
+ SocketFactory mockFactory = Mockito.mock(SocketFactory.class);
+ Socket mockSocket = Mockito.mock(Socket.class);
+ Mockito.when(mockFactory.createSocket()).thenReturn(mockSocket);
+ Mockito.when(mockSocket.getPort()).thenReturn(1234);
+ Mockito.when(mockSocket.getLocalPort()).thenReturn(2345);
+ MockOutputStream mockOutputStream = new MockOutputStream();
+ Mockito.when(mockSocket.getOutputStream()).thenReturn(mockOutputStream);
+ // Use an input stream that always blocks
+ Mockito.when(mockSocket.getInputStream()).thenReturn(new InputStream() {
+ @Override
+ public int read() throws IOException {
+ // wait forever
+ while (true) {
+ try {
+ Thread.sleep(TimeUnit.DAYS.toMillis(1));
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException("test");
+ }
+ }
+ }
+ });
+ Configuration clientConf = new Configuration();
+ // disable ping & timeout to minimize traffic
+ clientConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, false);
+ clientConf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 0);
+ RPC.setProtocolEngine(clientConf, TestRpcService.class, ProtobufRpcEngine.class);
+ // set async mode so that we don't need to implement the input stream
+ final boolean wasAsync = Client.isAsynchronousMode();
+ TestRpcService client = null;
+ try {
+ Client.setAsynchronousMode(true);
+ client = RPC.getProtocolProxy(
+ TestRpcService.class,
+ 0,
+ new InetSocketAddress("localhost", 1234),
+ UserGroupInformation.getCurrentUser(),
+ clientConf,
+ mockFactory).getProxy();
+ // The connection isn't actually made until the first call.
+ client.ping(null, newEmptyRequest());
+ mockOutputStream.waitForFlush(1);
+ final long headerAndFirst = mockOutputStream.getBytesWritten();
+ client.ping(null, newEmptyRequest());
+ mockOutputStream.waitForFlush(2);
+ final long second = mockOutputStream.getBytesWritten() - headerAndFirst;
+ // pause the writer thread
+ mockOutputStream.pause();
+ // create a set of threads to create calls that will back up
+ ExecutorService pool = Executors.newCachedThreadPool();
+ Future[] futures = new Future[numThreads];
+ final AtomicInteger doneThreads = new AtomicInteger(0);
+ for(int thread = 0; thread < numThreads; ++thread) {
+ final TestRpcService finalClient = client;
+ futures[thread] = pool.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ finalClient.ping(null, newEmptyRequest());
+ doneThreads.incrementAndGet();
+ return null;
+ }
+ });
+ }
+ // wait until the threads have started writing
+ mockOutputStream.waitForWriters();
+ // interrupt all the threads
+ for(int thread=0; thread < numThreads; ++thread) {
+ assertTrue("cancel thread " + thread,
+ futures[thread].cancel(true));
+ }
+ // wait until all the writers are cancelled
+ pool.shutdown();
+ pool.awaitTermination(10, TimeUnit.SECONDS);
+ mockOutputStream.resume();
+ // wait for the in flight rpc request to be flushed
+ mockOutputStream.waitForFlush(3);
+ // All the threads should have been interrupted
+ assertEquals(0, doneThreads.get());
+ // make sure that only one additional rpc request was sent
+ assertEquals(headerAndFirst + second * 2,
+ mockOutputStream.getBytesWritten());
+ } finally {
+ Client.setAsynchronousMode(wasAsync);
+ if (client != null) {
+ RPC.stopProxy(client);
+ }
+ }
+ }
+
+ private static final class MockOutputStream extends OutputStream {
+ private long bytesWritten = 0;
+ private AtomicInteger flushCount = new AtomicInteger(0);
+ private ReentrantLock lock = new ReentrantLock(true);
+
+ @Override
+ public synchronized void write(int b) throws IOException {
+ lock.lock();
+ bytesWritten += 1;
+ lock.unlock();
+ }
+
+ @Override
+ public void flush() {
+ flushCount.incrementAndGet();
+ }
+
+ public synchronized long getBytesWritten() {
+ return bytesWritten;
+ }
+
+ public void pause() {
+ lock.lock();
+ }
+
+ public void resume() {
+ lock.unlock();
+ }
+
+ private static final int DELAY_MS = 250;
+
+ /**
+ * Wait for the Nth flush, which we assume will happen exactly when the
+ * Nth RPC request is sent.
+ * @param flush the total flush count to wait for
+ * @throws InterruptedException
+ */
+ public void waitForFlush(int flush) throws InterruptedException {
+ while (flushCount.get() < flush) {
+ Thread.sleep(DELAY_MS);
+ }
+ }
+
+ public void waitForWriters() throws InterruptedException {
+ while (!lock.hasQueuedThreads()) {
+ Thread.sleep(DELAY_MS);
+ }
+ }
+ }
+
+ /**
+ * This test causes an exception in the RPC connection setup to make
+ * sure that threads aren't leaked.
+ */
+ @Test(timeout=30000)
+ public void testBadSetup() throws Exception {
+ SocketFactory mockFactory = Mockito.mock(SocketFactory.class);
+ Mockito.when(mockFactory.createSocket())
+ .thenThrow(new IOException("can't connect"));
+ Configuration clientConf = new Configuration();
+ // Set an illegal value to cause an exception in the constructor
+ clientConf.set(CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH,
+ "xxx");
+ RPC.setProtocolEngine(clientConf, TestRpcService.class,
+ ProtobufRpcEngine.class);
+ TestRpcService client = null;
+ int threadCount = Thread.getAllStackTraces().size();
+ try {
+ try {
+ client = RPC.getProtocolProxy(
+ TestRpcService.class,
+ 0,
+ new InetSocketAddress("localhost", 1234),
+ UserGroupInformation.getCurrentUser(),
+ clientConf,
+ mockFactory).getProxy();
+ client.ping(null, newEmptyRequest());
+ assertTrue("Didn't throw exception!", false);
+ } catch (ServiceException nfe) {
+ // ensure no extra threads are running.
+ assertEquals(threadCount, Thread.getAllStackTraces().size());
+ } catch (Throwable t) {
+ assertTrue("wrong exception: " + t, false);
+ }
+ } finally {
+ if (client != null) {
+ RPC.stopProxy(client);
+ }
+ }
+ }
+
@Test
public void testConnectionPing() throws Exception {
Server server;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org