You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2015/09/16 01:47:55 UTC
[2/2] incubator-geode git commit: async close now done with thread
pool
async close now done with thread pool
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/6d6c760c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/6d6c760c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/6d6c760c
Branch: refs/heads/feature/GEODE-332
Commit: 6d6c760ccb17d4bd0d683072f1d79dc895926c27
Parents: fa01768
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Tue Sep 15 16:47:17 2015 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Tue Sep 15 16:47:17 2015 -0700
----------------------------------------------------------------------
.../gemfire/internal/SocketCreator.java | 69 +++++++++++++-------
.../gemfire/internal/tcp/ConnectionTable.java | 1 +
2 files changed, 48 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6d6c760c/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
index ff4a22c..0688c3d 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
@@ -81,6 +81,11 @@ import com.gemstone.gemfire.internal.util.PasswordUtil;
import com.gemstone.org.jgroups.util.ConnectionWatcher;
import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import javax.net.ssl.*;
@@ -1197,6 +1202,40 @@ public class SocketCreator implements com.gemstone.org.jgroups.util.SockCreator
return (String[]) v.toArray( new String[ v.size() ] );
}
+  /**
+   * Pool of daemon threads that perform asynchronous socket closes. Created on first
+   * use; reset to null by {@link #closeAsyncThreadExecutor()}.
+   */
+  private static ThreadPoolExecutor asyncCloseExecutor;
+  /**
+   * Number of seconds to wait before timing out an unused async close thread.
+   * Read from the system property "p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_TIME"; default is 120 (2 minutes).
+   */
+  private static final long ASYNC_CLOSE_POOL_KEEP_ALIVE_TIME = Long.getLong("p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_TIME", 120L);
+
+  /**
+   * Returns the shared executor used for asynchronous socket closes, creating it on
+   * first use (or on first use after {@link #closeAsyncThreadExecutor()} discarded the
+   * previous one). Synchronized on the class so creation cannot race with shutdown.
+   *
+   * @return the current async close thread pool; never null
+   */
+  private static synchronized ThreadPoolExecutor getAsyncThreadExecutor() {
+    ThreadPoolExecutor pool = asyncCloseExecutor;
+    if (pool == null) {
+      final ThreadGroup tg = LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
+      // Every pool thread is a daemon so pending closes never keep the JVM alive.
+      ThreadFactory tf = new ThreadFactory() {
+        public Thread newThread(final Runnable command) {
+          Thread thread = new Thread(tg, command);
+          thread.setDaemon(true);
+          return thread;
+        }
+      };
+      // Parameterized (previously raw types) so the work queue is type-checked.
+      // A SynchronousQueue holds no tasks: each execute() is handed straight to an
+      // idle thread or, with the unbounded maximum pool size, starts a new one.
+      BlockingQueue<Runnable> synchronousQueue = new SynchronousQueue<Runnable>();
+      pool = new ThreadPoolExecutor(1, Integer.MAX_VALUE, ASYNC_CLOSE_POOL_KEEP_ALIVE_TIME, TimeUnit.SECONDS, synchronousQueue, tf);
+      asyncCloseExecutor = pool;
+    }
+    return pool;
+  }
+  /**
+   * Shuts down the async close thread pool, if one exists, and discards it so that a
+   * later async close creates a fresh pool.
+   */
+  public static synchronized void closeAsyncThreadExecutor() {
+    final ThreadPoolExecutor executor = asyncCloseExecutor;
+    if (executor == null) {
+      return; // no pool has been created, or it was already closed
+    }
+    executor.shutdownNow();
+    asyncCloseExecutor = null;
+  }
+  /**
+   * Runs the given task on the async close thread pool. Holds the class lock, the same
+   * one used by {@link #closeAsyncThreadExecutor()}, so the pool cannot be shut down
+   * between looking it up and handing it the task.
+   *
+   * @param r the task to run on a pool thread
+   */
+  private static synchronized void asyncExecute(Runnable r) {
+    // The old code waited 50ms for the async task to complete.
+    // Should this code use submit on the executor and also wait 50ms?
+    final ThreadPoolExecutor executor = getAsyncThreadExecutor();
+    executor.execute(r);
+  }
/**
* Closes the specified socket in a background thread and waits a limited
* amount of time for the close to complete. In some cases we see close
@@ -1206,44 +1245,30 @@ public class SocketCreator implements com.gemstone.org.jgroups.util.SockCreator
* @param who who the socket is connected to
* @param extra an optional Runnable with stuff to execute in the async thread
*/
- public static void asyncClose(final Socket sock, String who, final Runnable extra) {
+ public static void asyncClose(final Socket sock, final String who, final Runnable extra) {
if (sock == null || sock.isClosed()) {
return;
}
try {
- ThreadGroup tg = LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
-
- Thread t = new Thread(tg, new Runnable() {
+ asyncExecute(new Runnable() {
public void run() {
+ Thread.currentThread().setName("AsyncSocketCloser for " + who);
+ try {
if (extra != null) {
extra.run();
}
inlineClose(sock);
+ } finally {
+ Thread.currentThread().setName("unused AsyncSocketCloser");
+ }
}
- }, "AsyncSocketCloser for " + who);
- t.setDaemon(true);
- try {
- t.start();
+ });
} catch (OutOfMemoryError ignore) {
// If we can't start a thread to close the socket just do it inline.
// See bug 50573.
inlineClose(sock);
return;
}
- try {
- // [bruce] if the network fails, this will wait the full amount of time
- // on every close, so it must be kept very short. it was 750ms before,
- // causing frequent hangs in net-down hydra tests
- t.join(50/*ms*/);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- }
- catch (VirtualMachineError e) {
- SystemFailure.initiateFailure(e);
- // NOTREACHED
- throw e;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6d6c760c/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
index 525c687..84ea1eb 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
@@ -776,6 +776,7 @@ public class ConnectionTable {
m.clear();
}
}
+ SocketCreator.closeAsyncThreadExecutor();
}
public void executeCommand(Runnable runnable) {