You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2022/11/18 16:52:21 UTC

[hadoop] branch branch-3.3.5 updated: HADOOP-18324. Interrupting RPC Client calls can lead to thread exhaustion. (#4527)

This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch branch-3.3.5
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3.5 by this push:
     new bc4d7b47747 HADOOP-18324. Interrupting RPC Client calls can lead to thread exhaustion. (#4527)
bc4d7b47747 is described below

commit bc4d7b47747f58d8b0973c18a148be74df3e6c9a
Author: Owen O'Malley <oo...@linkedin.com>
AuthorDate: Fri Nov 18 16:24:45 2022 +0000

    HADOOP-18324. Interrupting RPC Client calls can lead to thread exhaustion. (#4527)
    
    * Exactly one sending thread per RPC connection.
    * If the calling thread is interrupted before the socket write, the request will be skipped instead of being sent anyway.
    * If the calling thread is interrupted during the socket write, the write will finish.
    * RPC requests will be written to the socket in the order received.
    * Sending thread is only started by the receiving thread.
    * The sending thread periodically checks the shouldCloseConnection flag.
---
 .../main/java/org/apache/hadoop/ipc/Client.java    | 186 +++++++-------------
 .../test/java/org/apache/hadoop/ipc/TestIPC.java   |   5 -
 .../test/java/org/apache/hadoop/ipc/TestRPC.java   | 195 +++++++++++++++++++++
 3 files changed, 255 insertions(+), 131 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 2e51c63389b..16f799bddcf 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -18,10 +18,10 @@
 
 package org.apache.hadoop.ipc;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -165,73 +165,6 @@ public class Client implements AutoCloseable {
   private final int maxAsyncCalls;
   private final AtomicInteger asyncCallCounter = new AtomicInteger(0);
 
-  /**
-   * Executor on which IPC calls' parameters are sent.
-   * Deferring the sending of parameters to a separate
-   * thread isolates them from thread interruptions in the
-   * calling code.
-   */
-  private final ExecutorService sendParamsExecutor;
-  private final static ClientExecutorServiceFactory clientExcecutorFactory =
-      new ClientExecutorServiceFactory();
-
-  private static class ClientExecutorServiceFactory {
-    private int executorRefCount = 0;
-    private ExecutorService clientExecutor = null;
-    
-    /**
-     * Get Executor on which IPC calls' parameters are sent.
-     * If the internal reference counter is zero, this method
-     * creates the instance of Executor. If not, this method
-     * just returns the reference of clientExecutor.
-     * 
-     * @return An ExecutorService instance
-     */
-    synchronized ExecutorService refAndGetInstance() {
-      if (executorRefCount == 0) {
-        clientExecutor = Executors.newCachedThreadPool(
-            new ThreadFactoryBuilder()
-            .setDaemon(true)
-            .setNameFormat("IPC Parameter Sending Thread #%d")
-            .build());
-      }
-      executorRefCount++;
-      
-      return clientExecutor;
-    }
-    
-    /**
-     * Cleanup Executor on which IPC calls' parameters are sent.
-     * If reference counter is zero, this method discards the
-     * instance of the Executor. If not, this method
-     * just decrements the internal reference counter.
-     * 
-     * @return An ExecutorService instance if it exists.
-     *   Null is returned if not.
-     */
-    synchronized ExecutorService unrefAndCleanup() {
-      executorRefCount--;
-      assert(executorRefCount >= 0);
-      
-      if (executorRefCount == 0) {
-        clientExecutor.shutdown();
-        try {
-          if (!clientExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
-            clientExecutor.shutdownNow();
-          }
-        } catch (InterruptedException e) {
-          LOG.warn("Interrupted while waiting for clientExecutor" +
-              " to stop");
-          clientExecutor.shutdownNow();
-          Thread.currentThread().interrupt();
-        }
-        clientExecutor = null;
-      }
-      
-      return clientExecutor;
-    }
-  }
-  
   /**
    * set the ping interval value in configuration
    * 
@@ -300,11 +233,6 @@ public class Client implements AutoCloseable {
     conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY, timeout);
   }
 
-  @VisibleForTesting
-  public static final ExecutorService getClientExecutor() {
-    return Client.clientExcecutorFactory.clientExecutor;
-  }
-
   /**
    * Increment this client's reference count
    */
@@ -461,8 +389,10 @@ public class Client implements AutoCloseable {
     private AtomicLong lastActivity = new AtomicLong();// last I/O activity time
     private AtomicBoolean shouldCloseConnection = new AtomicBoolean();  // indicate if the connection is closed
     private IOException closeException; // close reason
-    
-    private final Object sendRpcRequestLock = new Object();
+
+    private final Thread rpcRequestThread;
+    private final SynchronousQueue<Pair<Call, ResponseBuffer>> rpcRequestQueue =
+        new SynchronousQueue<>(true);
 
     private AtomicReference<Thread> connectingThread = new AtomicReference<>();
     private final Consumer<Connection> removeMethod;
@@ -471,6 +401,9 @@ public class Client implements AutoCloseable {
         Consumer<Connection> removeMethod) {
       this.remoteId = remoteId;
       this.server = remoteId.getAddress();
+      this.rpcRequestThread = new Thread(new RpcRequestSender(),
+          "IPC Parameter Sending Thread for " + remoteId);
+      this.rpcRequestThread.setDaemon(true);
 
       this.maxResponseLength = remoteId.conf.getInt(
           CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH,
@@ -1149,6 +1082,10 @@ public class Client implements AutoCloseable {
 
     @Override
     public void run() {
+      // Don't start the ipc parameter sending thread until we start this
+      // thread, because the shutdown logic only gets triggered if this
+      // thread is started.
+      rpcRequestThread.start();
       if (LOG.isDebugEnabled())
         LOG.debug(getName() + ": starting, having connections " 
             + connections.size());
@@ -1172,9 +1109,52 @@ public class Client implements AutoCloseable {
             + connections.size());
     }
 
+    /**
+     * A thread to write rpc requests to the socket, in the order queued. It
+     * exits when shouldCloseConnection is set or when it is interrupted.
+     */
+    private class RpcRequestSender implements Runnable {
+      @Override
+      public void run() {
+        while (!shouldCloseConnection.get()) {
+          ResponseBuffer buf = null;
+          try {
+            Pair<Call, ResponseBuffer> pair =
+                rpcRequestQueue.poll(maxIdleTime, TimeUnit.MILLISECONDS);
+            buf = pair == null ? null : pair.getRight();
+            if (pair == null || shouldCloseConnection.get()) {
+              continue;
+            }
+            synchronized (ipcStreams.out) {
+              if (LOG.isDebugEnabled()) {
+                Call call = pair.getLeft();
+                LOG.debug("{} sending #{} {}", getName(), call.id,
+                    call.rpcRequest);
+              }
+              // RpcRequestHeader + RpcRequest
+              ipcStreams.sendRequest(buf.toByteArray());
+              ipcStreams.flush();
+            }
+          } catch (InterruptedException ie) {
+            return; // stop this thread
+          } catch (IOException e) {
+            // exception at this point would leave the connection in an
+            // unrecoverable state (eg half a call left on the wire).
+            // So, close the connection, killing any outstanding calls
+            markClosed(e);
+          } finally {
+            // the buffer is just an in-memory buffer, but it is still polite to
+            // close early; it is also closed when a request is dropped on close
+            IOUtils.closeStream(buf);
+          }
+        }
+      }
+    }
+
     /** Initiates a rpc call by sending the rpc request to the remote server.
-     * Note: this is not called from the Connection thread, but by other
-     * threads.
+     * Note: the socket write is done by the rpcRequestThread rather than the
+     * calling thread, so that interrupting the caller can't corrupt the socket
+     * state with a partially written message.
      * @param call - the rpc request
      */
     public void sendRpcRequest(final Call call)
@@ -1184,8 +1164,7 @@ public class Client implements AutoCloseable {
       }
 
       // Serialize the call to be sent. This is done from the actual
-      // caller thread, rather than the sendParamsExecutor thread,
-      
+      // caller thread, rather than the rpcRequestThread in the connection,
       // so that if the serialization throws an error, it is reported
       // properly. This also parallelizes the serialization.
       //
@@ -1202,51 +1181,7 @@ public class Client implements AutoCloseable {
       final ResponseBuffer buf = new ResponseBuffer();
       header.writeDelimitedTo(buf);
       RpcWritable.wrap(call.rpcRequest).writeTo(buf);
-
-      synchronized (sendRpcRequestLock) {
-        Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
-          @Override
-          public void run() {
-            try {
-              synchronized (ipcStreams.out) {
-                if (shouldCloseConnection.get()) {
-                  return;
-                }
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug(getName() + " sending #" + call.id
-                      + " " + call.rpcRequest);
-                }
-                // RpcRequestHeader + RpcRequest
-                ipcStreams.sendRequest(buf.toByteArray());
-                ipcStreams.flush();
-              }
-            } catch (IOException e) {
-              // exception at this point would leave the connection in an
-              // unrecoverable state (eg half a call left on the wire).
-              // So, close the connection, killing any outstanding calls
-              markClosed(e);
-            } finally {
-              //the buffer is just an in-memory buffer, but it is still polite to
-              // close early
-              IOUtils.closeStream(buf);
-            }
-          }
-        });
-      
-        try {
-          senderFuture.get();
-        } catch (ExecutionException e) {
-          Throwable cause = e.getCause();
-          
-          // cause should only be a RuntimeException as the Runnable above
-          // catches IOException
-          if (cause instanceof RuntimeException) {
-            throw (RuntimeException) cause;
-          } else {
-            throw new RuntimeException("unexpected checked exception", cause);
-          }
-        }
-      }
+      rpcRequestQueue.put(Pair.of(call, buf));
     }
 
     /* Receive a response.
@@ -1395,7 +1330,6 @@ public class Client implements AutoCloseable {
             CommonConfigurationKeys.IPC_CLIENT_BIND_WILDCARD_ADDR_DEFAULT);
 
     this.clientId = ClientId.getClientId();
-    this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
     this.maxAsyncCalls = conf.getInt(
         CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
         CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT);
@@ -1439,6 +1373,7 @@ public class Client implements AutoCloseable {
     // wake up all connections
     for (Connection conn : connections.values()) {
       conn.interrupt();
+      conn.rpcRequestThread.interrupt();
       conn.interruptConnectingThread();
     }
     
@@ -1455,7 +1390,6 @@ public class Client implements AutoCloseable {
         }
       }
     }
-    clientExcecutorFactory.unrefAndCleanup();
   }
 
   /** 
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
index 05ef09ae433..b9afa31b448 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
@@ -1218,11 +1218,6 @@ public class TestIPC {
   @Test(timeout=30000)
   public void testInterrupted() {
     Client client = new Client(LongWritable.class, conf);
-    Client.getClientExecutor().submit(new Runnable() {
-      public void run() {
-        while(true);
-      }
-    });
     Thread.currentThread().interrupt();
     client.stop();
     try {
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index 85b0a7b6c84..362b8e3105e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.test.Whitebox;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.event.Level;
@@ -64,13 +65,16 @@ import org.slf4j.event.Level;
 import javax.net.SocketFactory;
 import java.io.Closeable;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InterruptedIOException;
+import java.io.OutputStream;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.net.ConnectException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedAction;
@@ -91,6 +95,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
@@ -995,6 +1000,196 @@ public class TestRPC extends TestRpcBase {
     }
   }
 
+  /**
+   * This tests the case where the server isn't receiving new data and
+   * multiple threads queue up to send rpc requests. Only one of the queued
+   * requests should be written, and every calling thread is interrupted.
+   *
+   * We use a mock SocketFactory so that we can control when the input and
+   * output streams are frozen.
+   */
+  @Test(timeout=30000)
+  public void testSlowConnection() throws Exception {
+    SocketFactory mockFactory = Mockito.mock(SocketFactory.class);
+    Socket mockSocket = Mockito.mock(Socket.class);
+    Mockito.when(mockFactory.createSocket()).thenReturn(mockSocket);
+    Mockito.when(mockSocket.getPort()).thenReturn(1234);
+    Mockito.when(mockSocket.getLocalPort()).thenReturn(2345);
+    MockOutputStream mockOutputStream = new MockOutputStream();
+    Mockito.when(mockSocket.getOutputStream()).thenReturn(mockOutputStream);
+    // Use an input stream whose read() never returns data
+    Mockito.when(mockSocket.getInputStream()).thenReturn(new InputStream() {
+      @Override
+      public int read() throws IOException {
+        // sleep until interrupted, then fail the read
+        while (true) {
+          try {
+            Thread.sleep(TimeUnit.DAYS.toMillis(1));
+          } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new InterruptedIOException("test");
+          }
+        }
+      }
+    });
+    Configuration clientConf = new Configuration();
+    // disable ping & timeout to minimize traffic
+    clientConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, false);
+    clientConf.setInt(CommonConfigurationKeys.IPC_CLIENT_RPC_TIMEOUT_KEY, 0);
+    RPC.setProtocolEngine(clientConf, TestRpcService.class, ProtobufRpcEngine.class);
+    // set async mode so that we don't need to implement the input stream
+    final boolean wasAsync = Client.isAsynchronousMode();
+    TestRpcService client = null;
+    try {
+      Client.setAsynchronousMode(true);
+      client = RPC.getProtocolProxy(
+          TestRpcService.class,
+          0,
+          new InetSocketAddress("localhost", 1234),
+          UserGroupInformation.getCurrentUser(),
+          clientConf,
+          mockFactory).getProxy();
+      // The connection isn't actually made until the first call.
+      client.ping(null, newEmptyRequest());
+      mockOutputStream.waitForFlush(1);
+      final long headerAndFirst = mockOutputStream.getBytesWritten();
+      client.ping(null, newEmptyRequest());
+      mockOutputStream.waitForFlush(2);
+      final long second = mockOutputStream.getBytesWritten() - headerAndFirst;
+      // hold the stream's lock so that the next write() blocks
+      mockOutputStream.pause();
+      // create a set of threads to create calls that will back up
+      ExecutorService pool = Executors.newCachedThreadPool();
+      Future[] futures = new Future[numThreads];
+      final AtomicInteger doneThreads = new AtomicInteger(0);
+      for(int thread = 0; thread < numThreads; ++thread) {
+        final TestRpcService finalClient = client;
+        futures[thread] = pool.submit(new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            finalClient.ping(null, newEmptyRequest());
+            doneThreads.incrementAndGet();
+            return null;
+          }
+        });
+      }
+      // wait until a write() is blocked on the paused stream
+      mockOutputStream.waitForWriters();
+      // interrupt all the threads
+      for(int thread=0; thread < numThreads; ++thread) {
+        assertTrue("cancel thread " + thread,
+            futures[thread].cancel(true));
+      }
+      // wait (up to 10 seconds) for the cancelled caller threads to finish
+      pool.shutdown();
+      pool.awaitTermination(10, TimeUnit.SECONDS);
+      mockOutputStream.resume();
+      // wait for the in flight rpc request to be flushed
+      mockOutputStream.waitForFlush(3);
+      // None of the calling threads should have completed its ping
+      assertEquals(0, doneThreads.get());
+      // make sure that only one additional rpc request was sent
+      assertEquals(headerAndFirst + second * 2,
+          mockOutputStream.getBytesWritten());
+    } finally {
+      Client.setAsynchronousMode(wasAsync);
+      if (client != null) {
+        RPC.stopProxy(client);
+      }
+    }
+  }
+
+  private static final class MockOutputStream extends OutputStream {
+    private long bytesWritten = 0; // only accessed from synchronized methods
+    private AtomicInteger flushCount = new AtomicInteger(0);
+    private ReentrantLock lock = new ReentrantLock(true); // held while paused
+
+    @Override
+    public synchronized void write(int b) throws IOException {
+      lock.lock(); // blocks here while the test holds the lock via pause()
+      bytesWritten += 1;
+      lock.unlock();
+    }
+
+    @Override
+    public void flush() {
+      flushCount.incrementAndGet();
+    }
+
+    public synchronized long getBytesWritten() {
+      return bytesWritten;
+    }
+
+    public void pause() {
+      lock.lock(); // must be released by resume() on the same thread
+    }
+
+    public void resume() {
+      lock.unlock();
+    }
+
+    private static final int DELAY_MS = 250; // polling interval of waitFor*
+
+    /**
+     * Wait for the Nth flush, which we assume will happen exactly when the
+     * Nth RPC request is sent.
+     * @param flush the total flush count to wait for
+     * @throws InterruptedException if interrupted while sleeping between polls
+     */
+    public void waitForFlush(int flush) throws InterruptedException {
+      while (flushCount.get() < flush) {
+        Thread.sleep(DELAY_MS);
+      }
+    }
+
+    public void waitForWriters() throws InterruptedException {
+      while (!lock.hasQueuedThreads()) { // i.e. until a write() is blocked
+        Thread.sleep(DELAY_MS);
+      }
+    }
+  }
+
+  /**
+   * This test causes an exception in the RPC connection setup to make
+   * sure that threads aren't leaked.
+   */
+  @Test(timeout=30000)
+  public void testBadSetup() throws Exception {
+    SocketFactory mockFactory = Mockito.mock(SocketFactory.class);
+    Mockito.when(mockFactory.createSocket())
+        .thenThrow(new IOException("can't connect"));
+    Configuration clientConf = new Configuration();
+    // Set a non-numeric value to cause an exception in the constructor
+    clientConf.set(CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH,
+        "xxx");
+    RPC.setProtocolEngine(clientConf, TestRpcService.class,
+        ProtobufRpcEngine.class);
+    TestRpcService client = null;
+    int threadCount = Thread.getAllStackTraces().size(); // baseline count
+    try {
+      try {
+        client = RPC.getProtocolProxy(
+            TestRpcService.class,
+            0,
+            new InetSocketAddress("localhost", 1234),
+            UserGroupInformation.getCurrentUser(),
+            clientConf,
+            mockFactory).getProxy();
+        client.ping(null, newEmptyRequest());
+        assertTrue("Didn't throw exception!", false);
+      } catch (ServiceException nfe) {
+        // ensure no threads were left running compared to the baseline
+        assertEquals(threadCount, Thread.getAllStackTraces().size());
+      } catch (Throwable t) { // NOTE(review): also catches the failure above
+        assertTrue("wrong exception: " + t, false);
+      }
+    } finally {
+      if (client != null) {
+        RPC.stopProxy(client);
+      }
+    }
+  }
+
   @Test
   public void testConnectionPing() throws Exception {
     Server server;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org