You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2013/07/12 23:36:45 UTC
svn commit: r1502696 - in
/hadoop/common/trunk/hadoop-common-project/hadoop-common: CHANGES.txt
src/main/java/org/apache/hadoop/ipc/Client.java
src/test/java/org/apache/hadoop/ipc/TestIPC.java
Author: cmccabe
Date: Fri Jul 12 21:36:44 2013
New Revision: 1502696
URL: http://svn.apache.org/r1502696
Log:
HADOOP-9703. org.apache.hadoop.ipc.Client leaks threads on stop (Tsuyoshi OZAWA via Colin Patrick McCabe)
Modified:
hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1502696&r1=1502695&r2=1502696&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Fri Jul 12 21:36:44 2013
@@ -305,6 +305,9 @@ Release 2.2.0 - UNRELEASED
HADOOP-9417. Support for symlink resolution in LocalFileSystem /
RawLocalFileSystem. (Andrew Wang via Colin Patrick McCabe)
+ HADOOP-9703. org.apache.hadoop.ipc.Client leaks threads on stop.
+ (Tsuyoshi OZAWA via Colin Patrick McCabe)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1502696&r1=1502695&r2=1502696&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Fri Jul 12 21:36:44 2013
@@ -118,17 +118,70 @@ public class Client {
private final byte[] clientId;
/**
- * Executor on which IPC calls' parameters are sent. Deferring
- * the sending of parameters to a separate thread isolates them
- * from thread interruptions in the calling code.
- */
- private static final ExecutorService SEND_PARAMS_EXECUTOR =
- Executors.newCachedThreadPool(
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("IPC Parameter Sending Thread #%d")
- .build());
-
+ * Executor on which IPC calls' parameters are sent.
+ * Deferring the sending of parameters to a separate
+ * thread isolates them from thread interruptions in the
+ * calling code.
+ */
+ private final ExecutorService sendParamsExecutor;
+ private final static ClientExecutorServiceFactory clientExcecutorFactory =
+ new ClientExecutorServiceFactory();
+
+ private static class ClientExecutorServiceFactory {
+ private int executorRefCount = 0;
+ private ExecutorService clientExecutor = null;
+
+ /**
+ * Get Executor on which IPC calls' parameters are sent.
+ * If the internal reference counter is zero, this method
+ * creates the instance of Executor. If not, this method
+ * just returns the reference of clientExecutor.
+ *
+ * @return An ExecutorService instance
+ */
+ synchronized ExecutorService refAndGetInstance() {
+ if (executorRefCount == 0) {
+ clientExecutor = Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("IPC Parameter Sending Thread #%d")
+ .build());
+ }
+ executorRefCount++;
+
+ return clientExecutor;
+ }
+
+ /**
+ * Cleanup Executor on which IPC calls' parameters are sent.
+ * This method decrements the internal reference counter.
+ * If the counter reaches zero, it also shuts down and
+ * discards the instance of the Executor.
+ *
+ * @return An ExecutorService instance if it exists.
+ * Null is returned if not.
+ */
+ synchronized ExecutorService unrefAndCleanup() {
+ executorRefCount--;
+ assert(executorRefCount >= 0);
+
+ if (executorRefCount == 0) {
+ clientExecutor.shutdown();
+ try {
+ if (!clientExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
+ clientExecutor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while waiting for clientExecutor" +
+ "to stop", e);
+ clientExecutor.shutdownNow();
+ }
+ clientExecutor = null;
+ }
+
+ return clientExecutor;
+ }
+ };
/**
* set the ping interval value in configuration
@@ -201,7 +254,7 @@ public class Client {
synchronized boolean isZeroReference() {
return refCount==0;
}
-
+
/**
* Class that represents an RPC call
*/
@@ -879,7 +932,8 @@ public class Client {
}
// Serialize the call to be sent. This is done from the actual
- // caller thread, rather than the SEND_PARAMS_EXECUTOR thread,
+ // caller thread, rather than the sendParamsExecutor thread,
+
// so that if the serialization throws an error, it is reported
// properly. This also parallelizes the serialization.
//
@@ -896,7 +950,7 @@ public class Client {
call.rpcRequest.write(d);
synchronized (sendRpcRequestLock) {
- Future<?> senderFuture = SEND_PARAMS_EXECUTOR.submit(new Runnable() {
+ Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
@Override
public void run() {
try {
@@ -1092,6 +1146,7 @@ public class Client {
this.fallbackAllowed = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
this.clientId = StringUtils.getUuidBytes();
+ this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
}
/**
@@ -1136,6 +1191,8 @@ public class Client {
} catch (InterruptedException e) {
}
}
+
+ clientExcecutorFactory.unrefAndCleanup();
}
/**
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java?rev=1502696&r1=1502695&r2=1502696&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java Fri Jul 12 21:36:44 2013
@@ -572,6 +572,24 @@ public class TestIPC {
assertFalse(noChanged ^ serviceClass == serviceClass2);
client.stop();
}
+
+ @Test(timeout=30000, expected=IOException.class)
+ public void testIpcAfterStopping() throws IOException, InterruptedException {
+ // start server
+ Server server = new TestServer(5, false);
+ InetSocketAddress addr = NetUtils.getConnectAddress(server);
+ server.start();
+
+ // start client
+ Client client = new Client(LongWritable.class, conf);
+ client.call(new LongWritable(RANDOM.nextLong()),
+ addr, null, null, MIN_SLEEP_TIME, 0, conf);
+ client.stop();
+
+ // This call should throw IOException.
+ client.call(new LongWritable(RANDOM.nextLong()),
+ addr, null, null, MIN_SLEEP_TIME, 0, conf);
+ }
/**
* Check that file descriptors aren't leaked by starting